Repository: flink Updated Branches: refs/heads/release-1.1 8a1fe66e8 -> 0274f9c68
Revert "[FLINK-4154] [core] Correction of murmur hash breaks backwards compatibility" This reverts commit 81cf2296683a473db4061dd3bed0aeb249e05058. We had an incorrect implementation of Murmur hash in Flink 1.0. This was fixed in 641a0d4 for Flink 1.1. Then we thought that we need to revert this in order to ensure backwards compatibility between Flink 1.0 and 1.1 savepoints (81cf22). Turns out, savepoint backwards compatibility is broken for other reasons, too. Therefore, we revert 81cf22 here, ending up with a correct implementation of Murmur hash again. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0274f9c6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0274f9c6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0274f9c6 Branch: refs/heads/release-1.1 Commit: 0274f9c686e4640076b5358e9b4b50c4844863cb Parents: 8a1fe66 Author: Ufuk Celebi <[email protected]> Authored: Mon Aug 1 17:57:18 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Mon Aug 1 17:57:27 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/storm/tests/StormFieldsGroupingITCase.java | 6 +++--- flink-core/src/main/java/org/apache/flink/util/MathUtils.java | 5 +---- .../flink/streaming/api/scala/StreamingOperatorsITCase.scala | 3 ++- .../flink/test/streaming/api/StreamingOperatorsITCase.java | 3 ++- 4 files changed, 8 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0274f9c6/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java index a0121c7..b43b24d 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -49,9 +49,9 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase { @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" + - "3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n" + - "4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath); + compareResultsByLinesInMemory("4> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" + + "4> 1431162155\n" + "3> -1557280266\n" + "4> -1728529858\n" + "3> 1654374947\n" + + "3> -65105105\n" + "3> -518907128\n" + "4> -252332814\n", this.resultPath); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0274f9c6/flink-core/src/main/java/org/apache/flink/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java index e9d9df7..f40c83a 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java @@ -135,10 +135,7 @@ public final class MathUtils { code *= 0x1b873593; code = Integer.rotateLeft(code, 13); - // By the MurmurHash algorithm the following should be "code = code * 5 + 0xe6546b64;" (see FLINK-3623) - // but correcting the algorithm is a breaking change (see FLINK-4154). The effect of the resulting skew - // increases with increased parallelism (see FLINK-4154). 
- code *= 0xe6546b64; + code = code * 5 + 0xe6546b64; code ^= 4; code ^= code >>> 16; http://git-wip-us.apache.org/repos/asf/flink/blob/0274f9c6/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala index 5579797..d353468 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -74,7 +74,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { override def run(ctx: SourceContext[(Int, Int)]): Unit = { 0 until numElements foreach { - i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i)) + // keys '1' and '2' hash to different buckets + i => ctx.collect((1 + (MathUtils.murmurHash(i)) % numKeys, i)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/0274f9c6/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java index 0c0db08..5d99de4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java @@ -223,7 +223,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase @Override public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { 
for (int i = 0; i < numElements; i++) { - Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i); + // keys '1' and '2' hash to different buckets + Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i); ctx.collect(result); } }
