Repository: flink
Updated Branches:
  refs/heads/release-1.0 23dc2a4ac -> b862fd0b3


[FLINK-3422][streaming][api-breaking] Scramble HashPartitioner hashes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a049d80e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a049d80e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a049d80e

Branch: refs/heads/release-1.0
Commit: a049d80e8aef7f0d23fbc06d263fb3e7a0f2f05f
Parents: 23dc2a4
Author: Gabor Horvath <[email protected]>
Authored: Sun Feb 21 14:54:44 2016 +0100
Committer: Márton Balassi <[email protected]>
Committed: Wed Mar 2 12:47:10 2016 +0100

----------------------------------------------------------------------
 .../operators/hash/CompactingHashTable.java     | 29 ++--------
 .../operators/shipping/OutputEmitter.java       | 31 +---------
 .../apache/flink/runtime/util/MathUtils.java    | 59 +++++++++++++++++++-
 .../runtime/partitioner/HashPartitioner.java    |  3 +-
 .../apache/flink/streaming/api/IterateTest.java |  5 +-
 .../api/scala/StreamingOperatorsITCase.scala    | 11 +++-
 6 files changed, 78 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a049d80e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 5a0c6cc..b4d03e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -332,7 +332,7 @@ public class CompactingHashTable<T> extends 
AbstractMutableHashTable<T> {
                        return;
                }
                
-               final int hashCode = 
hash(this.buildSideComparator.hash(record));
+               final int hashCode = 
MathUtils.jenkinsHash(this.buildSideComparator.hash(record));
                final int posHashCode = hashCode % this.numBuckets;
                
                // get the bucket for the given hash code
@@ -360,7 +360,7 @@ public class CompactingHashTable<T> extends 
AbstractMutableHashTable<T> {
                        return;
                }
                
-               final int searchHashCode = 
hash(this.buildSideComparator.hash(record));
+               final int searchHashCode = 
MathUtils.jenkinsHash(this.buildSideComparator.hash(record));
                final int posHashCode = searchHashCode % this.numBuckets;
                
                // get the bucket for the given hash code
@@ -1140,26 +1140,7 @@ public class CompactingHashTable<T> extends 
AbstractMutableHashTable<T> {
                this.compactionMemory.resetRWViews();
                this.compactionMemory.pushDownPages();
        }
-       
-       /**
-        * This function hashes an integer value. It is adapted from Bob 
Jenkins' website
-        * <a 
href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
-        * The hash function has the <i>full avalanche</i> property, meaning 
that every bit of the value to be hashed
-        * affects every bit of the hash value. 
-        * 
-        * @param code The integer to be hashed.
-        * @return The hash code for the integer.
-        */
-       private static int hash(int code) {
-               code = (code + 0x7ed55d16) + (code << 12);
-               code = (code ^ 0xc761c23c) ^ (code >>> 19);
-               code = (code + 0x165667b1) + (code << 5);
-               code = (code + 0xd3a2646c) ^ (code << 9);
-               code = (code + 0xfd7046c5) + (code << 3);
-               code = (code ^ 0xb55a4f09) ^ (code >>> 16);
-               return code >= 0 ? code : -(code + 1);
-       }
-       
+
        /**
         * Iterator that traverses the whole hash table once
         * 
@@ -1286,7 +1267,7 @@ public class CompactingHashTable<T> extends 
AbstractMutableHashTable<T> {
                        if (closed) {
                                return null;
                        }
-                       final int searchHashCode = 
hash(this.probeTypeComparator.hash(probeSideRecord));
+                       final int searchHashCode = 
MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord));
                        
                        final int posHashCode = searchHashCode % numBuckets;
                        
@@ -1359,7 +1340,7 @@ public class CompactingHashTable<T> extends 
AbstractMutableHashTable<T> {
                        if (closed) {
                                return null;
                        }
-                       final int searchHashCode = 
hash(this.probeTypeComparator.hash(probeSideRecord));
+                       final int searchHashCode = 
MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord));
 
                        final int posHashCode = searchHashCode % numBuckets;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a049d80e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index 92e3787..fdbcd9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.util.MathUtils;
 
 /**
  * The output emitter decides to which of the possibly multiple output 
channels a record is sent.
@@ -187,39 +188,11 @@ public class OutputEmitter<T> implements 
ChannelSelector<SerializationDelegate<T
        private int[] hashPartitionDefault(T record, int numberOfChannels) {
                int hash = this.comparator.hash(record);
 
-               hash = murmurHash(hash);
+               this.channels[0] = MathUtils.murmurHash(hash) % 
numberOfChannels;
 
-               if (hash >= 0) {
-                       this.channels[0] = hash % numberOfChannels;
-               }
-               else if (hash != Integer.MIN_VALUE) {
-                       this.channels[0] = -hash % numberOfChannels;
-               }
-               else {
-                       this.channels[0] = 0;
-               }
-       
                return this.channels;
        }
 
-       private int murmurHash(int k) {
-               k *= 0xcc9e2d51;
-               k = Integer.rotateLeft(k, 15);
-               k *= 0x1b873593;
-               
-               k = Integer.rotateLeft(k, 13);
-               k *= 0xe6546b64;
-
-               k ^= 4;
-               k ^= k >>> 16;
-               k *= 0x85ebca6b;
-               k ^= k >>> 13;
-               k *= 0xc2b2ae35;
-               k ^= k >>> 16;
-
-               return k;
-       }
-
        private final int[] rangePartition(final T record, int 
numberOfChannels) {
                if (this.channels == null || this.channels.length != 1) {
                        this.channels = new int[1];

http://git-wip-us.apache.org/repos/asf/flink/blob/a049d80e/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
index 939a2e1..2acc55c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
@@ -109,7 +109,64 @@ public final class MathUtils {
        public static boolean isPowerOf2(long value) {
                return (value & (value - 1)) == 0;
        }
-       
+
+       /**
+        * This function hashes an integer value. It is adapted from Bob 
Jenkins' website
+        * <a 
href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
+        * The hash function has the <i>full avalanche</i> property, meaning 
that every bit of the value to be hashed
+        * affects every bit of the hash value.
+        *
+        * It is crucial to use different hash functions to partition data 
across machines and the internal partitioning of
+        * data structures. This hash function is intended for partitioning 
internally in data structures.
+        *
+        * @param code The integer to be hashed.
+        * @return The non-negative hash code for the integer.
+        */
+       public static int jenkinsHash(int code) {
+               code = (code + 0x7ed55d16) + (code << 12);
+               code = (code ^ 0xc761c23c) ^ (code >>> 19);
+               code = (code + 0x165667b1) + (code << 5);
+               code = (code + 0xd3a2646c) ^ (code << 9);
+               code = (code + 0xfd7046c5) + (code << 3);
+               code = (code ^ 0xb55a4f09) ^ (code >>> 16);
+               return code >= 0 ? code : -(code + 1);
+       }
+
+       /**
+        * This function hashes an integer value.
+        *
+        * It is crucial to use different hash functions to partition data 
across machines and the internal partitioning of
+        * data structures. This hash function is intended for partitioning 
across machines.
+        *
+        * @param code The integer to be hashed.
+        * @return The non-negative hash code for the integer.
+        */
+       public static int murmurHash(int code) {
+               code *= 0xcc9e2d51;
+               code = Integer.rotateLeft(code, 15);
+               code *= 0x1b873593;
+
+               code = Integer.rotateLeft(code, 13);
+               code *= 0xe6546b64;
+
+               code ^= 4;
+               code ^= code >>> 16;
+               code *= 0x85ebca6b;
+               code ^= code >>> 13;
+               code *= 0xc2b2ae35;
+               code ^= code >>> 16;
+
+               if (code >= 0) {
+                       return code;
+               }
+               else if (code != Integer.MIN_VALUE) {
+                       return -code;
+               }
+               else {
+                       return 0;
+               }
+       }
+
        // 
============================================================================================
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a049d80e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
index ec0cf94..82f0141 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.partitioner;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -48,7 +49,7 @@ public class HashPartitioner<T> extends StreamPartitioner<T> {
                } catch (Exception e) {
                        throw new RuntimeException("Could not extract key from 
" + record.getInstance().getValue(), e);
                }
-               returnArray[0] = Math.abs(key.hashCode() % 
numberOfOutputChannels);
+               returnArray[0] = MathUtils.murmurHash(key.hashCode()) % 
numberOfOutputChannels;
 
                return returnArray;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a049d80e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index cd73b41..920185a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -488,9 +489,9 @@ public class IterateTest extends 
StreamingMultipleProgramsTestBase {
                        public void flatMap(Integer value, Collector<Integer> 
out) throws Exception {
                                received++;
                                if (key == -1) {
-                                       key = value % 3;
+                                       key = MathUtils.murmurHash(value % 3) % 
3;
                                } else {
-                                       assertEquals(key, value % 3);
+                                       assertEquals(key, 
MathUtils.murmurHash(value % 3) % 3);
                                }
                                if (value > 0) {
                                        out.collect(value - 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/a049d80e/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index 60a02e7..fe49b1f 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.util.MathUtils
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.test.util.TestBaseUtils
@@ -71,7 +72,7 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
 
       override def run(ctx: SourceContext[(Int, Int)]): Unit = {
         0 until numElements foreach {
-          i => ctx.collect((i % numKeys, i))
+          i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i))
         }
       }
 
@@ -86,8 +87,12 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
         }
       })
       .map(new RichMapFunction[Int, (Int, Int)] {
+        var key: Int = -1
         override def map(value: Int): (Int, Int) = {
-          (getRuntimeContext.getIndexOfThisSubtask, value)
+          if (key == -1) {
+            key = MathUtils.murmurHash(value) % numKeys
+          }
+          (key, value)
         }
       })
       .split{
@@ -106,7 +111,7 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
       .javaStream
       .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
 
-    val groupedSequence = 0 until numElements groupBy( _ % numKeys)
+    val groupedSequence = 0 until numElements groupBy( MathUtils.murmurHash(_) 
% numKeys )
 
     expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
     expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")

Reply via email to