Repository: flink
Updated Branches:
  refs/heads/master ece899a9f -> cb2820675


[FLINK-5461] [gelly] Remove Superfluous TypeInformation Declaration

FLINK-4624 updated Gelly's Summarization algorithm to use
Either<NullValue, VV> in order to support types whose serializers do
not support null values. This required explicit TypeInformation because
of limitations in the TypeExtractor's handling of the Either type.
FLINK-4673 created a TypeInfoFactory for the Either type, so the
explicit TypeInformation can now be removed.

This closes #3096


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b408d61f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b408d61f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b408d61f

Branch: refs/heads/master
Commit: b408d61f707806d2a188c55065d1187f4f05099a
Parents: ece899a
Author: Greg Hogan <[email protected]>
Authored: Wed Jan 11 13:43:58 2017 -0500
Committer: Greg Hogan <[email protected]>
Committed: Tue Jan 17 15:37:53 2017 -0500

----------------------------------------------------------------------
 .../flink/graph/library/Summarization.java      | 26 +++-----------------
 1 file changed, 4 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b408d61f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
index 2361247..fed4d89 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -23,15 +23,10 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -97,31 +92,18 @@ public class Summarization<K, VV, EV>
 
        @Override
        public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV> 
input) throws Exception {
-               // create type infos for correct return type definitions
-               // Note: this use of type hints is only required due to
-               // limitations of the type parser for the Either type
-               // which are being fixed in FLINK-4673
-               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
input.getVertices().getType()).getTypeAt(0);
-               TypeInformation<VV> valueType = ((TupleTypeInfo<?>) 
input.getVertices().getType()).getTypeAt(1);
-               @SuppressWarnings("unchecked")
-               TupleTypeInfo<Vertex<K, VertexValue<VV>>> vertexType = 
(TupleTypeInfo<Vertex<K, VertexValue<VV>>>) new TupleTypeInfo(
-                       Vertex.class, keyType, new 
TupleTypeInfo(VertexValue.class, valueType, BasicTypeInfo.LONG_TYPE_INFO));
-
                // -------------------------
                // build super vertices
                // -------------------------
 
-               // group vertices by value
-               UnsortedGrouping<Vertex<K, VV>> vertexUnsortedGrouping = 
input.getVertices()
-                               .groupBy(1);
-               // reduce vertex group and create vertex group items
-               GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> 
vertexGroupItems = vertexUnsortedGrouping
+               // group vertices by value and create vertex group items
+               DataSet<VertexGroupItem<K, VV>> vertexGroupItems = 
input.getVertices()
+                               .groupBy(1)
                                .reduceGroup(new VertexGroupReducer<K, VV>());
                // create super vertices
                DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = 
vertexGroupItems
                                .filter(new 
VertexGroupItemToSummarizedVertexFilter<K, VV>())
-                               .map(new 
VertexGroupItemToSummarizedVertexMapper<K, VV>())
-                               .returns(vertexType);
+                               .map(new 
VertexGroupItemToSummarizedVertexMapper<K, VV>());
 
                // -------------------------
                // build super edges

Reply via email to