[FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic. Adds a GraphAnalytic to replace the checksumHashCode Java and Scala utility functions.
This closes #2188 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0efa6441 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0efa6441 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0efa6441 Branch: refs/heads/master Commit: 0efa6441420ba4a74ecb9a7d70d0a0d80e25e292 Parents: 149e7a0 Author: Greg Hogan <[email protected]> Authored: Thu Jun 30 09:02:47 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Fri Jul 1 14:35:42 2016 -0400 ---------------------------------------------------------------------- .../flink/graph/scala/utils/package.scala | 60 ---------------- .../scala/test/util/GraphUtilsITCase.scala | 46 ------------ .../graph/library/metric/ChecksumHashCode.java | 73 ++++++++++++++++++++ .../apache/flink/graph/utils/GraphUtils.java | 30 -------- .../filter/undirected/MaximumDegreeTest.java | 10 +-- .../library/metric/ChecksumHashCodeTest.java | 48 +++++++++++++ .../flink/graph/test/util/GraphUtilsITCase.java | 58 ---------------- 7 files changed, 127 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala deleted file mode 100644 index fe4ab5b..0000000 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.Utils -import org.apache.flink.graph.{Edge, Vertex} -import org.apache.flink.util.AbstractID - -import scala.reflect.ClassTag - -package object utils { - /** - * This class provides utility methods for computing checksums over a Graph. - * - * @param self Graph - */ - implicit class GraphUtils[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: - TypeInformation : ClassTag](val self: Graph[K, VV, EV]) { - - /** - * Convenience method to get the count (number of elements) of a Graph - * as well as the checksum (sum over element hashes). The vertex and - * edge DataSets are processed in a single job and the resultant counts - * and checksums are merged locally. 
- * - * @return the checksum over the vertices and edges - */ - @throws(classOf[Exception]) - def checksumHashCode(): Utils.ChecksumHashCode = { - val verticesId = new AbstractID().toString - self.getVertices.output(new Utils.ChecksumHashCodeHelper[Vertex[K,VV]](verticesId)) - - val edgesId = new AbstractID().toString - self.getEdges.output(new Utils.ChecksumHashCodeHelper[Edge[K,EV]](edgesId)) - - val res = self.getWrappedGraph.getContext.execute() - - val checksum = res.getAccumulatorResult[Utils.ChecksumHashCode](verticesId) - checksum.add(res.getAccumulatorResult[Utils.ChecksumHashCode](edgesId)) - checksum - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala deleted file mode 100644 index c6d3c58..0000000 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.test.util - -import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.scala.utils._ -import org.apache.flink.graph.scala.test.TestGraphUtils -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.junit.Assert.assertEquals -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -@RunWith(classOf[Parameterized]) -class GraphUtilsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends -MultipleProgramsTestBase(mode) { - - @Test - @throws(classOf[Exception]) - def testChecksumHashCodeVerticesAndEdges() { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val checksum = graph.checksumHashCode() - assertEquals(checksum.getCount, 12L) - assertEquals(checksum.getChecksum, 19665L) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java new file mode 100644 index 0000000..261e89b --- /dev/null +++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java @@ -0,0 +1,73 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.graph.library.metric; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.Utils; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.AbstractID; + +/** + * Convenience method to get the count (number of elements) of a Graph + * as well as the checksum (sum over element hashes). The vertex and + * edge DataSets are processed in a single job and the resultant counts + * and checksums are merged locally. 
+ * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class ChecksumHashCode<K, VV, EV> +extends AbstractGraphAnalytic<K, VV, EV, Utils.ChecksumHashCode> { + + private String verticesId = new AbstractID().toString(); + + private String edgesId = new AbstractID().toString(); + + @Override + public ChecksumHashCode<K, VV, EV> run(Graph<K, VV, EV> input) + throws Exception { + super.run(input); + + input + .getVertices() + .output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)) + .name("ChecksumHashCode vertices"); + + input + .getEdges() + .output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)) + .name("ChecksumHashCode edges"); + + return this; + } + + @Override + public Utils.ChecksumHashCode getResult() { + JobExecutionResult res = env.getLastJobExecutionResult(); + Utils.ChecksumHashCode checksum = res.getAccumulatorResult(verticesId); + checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId)); + return checksum; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java index 264479b..0c93fa9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java @@ -18,46 +18,16 @@ package org.apache.flink.graph.utils; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils; -import 
org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; import org.apache.flink.types.LongValue; -import org.apache.flink.util.AbstractID; import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO; public class GraphUtils { /** - * Convenience method to get the count (number of elements) of a Graph - * as well as the checksum (sum over element hashes). The vertex and - * edge DataSets are processed in a single job and the resultant counts - * and checksums are merged locally. - * - * @param graph Graph over which to compute the count and checksum - * @return the checksum over the vertices and edges - */ - public static <K, VV, EV> Utils.ChecksumHashCode checksumHashCode(Graph<K, VV, EV> graph) throws Exception { - final String verticesId = new AbstractID().toString(); - graph.getVertices().output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)).name("ChecksumHashCode vertices"); - - final String edgesId = new AbstractID().toString(); - graph.getEdges().output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)).name("ChecksumHashCode edges"); - - JobExecutionResult res = graph.getContext().execute(); - - Utils.ChecksumHashCode checksum = res.<Utils.ChecksumHashCode>getAccumulatorResult(verticesId); - checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId)); - - return checksum; - } - - /** * Count the number of elements in a DataSet. 
* * @param input DataSet of elements to be counted http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java index b3a3356..ca96f24 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java @@ -18,10 +18,10 @@ package org.apache.flink.graph.asm.degree.filter.undirected; -import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.Utils; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.AsmTestBase; -import org.apache.flink.graph.utils.GraphUtils; +import org.apache.flink.graph.library.metric.ChecksumHashCode; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -62,8 +62,10 @@ extends AsmTestBase { @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode checksum = GraphUtils.checksumHashCode(undirectedRMatGraph - .run(new MaximumDegree<LongValue, NullValue, NullValue>(16))); + Utils.ChecksumHashCode checksum = undirectedRMatGraph + .run(new MaximumDegree<LongValue, NullValue, NullValue>(16)) + .run(new ChecksumHashCode<LongValue, NullValue, NullValue>()) + .execute(); assertEquals(805, checksum.getCount()); assertEquals(0x0000000008028b43L, checksum.getChecksum()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java new file mode 100644 index 0000000..92a5fd0 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java @@ -0,0 +1,48 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.graph.library.metric; + +import org.apache.flink.api.java.Utils; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.test.TestGraphUtils; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ChecksumHashCodeTest +extends AsmTestBase { + + @Test + public void testSmallGraph() throws Exception { + Graph<Long, Long, Long> graph = Graph.fromDataSet( + TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), + env); + + Utils.ChecksumHashCode checksum = graph + .run(new ChecksumHashCode<Long, Long, Long>()) + .execute(); + + assertEquals(checksum.getCount(), 12L); + assertEquals(checksum.getChecksum(), 19665L); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java deleted file mode 100644 index 51602bc..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.util; - -import static org.junit.Assert.assertEquals; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.test.TestGraphUtils; -import org.apache.flink.graph.utils.GraphUtils; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class GraphUtilsITCase extends MultipleProgramsTestBase { - - public GraphUtilsITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testChecksumHashCodeVerticesAndEdges() throws Exception { - /* - * Test checksum hashcode - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet( - TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), - env); - - ChecksumHashCode checksum = GraphUtils.checksumHashCode(graph); - - assertEquals(checksum.getCount(), 12L); - assertEquals(checksum.getChecksum(), 19665L); - } - -}
