Repository: flink
Updated Branches:
  refs/heads/master 4666e65ef -> 3d41f2b82
[FLINK-5097][gelly] Add missing input type information to TypeExtractor in MapVertices, mapEdges, fromDataSet, and groupReduceOnEdges This closes #2842 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88e458b4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88e458b4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88e458b4 Branch: refs/heads/master Commit: 88e458b47c27d097c18674f1d5b9630349aeb129 Parents: 4666e65 Author: vasia <[email protected]> Authored: Sat Nov 19 15:35:43 2016 +0100 Committer: vasia <[email protected]> Committed: Fri Dec 16 10:36:49 2016 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 47 +++----- .../test/operations/TypeExtractorTest.java | 119 +++++++++++++++++++ 2 files changed, 133 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index c6843e4..0ee03c2 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -214,7 +214,7 @@ public class Graph<K, VV, EV> { TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); TypeInformation<VV> valueType = TypeExtractor.createTypeInfo( - MapFunction.class, vertexValueInitializer.getClass(), 1, null, null); + MapFunction.class, vertexValueInitializer.getClass(), 1, keyType, null); @SuppressWarnings({ "unchecked", "rawtypes" }) TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>) new TupleTypeInfo( @@ -529,7 +529,7 @@ public class Graph<K, VV, EV> { TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); - TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null); TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo( Vertex.class, keyType, valueType); @@ -573,7 +573,7 @@ public class Graph<K, VV, EV> { TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); - TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null); TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo( Edge.class, keyType, keyType, valueType); @@ -1002,7 +1002,7 @@ public class Graph<K, VV, EV> { return vertices.coGroup(edges).where(0).equalTo(0) .with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on out-edges"); case ALL: - return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()) + return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, EV>()) .name("Emit edge")) .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)) .name("GroupReduce on in- and out-edges"); @@ -1039,7 +1039,7 @@ public class Graph<K, VV, EV> { .with(new ApplyCoGroupFunction<>(edgesFunction)) .name("GroupReduce on out-edges").returns(typeInfo); case ALL: - return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()) + return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, EV>()) .name("Emit edge")) .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)) 
.name("GroupReduce on in- and out-edges").returns(typeInfo); @@ -1065,24 +1065,12 @@ public class Graph<K, VV, EV> { public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction, EdgeDirection direction) throws IllegalArgumentException { - switch (direction) { - case IN: - return edges.map(new ProjectVertexIdMap<K, EV>(1)) - .withForwardedFields("f1->f0").name("Vertex ID") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) - .name("GroupReduce on in-edges"); - case OUT: - return edges.map(new ProjectVertexIdMap<K, EV>(0)) - .withForwardedFields("f0").name("Vertex ID") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) - .name("GroupReduce on out-edges"); - case ALL: - return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) - .name("GroupReduce on in- and out-edges"); - default: - throw new IllegalArgumentException("Illegal edge direction"); - } + TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); + TypeInformation<EV> edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2); + TypeInformation<T> returnType = TypeExtractor.createTypeInfo(EdgesFunction.class, edgesFunction.getClass(), 2, + keyType, edgeValueType); + + return groupReduceOnEdges(edgesFunction, direction, returnType); } /** @@ -1115,7 +1103,7 @@ public class Graph<K, VV, EV> { .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) .name("GroupReduce on out-edges").returns(typeInfo); case ALL: - return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge") + return edges.flatMap(new EmitOneEdgePerNode<K, EV>()).name("Emit edge") .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) .name("GroupReduce on in- and out-edges").returns(typeInfo); default: @@ -1153,8 +1141,7 @@ public class Graph<K, VV, EV> { } } - private static final class ApplyGroupReduceFunction<K, 
EV, T> implements GroupReduceFunction< - Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> { + private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T> { private EdgesFunction<K, EV, T> function; @@ -1165,14 +1152,9 @@ public class Graph<K, VV, EV> { public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception { function.iterateEdges(edges, out); } - - @Override - public TypeInformation<T> getProducedType() { - return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null); - } } - private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction< + private static final class EmitOneEdgePerNode<K, EV> implements FlatMapFunction< Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) { @@ -1219,7 +1201,6 @@ public class Graph<K, VV, EV> { throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds"); } } - @Override public TypeInformation<T> getProducedType() { return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java new file mode 100644 index 0000000..484ef3d --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.graph.*; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TypeExtractorTest { + + private Graph<Long, Long, Long> inputGraph; + private DataSet<Vertex<Long, Long>> vertices; + private DataSet<Edge<Long, Long>> edges; + private ExecutionEnvironment env; + + @Before + public void setUp() throws Exception { + env = ExecutionEnvironment.getExecutionEnvironment(); + vertices = TestGraphUtils.getLongLongVertexData(env); + edges = TestGraphUtils.getLongLongEdgeData(env); + inputGraph = Graph.fromDataSet(vertices, edges, env); + } + + @Test + public void testMapVerticesType() throws Exception { + + // test type extraction in mapVertices + DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = inputGraph.mapVertices(new 
VertexMapper<Long>()).getVertices(); + Assert.assertTrue(new TupleTypeInfo(Vertex.class, BasicTypeInfo.LONG_TYPE_INFO, + new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) + .equals(outVertices.getType())); + } + + @Test + public void testMapEdgesType() throws Exception { + + // test type extraction in mapEdges + DataSet<Edge<Long, Tuple2<Long, Integer>>> outEdges = inputGraph.mapEdges(new EdgeMapper<Long>()).getEdges(); + Assert.assertTrue(new TupleTypeInfo(Edge.class, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, + new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) + .equals(outEdges.getType())); + } + + @Test + public void testFromDataSet() throws Exception { + DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = Graph.fromDataSet(edges, new VertexInitializer<Long>(), env) + .getVertices(); + Assert.assertTrue(new TupleTypeInfo(Vertex.class, BasicTypeInfo.LONG_TYPE_INFO, + new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) + .equals(outVertices.getType())); + } + + @Test + public void testGroupReduceOnEdges() throws Exception { + DataSet<Tuple2<Long, Long>> output = inputGraph.groupReduceOnEdges(new EdgesGroupFunction<Long, Long>(), EdgeDirection.OUT); + Assert.assertTrue((new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)).equals(output.getType())); + } + + public static final class VertexMapper<K> implements MapFunction<Vertex<K, Long>, Tuple2<K, Integer>> { + + private final Tuple2<K, Integer> outTuple = new Tuple2<>(); + + @Override + public Tuple2<K, Integer> map(Vertex<K, Long> inputVertex) throws Exception { + return outTuple; + } + } + + public static final class EdgeMapper<K> implements MapFunction<Edge<K, Long>, Tuple2<K, Integer>> { + + private final Tuple2<K, Integer> outTuple = new Tuple2<>(); + + @Override + public Tuple2<K, Integer> 
map(Edge<K, Long> inputEdge) throws Exception { + return outTuple; + } + } + + public static final class EdgesGroupFunction<K, EV> implements EdgesFunction<K, EV, Tuple2<K, EV>> { + + @Override + public void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<Tuple2<K, EV>> out) throws Exception { + out.collect(new Tuple2<K, EV>()); + } + } + + public static final class VertexInitializer<K> implements MapFunction<K, Tuple2<K, Integer>> { + + @Override + public Tuple2<K, Integer> map(K value) throws Exception { + return null; + } + } +}
