Repository: flink
Updated Branches:
  refs/heads/master aed56aeeb -> 0f92a6b57


Revert "[FLINK-4154] [core] Correction of murmur hash breaks backwards 
compatibility"

This reverts commit 81cf2296683a473db4061dd3bed0aeb249e05058.

We had an incorrent implementation of Murmur hash in Flink 1.0. This
was fixed in 641a0d4 for Flink 1.1. Then we thought that we need to
revert this in order to ensure backwards compatability between Flink
1.0 and 1.1 savepoints (81cf22). Turns out, savepoint backwards
compatability is broken for other reasons, too. Therefore, we revert
81cf22 here, ending up with a correct implementation of Murmur hash
again.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f92a6b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f92a6b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f92a6b5

Branch: refs/heads/master
Commit: 0f92a6b575cefcb6d5e9e151f6651ff31b126c7c
Parents: aed56ae
Author: Ufuk Celebi <[email protected]>
Authored: Mon Aug 1 17:57:18 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Aug 1 18:02:18 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/storm/tests/StormFieldsGroupingITCase.java    | 6 +++---
 flink-core/src/main/java/org/apache/flink/util/MathUtils.java  | 5 +----
 .../flink/streaming/api/scala/StreamingOperatorsITCase.scala   | 3 ++-
 .../flink/test/streaming/api/StreamingOperatorsITCase.java     | 3 ++-
 4 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f92a6b5/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index a0121c7..b43b24d 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -49,9 +49,9 @@ public class StormFieldsGroupingITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory("3> -1155484576\n" + "3> 
1033096058\n" + "3> -1930858313\n" +
-                       "3> 1431162155\n" + "4> -1557280266\n" + "4> 
-1728529858\n" + "4> 1654374947\n" +
-                       "4> -65105105\n" + "4> -518907128\n" + "4> 
-252332814\n", this.resultPath);
+               compareResultsByLinesInMemory("4> -1155484576\n" + "3> 
1033096058\n" + "3> -1930858313\n" +
+                       "4> 1431162155\n" + "3> -1557280266\n" + "4> 
-1728529858\n" + "3> 1654374947\n" +
+                       "3> -65105105\n" + "3> -518907128\n" + "4> 
-252332814\n", this.resultPath);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0f92a6b5/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index e9d9df7..f40c83a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -135,10 +135,7 @@ public final class MathUtils {
                code *= 0x1b873593;
 
                code = Integer.rotateLeft(code, 13);
-               // By the MurmurHash algorithm the following should be "code = 
code * 5 + 0xe6546b64;" (see FLINK-3623)
-               // but correcting the algorithm is a breaking change (see 
FLINK-4154). The effect of the resulting skew
-               // increases with increased parallelism (see FLINK-4154).
-               code *= 0xe6546b64;
+               code = code * 5 + 0xe6546b64;
 
                code ^= 4;
                code ^= code >>> 16;

http://git-wip-us.apache.org/repos/asf/flink/blob/0f92a6b5/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index 5579797..d353468 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -74,7 +74,8 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
 
       override def run(ctx: SourceContext[(Int, Int)]): Unit = {
         0 until numElements foreach {
-          i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i))
+          // keys '1' and '2' hash to different buckets
+          i => ctx.collect((1 + (MathUtils.murmurHash(i)) % numKeys, i))
         }
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f92a6b5/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 0c0db08..5d99de4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -223,7 +223,8 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
                @Override
                public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception {
                        for (int i = 0; i < numElements; i++) {
-                               Tuple2<Integer, Integer> result = new 
Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
+                               // keys '1' and '2' hash to different buckets
+                               Tuple2<Integer, Integer> result = new 
Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
                                ctx.collect(result);
                        }
                }

Reply via email to