Repository: flink Updated Branches: refs/heads/master 80ae75053 -> f0f93c276
[FLINK-3422][streaming][api-breaking] Scramble HashPartitioner hashes. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ff286d3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ff286d3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ff286d3 Branch: refs/heads/master Commit: 0ff286d3b793cdc3da721cce46366e130195ffb0 Parents: 80ae750 Author: Gabor Horvath <[email protected]> Authored: Sun Feb 21 14:54:44 2016 +0100 Committer: Márton Balassi <[email protected]> Committed: Wed Mar 2 17:28:34 2016 +0100 ---------------------------------------------------------------------- .../operators/hash/CompactingHashTable.java | 29 ++-------- .../operators/shipping/OutputEmitter.java | 31 +--------- .../apache/flink/runtime/util/MathUtils.java | 59 +++++++++++++++++++- .../runtime/partitioner/HashPartitioner.java | 3 +- .../apache/flink/streaming/api/IterateTest.java | 5 +- .../api/scala/StreamingOperatorsITCase.scala | 11 +++- 6 files changed, 78 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ff286d3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java index 5a0c6cc..b4d03e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java @@ -332,7 +332,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> { return; } - final int hashCode = hash(this.buildSideComparator.hash(record)); + final int hashCode = MathUtils.jenkinsHash(this.buildSideComparator.hash(record)); final int posHashCode = hashCode % this.numBuckets; // get the bucket for the given hash code @@ -360,7 +360,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> { return; } - final int searchHashCode = hash(this.buildSideComparator.hash(record)); + final int searchHashCode = MathUtils.jenkinsHash(this.buildSideComparator.hash(record)); final int posHashCode = searchHashCode % this.numBuckets; // get the bucket for the given hash code @@ -1140,26 +1140,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> { this.compactionMemory.resetRWViews(); this.compactionMemory.pushDownPages(); } - - /** - * This function hashes an integer value. It is adapted from Bob Jenkins' website - * <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>. - * The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed - * affects every bit of the hash value. - * - * @param code The integer to be hashed. - * @return The hash code for the integer. - */ - private static int hash(int code) { - code = (code + 0x7ed55d16) + (code << 12); - code = (code ^ 0xc761c23c) ^ (code >>> 19); - code = (code + 0x165667b1) + (code << 5); - code = (code + 0xd3a2646c) ^ (code << 9); - code = (code + 0xfd7046c5) + (code << 3); - code = (code ^ 0xb55a4f09) ^ (code >>> 16); - return code >= 0 ? code : -(code + 1); - } - + /** * Iterator that traverses the whole hash table once * @@ -1286,7 +1267,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> { if (closed) { return null; } - final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord)); + final int searchHashCode = MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord)); final int posHashCode = searchHashCode % numBuckets; @@ -1359,7 +1340,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> { if (closed) { return null; } - final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord)); + final int searchHashCode = MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord)); final int posHashCode = searchHashCode % numBuckets; http://git-wip-us.apache.org/repos/asf/flink/blob/0ff286d3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java index 92e3787..fdbcd9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.runtime.io.network.api.writer.ChannelSelector; import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.util.MathUtils; /** * The output emitter decides to which of the possibly multiple output channels a record is sent. @@ -187,39 +188,11 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T private int[] hashPartitionDefault(T record, int numberOfChannels) { int hash = this.comparator.hash(record); - hash = murmurHash(hash); + this.channels[0] = MathUtils.murmurHash(hash) % numberOfChannels; - if (hash >= 0) { - this.channels[0] = hash % numberOfChannels; - } - else if (hash != Integer.MIN_VALUE) { - this.channels[0] = -hash % numberOfChannels; - } - else { - this.channels[0] = 0; - } - return this.channels; } - private int murmurHash(int k) { - k *= 0xcc9e2d51; - k = Integer.rotateLeft(k, 15); - k *= 0x1b873593; - - k = Integer.rotateLeft(k, 13); - k *= 0xe6546b64; - - k ^= 4; - k ^= k >>> 16; - k *= 0x85ebca6b; - k ^= k >>> 13; - k *= 0xc2b2ae35; - k ^= k >>> 16; - - return k; - } - private final int[] rangePartition(final T record, int numberOfChannels) { if (this.channels == null || this.channels.length != 1) { this.channels = new int[1]; http://git-wip-us.apache.org/repos/asf/flink/blob/0ff286d3/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java index 939a2e1..2acc55c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java @@ -109,7 +109,64 @@ public final class MathUtils { public static boolean isPowerOf2(long value) { return (value & (value - 1)) == 0; } - + + /** + * This function hashes an integer value. It is adapted from Bob Jenkins' website + * <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>. + * The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed + * affects every bit of the hash value. + * + * It is crucial to use different hash functions to partition data across machines and the internal partitioning of + * data structures. This hash function is intended for partitioning internally in data structures. + * + * @param code The integer to be hashed. + * @return The non-negative hash code for the integer. + */ + public static int jenkinsHash(int code) { + code = (code + 0x7ed55d16) + (code << 12); + code = (code ^ 0xc761c23c) ^ (code >>> 19); + code = (code + 0x165667b1) + (code << 5); + code = (code + 0xd3a2646c) ^ (code << 9); + code = (code + 0xfd7046c5) + (code << 3); + code = (code ^ 0xb55a4f09) ^ (code >>> 16); + return code >= 0 ? code : -(code + 1); + } + + /** + * This function hashes an integer value. + * + * It is crucial to use different hash functions to partition data across machines and the internal partitioning of + * data structures. This hash function is intended for partitioning across machines. + * + * @param code The integer to be hashed. + * @return The non-negative hash code for the integer. + */ + public static int murmurHash(int code) { + code *= 0xcc9e2d51; + code = Integer.rotateLeft(code, 15); + code *= 0x1b873593; + + code = Integer.rotateLeft(code, 13); + code *= 0xe6546b64; + + code ^= 4; + code ^= code >>> 16; + code *= 0x85ebca6b; + code ^= code >>> 13; + code *= 0xc2b2ae35; + code ^= code >>> 16; + + if (code >= 0) { + return code; + } + else if (code != Integer.MIN_VALUE) { + return -code; + } + else { + return 0; + } + } + // ============================================================================================ /** http://git-wip-us.apache.org/repos/asf/flink/blob/0ff286d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java index ec0cf94..82f0141 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.partitioner; import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.util.MathUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** @@ -48,7 +49,7 @@ public class HashPartitioner<T> extends StreamPartitioner<T> { } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } - returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels); + returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels; return returnArray; } http://git-wip-us.apache.org/repos/asf/flink/blob/0ff286d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java index cd73b41..920185a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.util.MathUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -488,9 +489,9 @@ public class IterateTest extends StreamingMultipleProgramsTestBase { public void flatMap(Integer value, Collector<Integer> out) throws Exception { received++; if (key == -1) { - key = value % 3; + key = MathUtils.murmurHash(value % 3) % 3; } else { - assertEquals(key, value % 3); + assertEquals(key, MathUtils.murmurHash(value % 3) % 3); } if (value > 0) { out.collect(value - 1); http://git-wip-us.apache.org/repos/asf/flink/blob/0ff286d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala index 60a02e7..fe49b1f 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction} import org.apache.flink.core.fs.FileSystem +import org.apache.flink.runtime.util.MathUtils import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.test.util.TestBaseUtils @@ -71,7 +72,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { override def run(ctx: SourceContext[(Int, Int)]): Unit = { 0 until numElements foreach { - i => ctx.collect((i % numKeys, i)) + i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i)) } } @@ -86,8 +87,12 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { } }) .map(new RichMapFunction[Int, (Int, Int)] { + var key: Int = -1 override def map(value: Int): (Int, Int) = { - (getRuntimeContext.getIndexOfThisSubtask, value) + if (key == -1) { + key = MathUtils.murmurHash(value) % numKeys + } + (key, value) } }) .split{ @@ -106,7 +111,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { .javaStream .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE) - val groupedSequence = 0 until numElements groupBy( _ % numKeys) + val groupedSequence = 0 until numElements groupBy( MathUtils.murmurHash(_) % numKeys ) expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n") expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
