[ https://issues.apache.org/jira/browse/FLINK-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367950#comment-15367950 ]

Greg Hogan commented on FLINK-4154:
-----------------------------------

I wanted to measure how much skew this causes, in case we should consider fixing the 
algorithm for 2.0. With the proper MurmurHash I see no skew, as expected: the proper 
finalizer is a bijection on 32-bit ints, so hashing every int input spreads them 
exactly evenly across the channels. With the Flink version the skew is typically well 
under 1% at small parallelism, growing with the channel count (about 2.5% at 16384 
channels).

|| channels || min count || max count || relative skew ((max - min) / min) ||
| 16 | 268391180 | 268480808 | 3.3394540014317905E-4 |
| 37 | 116042832 | 116126424 | 7.20354704890346E-4 |
| 64 | 67091700 | 67129376 | 5.615597756503413E-4 |
| 117 | 36686636 | 36742216 | 0.0015149930890365636 |
| 256 | 16767212 | 16786620 | 0.0011574971438304711 |
| 731 | 5857272 | 5890152 | 0.005613534764989572 |
| 1024 | 4186452 | 4202972 | 0.003946062202552424 |
| 1497 | 2860692 | 2878580 | 0.006253032483049556 |
| 2048 | 2090320 | 2103128 | 0.00612729151517471 |
| 4096 | 1043084 | 1053744 | 0.0102196946746379 |
| 5555 | 768004 | 778560 | 0.013744720079582919 |
| 7143 | 596812 | 606124 | 0.015602903426874795 |
| 16384 | 259052 | 265432 | 0.02462825996325062 |

{code}
// For each channel count, hash every 32-bit int and count how many inputs
// land in each channel; relative skew is (max - min) / min over the counts.
// Requires java.util.Arrays; murmurHash is Flink's MathUtils.murmurHash,
// which returns a non-negative int, so the modulo is safe.
for (int channels : new int[]{16, 37, 64, 117, 256, 731, 1024, 1497, 2048, 4096, 5555, 7143, 16384}) {
        int[] counts = new int[channels];

        int i = Integer.MIN_VALUE;
        do {
                counts[murmurHash(i) % channels]++;
        } while (i++ < Integer.MAX_VALUE);

        Arrays.sort(counts);
        System.out.println(channels + ": " + counts[0] + ", " + counts[channels - 1]
                + ", " + ((1.0 * counts[channels - 1] - counts[0]) / counts[0]));
}
{code}
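
For reference, a minimal sketch of the standard MurmurHash3 32-bit finalizer 
(fmix32), which is presumably what "the proper MurmurHash" above corresponds to; 
the constants come from the public MurmurHash3 reference implementation:

{code}
// MurmurHash3 fmix32 finalizer (reference constants). Every step is an
// invertible mapping on 32-bit ints, so the function is a bijection:
// hashing all 2^32 inputs spreads them exactly evenly over the channels,
// which is why the proper hash shows no skew.
static int fmix32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
}
{code}

To drop this into the harness above, the sign bit has to be cleared first 
(e.g. {{fmix32(i) & Integer.MAX_VALUE}}); that maps exactly two inputs onto each 
non-negative value, so the channel counts stay balanced.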

> Correction of murmur hash breaks backwards compatibility
> --------------------------------------------------------
>
>                 Key: FLINK-4154
>                 URL: https://issues.apache.org/jira/browse/FLINK-4154
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Greg Hogan
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> The correction of Flink's murmur hash in commit [1] breaks Flink's 
> backwards compatibility with respect to savepoints. The reason is that the 
> changed murmur hash, which is used to partition elements in a {{KeyedStream}}, 
> changes the mapping from keys to subtasks (sketched below, after the quoted 
> description) and therefore the key space assigned to each subtask. 
> Consequently, an old savepoint (version 1.0) assigns state with a different 
> key space to the subtasks.
> I think that this must be fixed for the upcoming 1.1 release. I see two 
> options to solve the problem:
> - revert the changes, but then we don't know how the flawed murmur hash 
> performs
> - develop tooling to repartition the state of old savepoints. This is 
> probably not trivial, since a keyed stream can also contain non-partitioned 
> state, which is not partitionable in all cases. And even if only partitioned 
> state is used, we would need some kind of special operator which can 
> repartition the state with respect to the key.
> I think that the latter option requires some more thought and is thus 
> unlikely to be done before the 1.1 release. Therefore, as a workaround, I 
> think that we should revert the murmur hash changes.
> [1] 
> https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee
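
To make the incompatibility concrete, here is a minimal sketch of the 
key-to-subtask routing the description refers to. The helper method is 
hypothetical, but the core expression follows Flink's hash partitioning of 
keyed streams: the murmur-mixed key hash modulo the number of channels.

{code}
import org.apache.flink.util.MathUtils;

// Hypothetical helper, for illustration only: a keyed element is routed to
// the channel (subtask) selected by the murmur-mixed hash of its key.
// Changing murmurHash changes this mapping, so the key space stored for a
// subtask in a 1.0 savepoint no longer matches the keys routed to it.
static int selectChannel(Object key, int numberOfChannels) {
        // MathUtils.murmurHash returns a non-negative int, so % is safe
        return MathUtils.murmurHash(key.hashCode()) % numberOfChannels;
}
{code}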


