http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java index 87f137f..0be97be 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java @@ -22,582 +22,498 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.utils.EdgeToTuple3Map; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - @RunWith(Parameterized.class) -public class TestJoinWithEdges extends JavaProgramTestBase { +public class TestJoinWithEdges extends MultipleProgramsTestBase { - private static int NUM_PROGRAMS = 15; + public TestJoinWithEdges(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } - private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; private String expectedResult; - public TestJoinWithEdges(Configuration config) { - super(config); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithEdgesInputDataset() throws Exception { + /* + * Test joinWithEdges with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges() + .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,90\n" + + "5,1,102\n"; + } + + @Test + public void testWithLessElements() throws Exception { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; } - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + @Test + public void testWithLessElementsDifferentType() throws Exception { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; } - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); + @Test + public void testWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); + @Test + public void testWithCustomType() throws Exception { + /* + * Test joinWithEdges with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,20\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; } - @Parameterized.Parameters - public static Collection<Object[]> getConfigurations() throws IOException { + @Test + public void testWithEdgesOnSource() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges() + .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,69\n" + + "4,5,90\n" + + "5,1,102\n"; + } - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + @Test + public void testOnSourceWithLessElements() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } + @Test + public void testOnSourceWithDifferentType() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,20\n" + + "1,3,20\n" + + "2,3,60\n" + + "3,4,80\n" + + "3,5,80\n" + + "4,5,120\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithCustom() throws Exception { + /* + * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,10\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,40\n" + + "4,5,45\n" + + "5,1,51\n"; + } - return toParameterList(tConfigs); + @Test + public void testWithEdgesOnTarget() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges() + .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,80\n" + + "5,1,102\n"; } - private static class GraphProgs { - - @SuppressWarnings("serial") - public static String runProgram(int progId, String resultPath) throws Exception { - - switch (progId) { - case 1: { - /* - * Test joinWithEdges with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges() - .map(new MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Long>>() { - @Override - public Tuple3<Long, Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple3<Long, Long, Long>(edge.getSource(), - edge.getTarget(), edge.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,70\n" + - "4,5,90\n" + - "5,1,102\n"; - } - case 2: { - /* - * Test joinWithEdges with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) - .map(new MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Long>>() { - @Override - public Tuple3<Long, Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple3<Long, Long, Long>(edge.getSource(), - edge.getTarget(), edge.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 3: { - /* - * Test joinWithEdges with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) - .map(new MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>>() { - @Override - public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple3<Long, Long, Boolean>(edge.getSource(), - edge.getTarget(), true); - } - }), - new MapFunction<Tuple2<Long, Boolean>, Long>() { - - @Override - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if(tuple.f1) { - return tuple.f0 * 2; - } - else { - return tuple.f0; - } - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 4: { - /* - * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1 * 2; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 5: { - /* - * Test joinWithEdges with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), - new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() { - public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,10\n" + - "1,3,20\n" + - "2,3,30\n" + - "3,4,40\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 6: { - /* - * Test joinWithEdgesOnSource with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges() - .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,25\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,69\n" + - "4,5,90\n" + - "5,1,102\n"; - } - case 7: { - /* - * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) - .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,25\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 8: { - /* - * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) - .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>>() { - @Override - public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Boolean>(edge.getSource(), true); - } - }), - new MapFunction<Tuple2<Long, Boolean>, Long>() { - - @Override - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if (tuple.f1) { - return tuple.f0 * 2; - } else { - return tuple.f0; - } - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 9: { - /* - * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1 * 2; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,20\n" + - "1,3,20\n" + - "2,3,60\n" + - "3,4,80\n" + - "3,5,80\n" + - "4,5,120\n" + - "5,1,51\n"; - } - case 10: { - /* - * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), - new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() { - public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,10\n" + - "1,3,10\n" + - "2,3,30\n" + - "3,4,40\n" + - "3,5,40\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 11: { - /* - * Test joinWithEdgesOnTarget with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges() - .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,36\n" + - "3,4,68\n" + - "3,5,70\n" + - "4,5,80\n" + - "5,1,102\n"; - } - case 12: { - /* - * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) - .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,36\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 13: { - /* - * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) - .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>>() { - @Override - public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Boolean>(edge.getTarget(), true); - } - }), - new MapFunction<Tuple2<Long, Boolean>, Long>() { - - @Override - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if (tuple.f1) { - return tuple.f0 * 2; - } else { - return tuple.f0; - } - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - case 14: { - /* - * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1 * 2; - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,20\n" + - "1,3,40\n" + - "2,3,40\n" + - "3,4,80\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,140\n"; - } - case 15: { - /* - * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), - new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() { - public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - }); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - return "1,2,10\n" + - "1,3,20\n" + - "2,3,20\n" + - "3,4,40\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); + @Test + public void testWithOnTargetWithLessElements() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnTargetWithDifferentType() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnTargetWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,20\n" + + "1,3,40\n" + + "2,3,40\n" + + "3,4,80\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,140\n"; + } + + @Test + public void testOnTargetWithCustom() throws Exception { + /* + * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,20\n" + + "2,3,20\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @SuppressWarnings("serial") + private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> { + public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple3<Long, Long, Boolean>(edge.getSource(), + edge.getTarget(), true); + } + } + + @SuppressWarnings("serial") + private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; } } } + + @SuppressWarnings("serial") + private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1 * 2; + } + } + + @SuppressWarnings("serial") + private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { + public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + } + + @SuppressWarnings("serial") + private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getSource(), true); + } + } + + @SuppressWarnings("serial") + private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getTarget(), true); + } + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java index f10140b..8b0db35 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java @@ -21,227 +21,198 @@ package org.apache.flink.graph.test; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.graph.utils.VertexToTuple2Map; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - @RunWith(Parameterized.class) -public class TestJoinWithVertices extends JavaProgramTestBase { +public class TestJoinWithVertices extends MultipleProgramsTestBase { - private static int NUM_PROGRAMS = 5; + public TestJoinWithVertices(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } - private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; private String expectedResult; - public TestJoinWithVertices(Configuration config) { - super(config); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testJoinWithVertexSet() throws Exception { + /* + * Test joinWithVertices with the input DataSet parameter identical + * to the vertex DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices() + .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,8\n" + + "5,10\n"; } - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + @Test + public void testWithLessElements() throws Exception { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * less elements than the vertex DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; } - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); + @Test + public void testWithDifferentType() throws Exception { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * less elements than the vertex DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); + @Test + public void testWithDifferentKeys() throws Exception { + /* + * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), + new ProjectSecondMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; } - @Parameterized.Parameters - public static Collection<Object[]> getConfigurations() throws IOException { + @Test + public void testWithCustomType() throws Exception { + /* + * Test joinWithVertices with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), + new CustomValueMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; + } - return toParameterList(tConfigs); + @SuppressWarnings("serial") + private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception { + return new Tuple2<Long, Boolean>(vertex.getId(), true); + } } - private static class GraphProgs { - - @SuppressWarnings("serial") - public static String runProgram(int progId, String resultPath) throws Exception { - - switch (progId) { - case 1: { - /* - * Test joinWithVertices with the input DataSet parameter identical - * to the vertex DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices() - .map(new MapFunction<Vertex<Long, Long>, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Vertex<Long, Long> vertex) throws Exception { - return new Tuple2<Long, Long>(vertex.getId(), vertex.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - return "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,8\n" + - "5,10\n"; - } - case 2: { - /* - * Test joinWithVertices with the input DataSet passed as a parameter containing - * less elements than the vertex DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) - .map(new MapFunction<Vertex<Long, Long>, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Vertex<Long, Long> vertex) throws Exception { - return new Tuple2<Long, Long>(vertex.getId(), vertex.getValue()); - } - }), - new MapFunction<Tuple2<Long, Long>, Long>() { - - @Override - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - }); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - return "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,4\n" + - "5,5\n"; - } - case 3: { - /* - * Test joinWithVertices with the input DataSet passed as a parameter containing - * less elements than the vertex DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) - .map(new MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>>() { - @Override - public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception { - return new Tuple2<Long, Boolean>(vertex.getId(), true); - } - }), - new MapFunction<Tuple2<Long, Boolean>, Long>() { - - @Override - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if(tuple.f1) { - return tuple.f0 * 2; - } - else { - return tuple.f0; - } - } - }); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - return "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,4\n" + - "5,5\n"; - } - case 4: { - /* - * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), - new MapFunction<Tuple2<Long, Long>, Long>() { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1; - } - }); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - return "1,10\n" + - "2,20\n" + - "3,30\n" + - "4,40\n" + - "5,5\n"; - } - case 5: { - /* - * Test joinWithVertices with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), - new MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long>() { - public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - }); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - return "1,10\n" + - "2,20\n" + - "3,30\n" + - "4,40\n" + - "5,5\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); + @SuppressWarnings("serial") + private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; } } } + + @SuppressWarnings("serial") + private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { + public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java index bcba7e7..9eccecc 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java @@ -18,211 +18,206 @@ package org.apache.flink.graph.test; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class TestMapEdges extends JavaProgramTestBase { +public class TestMapEdges extends MultipleProgramsTestBase { - private static int NUM_PROGRAMS = 5; - - private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; - - public TestMapEdges(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + public TestMapEdges(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); } - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Override - protected void postSubmit() throws Exception { + + @After + public void after() throws Exception{ compareResultsByLinesInMemory(expectedResult, resultPath); } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } + @Test + public void testWithSameValue() throws Exception { + /* + * Test mapEdges() keeping the same value type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - return toParameterList(tConfigs); - } - - private static class GraphProgs { + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - @SuppressWarnings("serial") - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - /* - * Test mapEdges() keeping the same value type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, Long>() { - public Long map(Edge<Long, Long> edge) throws Exception { - return edge.getValue()+1; - } - }).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - return "1,2,13\n" + + DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,13\n" + "1,3,14\n" + "2,3,24\n" + "3,4,35\n" + "3,5,36\n" + "4,5,46\n" + "5,1,52\n"; - } - case 2: { - /* - * Test mapEdges() and change the value type to String - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, String>() { - public String map(Edge<Long, Long> edge) throws Exception { - return String.format("string(%d)", edge.getValue()); - } - }).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - return "1,2,string(12)\n" + + } + + @Test + public void testWithStringValue() throws Exception { + /* + * Test mapEdges() and change the value type to String + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,string(12)\n" + "1,3,string(13)\n" + "2,3,string(23)\n" + "3,4,string(34)\n" + "3,5,string(35)\n" + "4,5,string(45)\n" + "5,1,string(51)\n"; - } - case 3: { - /* - * Test mapEdges() and change the value type to a Tuple1 - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, - Tuple1<Long>>() { - public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception { - Tuple1<Long> tupleValue = new Tuple1<Long>(); - tupleValue.setFields(edge.getValue()); - return tupleValue; - } - }).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - return "1,2,(12)\n" + + } + + @Test + public void testWithTuple1Type() throws Exception { + /* + * Test mapEdges() and change the value type to a Tuple1 + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(12)\n" + "1,3,(13)\n" + "2,3,(23)\n" + "3,4,(34)\n" + "3,5,(35)\n" + "4,5,(45)\n" + "5,1,(51)\n"; - } - case 4: { - /* - * Test mapEdges() and change the value type to a custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new MapFunction<Edge<Long, Long>, - DummyCustomType>() { - public DummyCustomType map(Edge<Long, Long> edge) throws Exception { - DummyCustomType dummyValue = new DummyCustomType(); - dummyValue.setIntField(edge.getValue().intValue()); - return dummyValue; - } - }).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - return "1,2,(T,12)\n" + - "1,3,(T,13)\n" + - "2,3,(T,23)\n" + - "3,4,(T,34)\n" + - "3,5,(T,35)\n" + - "4,5,(T,45)\n" + - "5,1,(T,51)\n"; - } - case 5: { - /* - * Test mapEdges() and change the value type to a parameterized custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges( - new MapFunction<Edge<Long, Long>, DummyCustomParameterizedType<Double>>() { - public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception { - DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); - dummyValue.setIntField(edge.getValue().intValue()); - dummyValue.setTField(new Double(edge.getValue())); - return dummyValue; - } - }).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - return "1,2,(12.0,12)\n" + - "1,3,(13.0,13)\n" + - "2,3,(23.0,23)\n" + - "3,4,(34.0,34)\n" + - "3,5,(35.0,35)\n" + - "4,5,(45.0,45)\n" + - "5,1,(51.0,51)\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test mapEdges() and change the value type to a custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(T,12)\n" + + "1,3,(T,13)\n" + + "2,3,(T,23)\n" + + "3,4,(T,34)\n" + + "3,5,(T,35)\n" + + "4,5,(T,45)\n" + + "5,1,(T,51)\n"; + } + + @Test + public void testWithParametrizedCustomType() throws Exception { + /* + * Test mapEdges() and change the value type to a parameterized custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges( + new ToCustomParametrizedTypeMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(12.0,12)\n" + + "1,3,(13.0,13)\n" + + "2,3,(23.0,23)\n" + + "3,4,(34.0,34)\n" + + "3,5,(35.0,35)\n" + + "4,5,(45.0,45)\n" + + "5,1,(51.0,51)\n"; + } + + @SuppressWarnings("serial") + private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> { + public Long map(Edge<Long, Long> edge) throws Exception { + return edge.getValue()+1; + } + } + + @SuppressWarnings("serial") + private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> { + public String map(Edge<Long, Long> edge) throws Exception { + return String.format("string(%d)", edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> { + public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception { + Tuple1<Long> tupleValue = new Tuple1<Long>(); + tupleValue.setFields(edge.getValue()); + return tupleValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> { + public DummyCustomType map(Edge<Long, Long> edge) throws Exception { + DummyCustomType dummyValue = new DummyCustomType(); + dummyValue.setIntField(edge.getValue().intValue()); + return dummyValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, + DummyCustomParameterizedType<Double>> { + + public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception { + DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); + dummyValue.setIntField(edge.getValue().intValue()); + dummyValue.setTField(new Double(edge.getValue())); + return dummyValue; } } -} +} \ No newline at end of file