[FLINK-2716] [gelly] [apis] New checksum method on DataSet and Graph

This closes #1462


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8be52a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8be52a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8be52a3

Branch: refs/heads/master
Commit: a8be52a3309c5e68fe865a421fb60fdb3cc56d16
Parents: 98fad04
Author: Greg Hogan <c...@greghogan.com>
Authored: Tue Dec 15 14:25:54 2015 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 15 18:58:57 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/Utils.java   | 93 ++++++++++++++++++++
 .../flink/api/java/utils/DataSetUtils.java      | 21 +++++
 .../flink/graph/scala/utils/package.scala       | 50 +++++++++++
 .../scala/test/util/GraphUtilsITCase.scala      | 46 ++++++++++
 .../apache/flink/graph/utils/GraphUtils.java    | 38 ++++++++
 .../flink/graph/test/util/GraphUtilsITCase.java | 58 ++++++++++++
 .../apache/flink/api/scala/utils/package.scala  | 21 +++++
 .../flink/test/util/DataSetUtilsITCase.java     | 13 +++
 .../api/scala/util/DataSetUtilsITCase.scala     | 12 +++
 9 files changed, 352 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java 
b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 038b58c..2edc533 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -20,7 +20,9 @@ package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.accumulators.SimpleAccumulator;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
@@ -158,6 +160,97 @@ public final class Utils {
                }
        }
 
+       public static class ChecksumHashCode implements 
SimpleAccumulator<ChecksumHashCode> {
+
+               private static final long serialVersionUID = 1L;
+
+               private long count;
+               private long checksum;
+
+               public ChecksumHashCode() {}
+
+               public ChecksumHashCode(long count, long checksum) {
+                       this.count = count;
+                       this.checksum = checksum;
+               }
+
+               public long getCount() {
+                       return count;
+               }
+
+               public long getChecksum() {
+                       return checksum;
+               }
+
+               @Override
+               public void add(ChecksumHashCode value) {
+                       this.count += value.count;
+                       this.checksum += value.checksum;
+               }
+
+               @Override
+               public ChecksumHashCode getLocalValue() {
+                       return this;
+               }
+
+               @Override
+               public void resetLocal() {
+                       this.count = 0;
+                       this.checksum = 0;
+               }
+
+               @Override
+               public void merge(Accumulator<ChecksumHashCode, 
ChecksumHashCode> other) {
+                       this.add(other.getLocalValue());
+               }
+
+               @Override
+               public ChecksumHashCode clone() {
+                       return new ChecksumHashCode(count, checksum);
+               }
+
+               @Override
+               public String toString() {
+                       return "ChecksumHashCode " + this.checksum + ", count " 
+ this.count;
+               }
+       }
+
+       /**
+        * Output format that tallies the records written to it (how many, and the sum
+        * of their hash codes) and, on {@link #close()}, registers the tally as a
+        * {@link ChecksumHashCode} accumulator under the id given at construction.
+        *
+        * @param <T> the record type
+        */
+       @SkipCodeAnalysis
+       public static class ChecksumHashCodeHelper<T> extends RichOutputFormat<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Name of the accumulator the tally is published under. */
+               private final String id;
+
+               /** Records written so far. */
+               private long counter;
+
+               /** Running sum of the records' hash codes, each read as an unsigned 32-bit value. */
+               private long checksum;
+
+               /**
+                * @param id accumulator name under which the tally is registered
+                */
+               public ChecksumHashCodeHelper(String id) {
+                       // counter and checksum start at the default long value 0
+                       this.id = id;
+               }
+
+               @Override
+               public void configure(Configuration parameters) {
+                       // nothing to configure
+               }
+
+               @Override
+               public void open(int taskNumber, int numTasks) {
+                       // no setup needed; the running totals already start at zero
+               }
+
+               @Override
+               public void writeRecord(T record) throws IOException {
+                       counter += 1;
+                       // widen the 32-bit hash to long without sign extension so every summand is non-negative
+                       final long unsignedHash = 0xffffffffL & record.hashCode();
+                       checksum += unsignedHash;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       getRuntimeContext().addAccumulator(id, new ChecksumHashCode(counter, checksum));
+               }
+       }
+
+
        // 
--------------------------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java 
b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 9a1e952..01d801e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.utils;
 
 import com.google.common.collect.Lists;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.java.DataSet;
@@ -30,6 +31,7 @@ import 
org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
 
 import java.util.Collections;
@@ -246,6 +248,25 @@ public final class DataSetUtils {
                return new GroupReduceOperator<>(mapPartitionOperator, 
input.getType(), sampleInCoordinator, callLocation);
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       //  Checksum
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the number of elements in a DataSet together with a checksum,
+        * the sum of the elements' hash codes.
+        *
+        * <p>This attaches a sink named "ChecksumHashCode" to {@code input} and then
+        * immediately executes the program of the data set's execution environment.
+        *
+        * @param input the data set to summarize
+        * @param <T> the element type
+        * @return a ChecksumHashCode holding the count and checksum of the elements in the data set
+        * @throws Exception if the program execution fails
+        */
+       public static <T> Utils.ChecksumHashCode checksumHashCode(DataSet<T> input) throws Exception {
+               // unique accumulator name so concurrent checksums in one program cannot collide
+               final String accumulatorName = new AbstractID().toString();
+
+               final Utils.ChecksumHashCodeHelper<T> sink = new Utils.ChecksumHashCodeHelper<T>(accumulatorName);
+               input.output(sink).name("ChecksumHashCode");
+
+               final JobExecutionResult jobResult = input.getExecutionEnvironment().execute();
+               return jobResult.<Utils.ChecksumHashCode>getAccumulatorResult(accumulatorName);
+       }
+
 
        // 
*************************************************************************
        //     UTIL METHODS

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
new file mode 100644
index 0000000..954e6c3
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.Utils.ChecksumHashCode
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils._
+
+import scala.reflect.ClassTag
+
+package object utils {
+
+  /**
+    * Adds checksum computation to a Scala Gelly [[Graph]].
+    *
+    * @param self the graph to summarize
+    */
+  implicit class GraphUtils[
+      K: TypeInformation : ClassTag,
+      VV: TypeInformation : ClassTag,
+      EV: TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {
+
+    /**
+      * Computes the ChecksumHashCode over the Graph.
+      *
+      * The vertex and edge data sets are each summarized through the DataSet
+      * `checksumHashCode()` enrichment, and the edge result is added into the
+      * vertex result.
+      *
+      * @return the ChecksumHashCode over the vertices and edges.
+      */
+    @throws(classOf[Exception])
+    def checksumHashCode(): ChecksumHashCode = {
+      val checksum: ChecksumHashCode = self.getVertices.checksumHashCode()
+      checksum.add(self.getEdges.checksumHashCode())
+      checksum
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala
 
b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala
new file mode 100644
index 0000000..c6d3c58
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.utils._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class GraphUtilsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  /**
+    * Checks the combined count and checksum over the vertices and edges of the
+    * Long/Long/Long test graph.
+    */
+  @Test
+  @throws(classOf[Exception])
+  def testChecksumHashCodeVerticesAndEdges(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(
+      TestGraphUtils.getLongLongVertexData(env),
+      TestGraphUtils.getLongLongEdgeData(env),
+      env)
+
+    val checksum = graph.checksumHashCode()
+
+    // JUnit's assertEquals takes (expected, actual)
+    assertEquals(12L, checksum.getCount)
+    assertEquals(19665L, checksum.getChecksum)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
new file mode 100644
index 0000000..e7d3a16
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+
+/**
+ * Utility methods for Gelly {@link Graph Graphs}.
+ */
+public class GraphUtils {
+
+       /**
+        * Computes the checksum over the vertices and edges of a Graph.
+        *
+        * <p>The vertex and edge data sets are summarized with separate calls to
+        * {@link DataSetUtils#checksumHashCode}, each of which executes the program,
+        * and the edge result is added into the vertex result.
+        *
+        * @param graph the graph to summarize
+        * @param <K> the key type of vertex and edge identifiers
+        * @param <VV> the vertex value type
+        * @param <EV> the edge value type
+        * @return the combined count and checksum over the vertices and edges
+        * @throws Exception if executing either program fails
+        */
+       public static <K, VV, EV> Utils.ChecksumHashCode checksumHashCode(Graph<K, VV, EV> graph) throws Exception {
+               ChecksumHashCode checksum = DataSetUtils.checksumHashCode(graph.getVertices());
+               checksum.add(DataSetUtils.checksumHashCode(graph.getEdges()));
+               return checksum;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java
new file mode 100644
index 0000000..51602bc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphUtilsITCase extends MultipleProgramsTestBase {
+
+       public GraphUtilsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Checks the combined count and checksum over the vertices and edges of the
+        * Long/Long/Long test graph.
+        */
+       @Test
+       public void testChecksumHashCodeVerticesAndEdges() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, Long> graph = Graph.fromDataSet(
+                       TestGraphUtils.getLongLongVertexData(env),
+                       TestGraphUtils.getLongLongEdgeData(env),
+                       env);
+
+               ChecksumHashCode checksum = GraphUtils.checksumHashCode(graph);
+
+               // JUnit's assertEquals takes (expected, actual)
+               assertEquals(12L, checksum.getCount());
+               assertEquals(19665L, checksum.getChecksum());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index 82d4394..d11cf8c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -20,7 +20,9 @@ package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.Utils
+import org.apache.flink.api.java.Utils.ChecksumHashCode
 import org.apache.flink.api.java.utils.{DataSetUtils => jutils}
+import org.apache.flink.util.AbstractID
 
 import _root_.scala.language.implicitConversions
 import _root_.scala.reflect.ClassTag
@@ -103,6 +105,25 @@ package object utils {
       : DataSet[T] = {
       wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, 
seed))
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Checksum
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+      * Convenience method to get the count (number of elements) of a DataSet
+      * as well as the checksum (sum over element hashes).
+      *
+      * This attaches a sink named "ChecksumHashCode" (matching the Java
+      * `DataSetUtils.checksumHashCode`) and immediately executes the program.
+      *
+      * @return A ChecksumHashCode with the count and checksum of elements in the data set.
+      *
+      * @see [[org.apache.flink.api.java.Utils.ChecksumHashCodeHelper]]
+      */
+    @throws(classOf[Exception])
+    def checksumHashCode(): ChecksumHashCode = {
+      val id = new AbstractID().toString
+      // name the sink like the Java API so both appear identically in the execution plan
+      self.javaSet.output(new Utils.ChecksumHashCodeHelper[T](id)).name("ChecksumHashCode")
+      val res = self.javaSet.getExecutionEnvironment.execute()
+      res.getAccumulatorResult[ChecksumHashCode](id)
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index 478354a..4ccc6e2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -23,8 +23,10 @@ import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -81,4 +83,15 @@ public class DataSetUtilsITCase extends 
MultipleProgramsTestBase {
 
                Assert.assertEquals(expectedSize, result.size());
        }
+
+       /**
+        * Checks count and checksum over the integer collection test data set.
+        */
+       @Test
+       public void testIntegerDataSetChecksumHashCode() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
+
+               Utils.ChecksumHashCode checksum = DataSetUtils.checksumHashCode(ds);
+
+               // JUnit's assertEquals takes (expected, actual)
+               Assert.assertEquals(15L, checksum.getCount());
+               Assert.assertEquals(55L, checksum.getChecksum());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a8be52a3/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
index 7fff8ff..25ecc9c 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.util
 
+import org.apache.flink.api.java.Utils.ChecksumHashCode
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.utils._
 import org.apache.flink.test.util.MultipleProgramsTestBase
@@ -61,4 +62,15 @@ class DataSetUtilsITCase (
 
     Assert.assertEquals(expectedSize, result.size)
   }
+
+  /**
+    * Checks count and checksum over the integer collection test data set.
+    */
+  @Test
+  def testIntegerDataSetChecksumHashCode(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = CollectionDataSets.getIntDataSet(env)
+
+    val checksum: ChecksumHashCode = ds.checksumHashCode()
+
+    // JUnit's assertEquals takes (expected, actual)
+    Assert.assertEquals(15L, checksum.getCount)
+    Assert.assertEquals(55L, checksum.getChecksum)
+  }
 }

Reply via email to