Repository: flink
Updated Branches:
  refs/heads/master 4666e65ef -> 3d41f2b82


[FLINK-5097][gelly] Add missing input type information to TypeExtractor
in MapVertices, mapEdges, fromDataSet, and groupReduceOnEdges

This closes #2842


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88e458b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88e458b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88e458b4

Branch: refs/heads/master
Commit: 88e458b47c27d097c18674f1d5b9630349aeb129
Parents: 4666e65
Author: vasia <[email protected]>
Authored: Sat Nov 19 15:35:43 2016 +0100
Committer: vasia <[email protected]>
Committed: Fri Dec 16 10:36:49 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java |  47 +++-----
 .../test/operations/TypeExtractorTest.java      | 119 +++++++++++++++++++
 2 files changed, 133 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index c6843e4..0ee03c2 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -214,7 +214,7 @@ public class Graph<K, VV, EV> {
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
 
                TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
-                               MapFunction.class, 
vertexValueInitializer.getClass(), 1, null, null);
+                               MapFunction.class, 
vertexValueInitializer.getClass(), 1, keyType, null);
 
                @SuppressWarnings({ "unchecked", "rawtypes" })
                TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
@@ -529,7 +529,7 @@ public class Graph<K, VV, EV> {
 
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertices.getType()).getTypeAt(0);
 
-               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, 
null);
+               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, 
vertices.getType(), null);
 
                TypeInformation<Vertex<K, NV>> returnType = 
(TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
                                Vertex.class, keyType, valueType);
@@ -573,7 +573,7 @@ public class Graph<K, VV, EV> {
 
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
 
-               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, 
null);
+               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, 
edges.getType(), null);
 
                TypeInformation<Edge<K, NV>> returnType = 
(TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
                                Edge.class, keyType, keyType, valueType);
@@ -1002,7 +1002,7 @@ public class Graph<K, VV, EV> {
                        return vertices.coGroup(edges).where(0).equalTo(0)
                                        .with(new 
ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on out-edges");
                case ALL:
-                       return vertices.coGroup(edges.flatMap(new 
EmitOneEdgePerNode<K, VV, EV>())
+                       return vertices.coGroup(edges.flatMap(new 
EmitOneEdgePerNode<K, EV>())
                                                .name("Emit edge"))
                                        .where(0).equalTo(0).with(new 
ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
                                                .name("GroupReduce on in- and 
out-edges");
@@ -1039,7 +1039,7 @@ public class Graph<K, VV, EV> {
                                                .with(new 
ApplyCoGroupFunction<>(edgesFunction))
                                                        .name("GroupReduce on 
out-edges").returns(typeInfo);
                        case ALL:
-                               return vertices.coGroup(edges.flatMap(new 
EmitOneEdgePerNode<K, VV, EV>())
+                               return vertices.coGroup(edges.flatMap(new 
EmitOneEdgePerNode<K, EV>())
                                                        .name("Emit edge"))
                                                .where(0).equalTo(0).with(new 
ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
                                                        .name("GroupReduce on 
in- and out-edges").returns(typeInfo);
@@ -1065,24 +1065,12 @@ public class Graph<K, VV, EV> {
        public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> 
edgesFunction,
                        EdgeDirection direction) throws 
IllegalArgumentException {
 
-               switch (direction) {
-               case IN:
-                       return edges.map(new ProjectVertexIdMap<K, EV>(1))
-                                       
.withForwardedFields("f1->f0").name("Vertex ID")
-                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction))
-                                               .name("GroupReduce on 
in-edges");
-               case OUT:
-                       return edges.map(new ProjectVertexIdMap<K, EV>(0))
-                                       .withForwardedFields("f0").name("Vertex 
ID")
-                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction))
-                                               .name("GroupReduce on 
out-edges");
-               case ALL:
-                       return edges.flatMap(new EmitOneEdgePerNode<K, VV, 
EV>()).name("Emit edge")
-                               .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction))
-                                       .name("GroupReduce on in- and 
out-edges");
-               default:
-                       throw new IllegalArgumentException("Illegal edge 
direction");
-               }
+               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertices.getType()).getTypeAt(0);
+               TypeInformation<EV> edgeValueType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(2);
+               TypeInformation<T> returnType = 
TypeExtractor.createTypeInfo(EdgesFunction.class, edgesFunction.getClass(), 2,
+                       keyType, edgeValueType);
+
+               return groupReduceOnEdges(edgesFunction, direction, returnType);
        }
 
        /**
@@ -1115,7 +1103,7 @@ public class Graph<K, VV, EV> {
                                                .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction))
                                                        .name("GroupReduce on 
out-edges").returns(typeInfo);
                        case ALL:
-                               return edges.flatMap(new EmitOneEdgePerNode<K, 
VV, EV>()).name("Emit edge")
+                               return edges.flatMap(new EmitOneEdgePerNode<K, 
EV>()).name("Emit edge")
                                                .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction))
                                                        .name("GroupReduce on 
in- and out-edges").returns(typeInfo);
                        default:
@@ -1153,8 +1141,7 @@ public class Graph<K, VV, EV> {
                }
        }
 
-       private static final class ApplyGroupReduceFunction<K, EV, T> 
implements GroupReduceFunction<
-               Tuple2<K, Edge<K, EV>>, T>,     ResultTypeQueryable<T> {
+       private static final class ApplyGroupReduceFunction<K, EV, T> 
implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T> {
 
                private EdgesFunction<K, EV, T> function;
 
@@ -1165,14 +1152,9 @@ public class Graph<K, VV, EV> {
                public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, 
Collector<T> out) throws Exception {
                        function.iterateEdges(edges, out);
                }
-
-               @Override
-               public TypeInformation<T> getProducedType() {
-                       return 
TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, 
null);
-               }
        }
 
-       private static final class EmitOneEdgePerNode<K, VV, EV> implements 
FlatMapFunction<
+       private static final class EmitOneEdgePerNode<K, EV> implements 
FlatMapFunction<
                Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
 
                public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, 
Edge<K, EV>>> out) {
@@ -1219,7 +1201,6 @@ public class Graph<K, VV, EV> {
                                throw new NoSuchElementException("The edge 
src/trg id could not be found within the vertexIds");
                        }
                }
-
                @Override
                public TypeInformation<T> getProducedType() {
                        return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3,

http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
new file mode 100644
index 0000000..484ef3d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.*;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TypeExtractorTest {
+
+       private Graph<Long, Long, Long> inputGraph;
+       private DataSet<Vertex<Long, Long>> vertices;
+       private DataSet<Edge<Long, Long>> edges;
+       private ExecutionEnvironment env;
+
+       @Before
+       public void setUp() throws Exception {
+               env = ExecutionEnvironment.getExecutionEnvironment();
+               vertices = TestGraphUtils.getLongLongVertexData(env);
+               edges = TestGraphUtils.getLongLongEdgeData(env);
+               inputGraph = Graph.fromDataSet(vertices, edges, env);
+       }
+
+       @Test
+       public void testMapVerticesType() throws Exception {
+
+               // test type extraction in mapVertices
+               DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = 
inputGraph.mapVertices(new VertexMapper<Long>()).getVertices();
+               Assert.assertTrue(new TupleTypeInfo(Vertex.class, 
BasicTypeInfo.LONG_TYPE_INFO,
+                       new TupleTypeInfo<Tuple2<Long, 
Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+                       .equals(outVertices.getType()));
+       }
+
+       @Test
+       public void testMapEdgesType() throws Exception {
+
+               // test type extraction in mapEdges
+               DataSet<Edge<Long, Tuple2<Long, Integer>>> outEdges = 
inputGraph.mapEdges(new EdgeMapper<Long>()).getEdges();
+               Assert.assertTrue(new TupleTypeInfo(Edge.class, 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+                       new TupleTypeInfo<Tuple2<Long, 
Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+                       .equals(outEdges.getType()));
+       }
+
+       @Test
+       public void testFromDataSet() throws Exception {
+               DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = 
Graph.fromDataSet(edges, new VertexInitializer<Long>(), env)
+                       .getVertices();
+               Assert.assertTrue(new TupleTypeInfo(Vertex.class, 
BasicTypeInfo.LONG_TYPE_INFO,
+                       new TupleTypeInfo<Tuple2<Long, 
Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+                       .equals(outVertices.getType()));
+       }
+
+       @Test
+       public void testGroupReduceOnEdges() throws Exception {
+               DataSet<Tuple2<Long, Long>> output = 
inputGraph.groupReduceOnEdges(new EdgesGroupFunction<Long, Long>(), 
EdgeDirection.OUT);
+               Assert.assertTrue((new TupleTypeInfo<Tuple2<Long, 
Long>>(BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO)).equals(output.getType()));
+       }
+
+       public static final class VertexMapper<K> implements 
MapFunction<Vertex<K, Long>, Tuple2<K, Integer>> {
+
+               private final Tuple2<K, Integer> outTuple = new Tuple2<>();
+
+               @Override
+               public Tuple2<K, Integer> map(Vertex<K, Long> inputVertex) 
throws Exception {
+                       return outTuple;
+               }
+       }
+
+       public static final class EdgeMapper<K> implements MapFunction<Edge<K, 
Long>, Tuple2<K, Integer>> {
+
+               private final Tuple2<K, Integer> outTuple = new Tuple2<>();
+
+               @Override
+               public Tuple2<K, Integer> map(Edge<K, Long> inputEdge) throws 
Exception {
+                       return outTuple;
+               }
+       }
+
+       public static final class EdgesGroupFunction<K, EV> implements 
EdgesFunction<K, EV, Tuple2<K, EV>> {
+
+               @Override
+               public void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> 
edges, Collector<Tuple2<K, EV>> out) throws Exception {
+                       out.collect(new Tuple2<K, EV>());
+               }
+       }
+
+       public static final class VertexInitializer<K> implements 
MapFunction<K, Tuple2<K, Integer>> {
+
+               @Override
+               public Tuple2<K, Integer> map(K value) throws Exception {
+                       return null;
+               }
+       }
+}

Reply via email to