Repository: flink Updated Branches: refs/heads/master 3465580e7 -> 641a0d436
[FLINK-3623] [runtime] Adjust MurmurHash Algorithm Fix "hash *= n" to be "hash = hash * m + n". This closes #1806 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/641a0d43 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/641a0d43 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/641a0d43 Branch: refs/heads/master Commit: 641a0d436c9b7a34ff33ceb370cf29962cac4dee Parents: 6593e48 Author: Greg Hogan <[email protected]> Authored: Wed Mar 16 14:18:03 2016 -0400 Committer: Stephan Ewen <[email protected]> Committed: Mon Apr 4 21:31:49 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/storm/tests/StormFieldsGroupingITCase.java | 6 +++--- .../src/main/java/org/apache/flink/runtime/util/MathUtils.java | 2 +- .../apache/flink/streaming/api/StreamingOperatorsITCase.java | 3 ++- .../flink/streaming/api/scala/StreamingOperatorsITCase.scala | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/641a0d43/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java index b873345..1875ecb 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -49,9 +49,9 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase { @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" + - "3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n" + - "4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath); + compareResultsByLinesInMemory("4> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" + + "4> 1431162155\n" + "3> -1557280266\n" + "4> -1728529858\n" + "3> 1654374947\n" + + "3> -65105105\n" + "3> -518907128\n" + "4> -252332814\n", this.resultPath); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/641a0d43/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java index 2acc55c..5d26186 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java @@ -147,7 +147,7 @@ public final class MathUtils { code *= 0x1b873593; code = Integer.rotateLeft(code, 13); - code *= 0xe6546b64; + code = code * 5 + 0xe6546b64; code ^= 4; code ^= code >>> 16; http://git-wip-us.apache.org/repos/asf/flink/blob/641a0d43/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java index 9530d09..a77eddd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java @@ -222,7 +222,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase @Override public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { for (int i = 0; i < numElements; i++) { - Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i); + // keys '1' and '2' hash to different buckets + Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i); ctx.collect(result); } } http://git-wip-us.apache.org/repos/asf/flink/blob/641a0d43/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala index e573fe0..0988e41 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -74,7 +74,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { override def run(ctx: SourceContext[(Int, Int)]): Unit = { 0 until numElements foreach { - i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i)) + // keys '1' and '2' hash to different buckets + i => ctx.collect((1 + (MathUtils.murmurHash(i)) % numKeys, i)) } }
