[FLINK-2561] [gelly] convert existing tests to use collect instead of files; add tests for newly added operations.
Add completeness test: fromCsvReader method is missing. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e0284ef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e0284ef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e0284ef Branch: refs/heads/master Commit: 9e0284efa90561c88b1bc829b800d89e84477caa Parents: 0b4dc06 Author: vasia <[email protected]> Authored: Thu Sep 24 23:31:38 2015 +0200 Committer: vasia <[email protected]> Committed: Tue Sep 29 00:38:20 2015 +0200 ---------------------------------------------------------------------- flink-staging/flink-gelly-scala/pom.xml | 8 +- .../org/apache/flink/graph/scala/Graph.scala | 9 +- .../test/GellyScalaAPICompletenessTest.scala | 45 +++++ .../scala/test/operations/DegreesITCase.scala | 39 ++--- .../test/operations/GraphMutationsITCase.scala | 165 ++++++++++++++----- .../test/operations/GraphOperationsITCase.scala | 151 +++++++++++------ .../test/operations/JoinWithEdgesITCase.scala | 45 ++--- .../operations/JoinWithVerticesITCase.scala | 29 +--- .../scala/test/operations/MapEdgesITCase.scala | 33 +--- .../test/operations/MapVerticesITCase.scala | 33 +--- .../operations/ReduceOnEdgesMethodsITCase.scala | 63 +++---- .../ReduceOnNeighborMethodsITCase.scala | 46 ++---- 12 files changed, 355 insertions(+), 311 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/pom.xml b/flink-staging/flink-gelly-scala/pom.xml index a1f0da7..edcb865 100644 --- a/flink-staging/flink-gelly-scala/pom.xml +++ b/flink-staging/flink-gelly-scala/pom.xml @@ -48,7 +48,13 @@ under the License. 
<artifactId>flink-gelly</artifactId> <version>${project.version}</version> </dependency> - + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index ed58ffd..35af1ed 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -100,7 +100,8 @@ object Graph { env: ExecutionEnvironment): Graph[K, VV, EV] = { val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet - wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv)) + wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, + env.getJavaEnv)) } /** @@ -678,7 +679,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { /** * Adds the given list edges to the graph. * - * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored. + * When adding an edge for a non-existing set of vertices, + * the edge is considered invalid and ignored. * * @param newEdges the data set of edges to be added * @return a new graph containing the existing edges plus the newly added edges. 
@@ -757,7 +759,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { /** * Performs Difference on the vertex and edge sets of the input graphs - * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed + * removes common vertices and edges. If a source/target vertex is removed, + * its corresponding edge will also be removed * @param graph the graph to perform difference with * @return a new graph where the common vertices and edges have been removed */ http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala new file mode 100644 index 0000000..c63c4f8 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala + +import java.lang.reflect.Method +import org.apache.flink.graph.scala._ +import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase +import org.apache.flink.graph.{Graph => JavaGraph} +import scala.language.existentials +import org.junit.Test + +/** + * This checks whether the Gelly Scala API is up to feature parity with the Java API. + * Implements the {@link ScalaAPICompletenessTest} for Gelly. + */ +class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { + + override def isExcludedByName(method: Method): Boolean = { + val name = method.getDeclaringClass.getName + "." + method.getName + val excludedNames = Seq("org.apache.flink.graph.Graph.getContext", + // NOTE: until fromCsvReader() is added to the Scala API Graph + "org.apache.flink.graph.Graph.fromCsvReader") + excludedNames.contains(name) + } + + @Test + override def testCompleteness(): Unit = { + checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]]) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala index 98dbbe9..6196f99 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala @@ -26,42 +26,23 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith
import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testInDegrees { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.inDegrees().writeAsCsv(resultPath) - env.execute - expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n" + val res = graph.inDegrees().collect.toList + expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -70,9 +51,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.outDegrees().writeAsCsv(resultPath) - env.execute - expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n" + val res = graph.outDegrees().collect.toList + expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -81,8 +62,8 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getDegrees().writeAsCsv(resultPath) - env.execute - expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n" + val res = graph.getDegrees().collect.toList + expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } } http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala index 687b0a7..3cb92c4 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala @@ -27,33 +27,14 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - 
@throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testAddVertex { @@ -62,9 +43,9 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L)) - newgraph.getVerticesAsTuple2.writeAsCsv(resultPath) - env.execute + val res = newgraph.getVertices.collect().toList expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -74,9 +55,9 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L)) - newgraph.getVerticesAsTuple2.writeAsCsv(resultPath) - env.execute + val res = newgraph.getVertices.collect().toList expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -86,9 +67,37 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L)) - newgraph.getVerticesAsTuple2.writeAsCsv(resultPath) - env.execute + val res = newgraph.getVertices.collect().toList + expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + + val newgraph = 
graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L), + new Vertex[Long, Long](7L, 7L))) + val res = newgraph.getVertices.collect().toList + expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddVerticesExisting { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + + val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L), + new Vertex[Long, Long](6L, 6L))) + val res = newgraph.getVertices.collect().toList expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -98,9 +107,9 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -110,10 +119,36 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, 
expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L), + new Vertex[Long, Long](2L, 2L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveValidAndInvalidVertex { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L), + new Vertex[Long, Long](6L, 6L))) + val res = newgraph.getEdges.collect.toList + expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -124,10 +159,38 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L, 1L), 61L) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + "6,1,61\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddEdges { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + 
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L), + new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," + + "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddEdgesInvalidVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L), + new Edge(7L, 8L, 78L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," + + "35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -138,10 +201,10 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L, 2L), 12L) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," + "35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -151,9 +214,9 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = 
newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -163,9 +226,35 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveEdges { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L), + new Edge(4L, 5L, 45L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveSameEdgeTwiceEdges { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L), + new Edge(1L, 2L, 12L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala index 713eb8d..7f7ebc0 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala @@ -27,44 +27,26 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - + @Test @throws(classOf[Exception]) def testUndirected { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getUndirected().getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = 
graph.getUndirected.getEdges.collect().toList; + expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," + "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" + "5,1,51\n" + "1,5,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -73,10 +55,11 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.reverse().getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = graph.reverse().getEdges.collect().toList; + expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," + "45\n" + "1,5,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -85,7 +68,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.subgraph(new FilterFunction[Vertex[Long, Long]] { + val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] { @throws(classOf[Exception]) def filter(vertex: Vertex[Long, Long]): Boolean = { return (vertex.getValue > 2) @@ -96,9 +79,10 @@ MultipleProgramsTestBase(mode) { override def filter(edge: Edge[Long, Long]): Boolean = { return (edge.getValue > 34) } - }).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + }).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -107,12 +91,13 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils 
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.subgraph( + val res = graph.subgraph( vertex => vertex.getValue > 2, edge => edge.getValue > 34 - ).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + ).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -121,14 +106,15 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] { + val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] { @throws(classOf[Exception]) def filter(vertex: Vertex[Long, Long]): Boolean = { vertex.getValue > 2 } - }).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + }).getEdges.collect().toList; + expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -137,11 +123,12 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.filterOnVertices( + val res = graph.filterOnVertices( vertex => vertex.getValue > 2 - ).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + ).getEdges.collect().toList; + expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -150,14 +137,15 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - 
graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] { + val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] { @throws(classOf[Exception]) def filter(edge: Edge[Long, Long]): Boolean = { edge.getValue > 34 } - }).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + }).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -166,11 +154,12 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.filterOnEdges( + val res = graph.filterOnEdges( edge => edge.getValue > 34 - ).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + ).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -179,9 +168,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - env.fromElements(graph.numberOfVertices).writeAsText(resultPath) - env.execute + val res = env.fromElements(graph.numberOfVertices).collect().toList expectedResult = "5" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -190,9 +179,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - env.fromElements(graph.numberOfEdges).writeAsText(resultPath) - env.execute + val res = env.fromElements(graph.numberOfEdges).collect().toList expectedResult = "7" + 
TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -201,9 +190,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getVertexIds.writeAsText(resultPath) - env.execute + val res = graph.getVertexIds.collect().toList expectedResult = "1\n2\n3\n4\n5\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -212,9 +201,10 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getEdgeIds.writeAsCsv(resultPath) - env.execute - expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n" + val res = graph.getEdgeIds.collect().toList + expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" + + "(5,1)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -231,9 +221,62 @@ MultipleProgramsTestBase(mode) { ) val newgraph = graph.union(Graph.fromCollection(vertices, edges, env)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + "6,1,61\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testDifference { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]]( + new 
Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L), + new Vertex[Long, Long](6L, 6L) + ) + val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]]( + new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L), + new Edge[Long, Long](6L, 3L, 63L) + ) + + val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env)) + val res = newgraph.getEdges.collect().toList + expectedResult = "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testDifferenceNoCommonVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]]( + new Vertex[Long, Long](6L, 6L) + ) + val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]]( + new Edge[Long, Long](6L, 6L, 66L) + ) + + val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env)) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + + "45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testTriplets { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val res = graph.getTriplets.collect().toList + expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" + + "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala index e19463e..eae8bd5 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala @@ -29,33 +29,14 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testWithEdgesInputDataset { @@ -64,10 +45,10 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), new AddValuesMapper) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val 
res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -79,10 +60,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -94,10 +75,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -109,10 +90,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -124,10 +105,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new 
ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -139,10 +120,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala index 4b8f354..8d18d58 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala @@ -28,33 +28,14 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class 
JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testJoinWithVertexSet { @@ -63,9 +44,9 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new VertexToTuple2Map[Long, Long]), new AddValuesMapper) - result.getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute + val res = result.getVertices.collect.toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -77,9 +58,9 @@ MultipleProgramsTestBase(mode) { val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long]) val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet, (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue) - result.getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute + val res = result.getVertices.collect.toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala index 7e5ad14..0fa8d2b 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala @@ -18,7 +18,6 @@ package org.apache.flink.graph.scala.test.operations - import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.scala._ import org.apache.flink.graph.Edge @@ -29,42 +28,21 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testWithSameValue { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapEdges(new AddOneMapper) - .getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList expectedResult = "1,2,13\n" + 
"1,3,14\n" + "" + "2,3,24\n" + @@ -72,6 +50,7 @@ MultipleProgramsTestBase(mode) { "3,5,36\n" + "4,5,46\n" + "5,1,52\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -80,9 +59,8 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapEdges(edge => edge.getValue + 1) - .getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = graph.mapEdges(edge => edge.getValue + 1) + .getEdges.collect.toList expectedResult = "1,2,13\n" + "1,3,14\n" + "" + "2,3,24\n" + @@ -90,6 +68,7 @@ MultipleProgramsTestBase(mode) { "3,5,36\n" + "4,5,46\n" + "5,1,52\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] { http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala index a22cfbd..c1ab3ea 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala @@ -28,48 +28,27 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class MapVerticesITCase(mode: 
MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testWithSameValue { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapVertices(new AddOneMapper) - .getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute - + val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"; + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -78,15 +57,13 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapVertices(vertex => vertex.getValue + 1) - .getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute - + val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect.toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"; + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] { 
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala index 6ed383a..695f74a 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala @@ -28,46 +28,24 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testAllNeighborsWithValueGreaterThanFour { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result = graph.groupReduceOnEdges(new 
SelectNeighborsValueGreaterThanFour, - EdgeDirection.ALL) - result.writeAsCsv(resultPath) - env.execute - - - expectedResult = "5,1\n" + "5,3\n" + "5,4" + val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour, + EdgeDirection.ALL).collect.toList + expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -77,13 +55,12 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL) - result.writeAsCsv(resultPath) - env.execute - - - expectedResult = "1,2\n" + "1,3\n" + "1,5\n" + "2,1\n" + "2,3\n" + "3,1\n" + "3,2\n" + - "3,4\n" + "3,5\n" + "4,3\n" + "4,5\n" + "5,1\n" + "5,3\n" + "5,4" + val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL) + .collect.toList + expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" + + "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" + + "(5,1)\n" + "(5,3)\n" + "(5,4)" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -94,9 +71,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.OUT) - verticesWithLowestOutNeighbor.writeAsCsv(resultPath) - env.execute - expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n" + val res = verticesWithLowestOutNeighbor.collect.toList + expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) 
} @Test @@ -107,9 +84,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.IN) - verticesWithLowestOutNeighbor.writeAsCsv(resultPath) - env.execute - expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n" + val res = verticesWithLowestOutNeighbor.collect.toList + expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -120,9 +97,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue, EdgeDirection.ALL) - verticesWithMaxEdgeWeight.writeAsCsv(resultPath) - env.execute - expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n" + val res = verticesWithMaxEdgeWeight.collect.toList + expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long, http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala index 52e6d7a..b01e750 100644 --- 
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala @@ -28,42 +28,24 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testSumOfAllNeighborsNoValue { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath) - env.execute - expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n" + val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL) + .collect.toList + expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -72,9 +54,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: 
Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath) - env.execute - expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n" + val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect.toList + expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -84,9 +66,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL) - result.writeAsCsv(resultPath) - env.execute - expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n" + val res = result.collect.toList + expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -97,9 +79,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN) - result.writeAsCsv(resultPath) - env.execute - expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285" + val res = result.collect.toList + expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } final class SumNeighbors extends ReduceNeighborsFunction[Long] {
