Repository: flink Updated Branches: refs/heads/master ece899a9f -> cb2820675
[FLINK-5461] [gelly] Remove Superflous TypeInformation Declaration FLINK-4624 updated Gelly's Summarization algorithm to use Either<NullValue, VV> in order to support types for which the serialization does not support null values. This required the use of explicit TypeInformation due to TypeExtractor. FLINK-4673 created a TypeInfoFactory for EitherType so the explicit TypeInformation can be removed. This closes #3096 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b408d61f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b408d61f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b408d61f Branch: refs/heads/master Commit: b408d61f707806d2a188c55065d1187f4f05099a Parents: ece899a Author: Greg Hogan <[email protected]> Authored: Wed Jan 11 13:43:58 2017 -0500 Committer: Greg Hogan <[email protected]> Committed: Tue Jan 17 15:37:53 2017 -0500 ---------------------------------------------------------------------- .../flink/graph/library/Summarization.java | 26 +++----------------- 1 file changed, 4 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b408d61f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java index 2361247..fed4d89 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java @@ -23,15 +23,10 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; -import org.apache.flink.api.java.operators.GroupReduceOperator; -import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; @@ -97,31 +92,18 @@ public class Summarization<K, VV, EV> @Override public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV> input) throws Exception { - // create type infos for correct return type definitions - // Note: this use of type hints is only required due to - // limitations of the type parser for the Either type - // which are being fixed in FLINK-4673 - TypeInformation<K> keyType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(0); - TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1); - @SuppressWarnings("unchecked") - TupleTypeInfo<Vertex<K, VertexValue<VV>>> vertexType = (TupleTypeInfo<Vertex<K, VertexValue<VV>>>) new TupleTypeInfo( - Vertex.class, keyType, new TupleTypeInfo(VertexValue.class, valueType, BasicTypeInfo.LONG_TYPE_INFO)); - // ------------------------- // build super vertices // ------------------------- - // group vertices by value - UnsortedGrouping<Vertex<K, VV>> vertexUnsortedGrouping = input.getVertices() - .groupBy(1); - // reduce vertex group and create vertex group items - GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> vertexGroupItems = vertexUnsortedGrouping + // group vertices by value and create vertex group items + DataSet<VertexGroupItem<K, VV>> vertexGroupItems = input.getVertices() + .groupBy(1) .reduceGroup(new VertexGroupReducer<K, VV>()); // create super vertices DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems .filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>()) - .map(new VertexGroupItemToSummarizedVertexMapper<K, VV>()) - .returns(vertexType); + .map(new VertexGroupItemToSummarizedVertexMapper<K, VV>()); // ------------------------- // build super edges
