[
https://issues.apache.org/jira/browse/FLINK-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367950#comment-15367950
]
Greg Hogan commented on FLINK-4154:
-----------------------------------
I wanted to see how much skew this causes in case we should consider fixing the
algorithm for 2.0. With the proper MurmurHash I see no skew. In the Flink
version the skew is typically much less than 1% when parallelism is small.
|| channels || min count || max count || relative skew ||
| 16 | 268391180 | 268480808 | 3.3394540014317905E-4 |
| 37 | 116042832 | 116126424 | 7.20354704890346E-4 |
| 64 | 67091700 | 67129376 | 5.615597756503413E-4 |
| 117 | 36686636 | 36742216 | 0.0015149930890365636 |
| 256 | 16767212 | 16786620 | 0.0011574971438304711 |
| 731 | 5857272 | 5890152 | 0.005613534764989572 |
| 1024 | 4186452 | 4202972 | 0.003946062202552424 |
| 1497 | 2860692 | 2878580 | 0.006253032483049556 |
| 2048 | 2090320 | 2103128 | 0.00612729151517471 |
| 4096 | 1043084 | 1053744 | 0.0102196946746379 |
| 5555 | 768004 | 778560 | 0.013744720079582919 |
| 7143 | 596812 | 606124 | 0.015602903426874795 |
| 16384 | 259052 | 265432 | 0.02462825996325062 |
{code}
for (int channels : new int[]{16, 37, 64, 117, 256, 731, 1024}) {
int[] counts = new int[channels];
int i = Integer.MIN_VALUE;
do {
counts[murmurHash(i) % channels]++;
} while (i++ < Integer.MAX_VALUE);
Arrays.sort(counts);
System.out.println(channels + ": " + counts[0] + ", " + counts[channels
- 1] + ", " + ((1.0 * counts[channels - 1] - counts[0]) / counts[0]));
}
{code}
> Correction of murmur hash breaks backwards compatibility
> --------------------------------------------------------
>
> Key: FLINK-4154
> URL: https://issues.apache.org/jira/browse/FLINK-4154
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Greg Hogan
> Priority: Blocker
> Fix For: 1.1.0
>
>
> The correction of Flink's murmur hash with commit [1], breaks Flink's
> backwards compatibility with respect to savepoints. The reason is that the
> changed murmur hash which is used to partition elements in a {{KeyedStream}}
> changes the mapping from keys to sub tasks. This changes the assigned key
> spaces for a sub task. Consequently, an old savepoint (version 1.0) assigns
> states with a different key space to the sub tasks.
> I think that this must be fixed for the upcoming 1.1 release. I see two
> options to solve the problem:
> - revert the changes, but then we don't know how the flawed murmur hash
> performs
> - develop tooling to repartition state of old savepoints. This is probably
> not trivial since a keyed stream can also contain non-partitioned state which
> is not partitionable in all cases. And even if only partitioned state is
> used, we would need some kind of special operator which can repartition the
> state wrt the key.
> I think that the latter option requires some more thoughts and is thus
> unlikely to be done before the release 1.1. Therefore, as a workaround, I
> think that we should revert the murmur hash changes.
> [1]
> https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)