Martin Junghanns created FLINK-2590: ---------------------------------------
Summary: DataSetUtils.zipWithUniqueID creates duplicate IDs Key: FLINK-2590 URL: https://issues.apache.org/jira/browse/FLINK-2590 Project: Flink Issue Type: Bug Components: Java API, Scala API Affects Versions: master Reporter: Martin Junghanns Assignee: Martin Junghanns Priority: Minor The function creates IDs using the following code: {{ shifter = log2(numberOfParallelSubtasks) id = counter << shifter + taskId; }} As the binary function (+) is executed before the bitshift (<<), this results in cases where different tasks create the same ID. It essentially calculates {{ counter*2^(shifter+taskId) }} which is 0 for counter = 0 and all values of shifter and taskID. Consider the following example. numberOfParallelSubtaks = 8 shifter = log2(8) = 4 (maybe rename the function?) produces: {{ start: 1, shifter: 4 taskId: 4 label: 256 start: 2, shifter: 4 taskId: 3 label: 256 start: 4, shifter: 4 taskId: 2 label: 256 }} I would suggest the following: {{ counter*2^(shifter)+taskId }} which in code is equivalent to {{ shifter = log2(numberOfParallelSubtasks); id = (counter << shifter) + taskId; }} and for our example produces: {{ start: 1, shifter: 4 taskId: 4 label: 20 start: 2, shifter: 4 taskId: 3 label: 35 start: 4, shifter: 4 taskId: 2 label: 66 }} So we move the counter to the left and add the task id. As there is space for 2^shifter numbers, this prevents collisions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)