Github user thvasilo commented on the issue:
https://github.com/apache/flink/pull/2740
Hello @tfournier314, I tested your code and it does seem that partitions are
sorted only internally. That is expected: `zipWithIndex` is, AFAIK, unaware of
the sorted (as in key-range) order of the partitions, so it's not guaranteed that
the "first" partition will get the indices `[0, partitionSize-1]`, the second
`[partitionSize, 2*partitionSize-1]`, etc. Maybe @greghogan knows a solution for
global sorting?
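To make the partition-ordering issue concrete, here is a plain-Scala simulation (no Flink involved) of how I understand `DataSetUtils.zipWithIndex` to assign indices: it counts the elements of each partition, then gives partition `i` a start offset equal to the total size of partitions `0..i-1`. The object `ZipWithIndexSim` and its method are hypothetical names for illustration only, not Flink API.

```scala
// Plain-Scala simulation (no Flink) of per-partition index assignment.
object ZipWithIndexSim {
  // Each inner Seq is one partition, listed in partition-index order.
  // Partition i starts at the total size of partitions 0..i-1, and the
  // elements inside a partition are numbered consecutively from that offset.
  def zipWithIndex[T](partitions: Seq[Seq[T]]): Seq[Seq[(Long, T)]] = {
    // scanLeft yields one extra trailing total; zip drops it.
    val offsets = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(offsets).map { case (part, start) =>
      part.zipWithIndex.map { case (elem, i) => (start + i, elem) }
    }
  }
}
```

For example, with partitions `Seq(Seq(("c", 2), ("d", 1)), Seq(("a", 9), ("b", 5)))`, each partition is correctly sorted by descending count, but partition 0 happens to hold the lower key range, so the most frequent label `a` receives index 2 instead of 0.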
If it's not possible, I think we can take a step back and see what we are
trying to achieve here.
The task is to count the frequencies of labels and assign integer ids to
them in order of frequency. The labels should be either categorical variables
(e.g. {Male, Female, Unknown}) or class labels. The case with the most unique
values is probably vocabulary words, which at worst number in the few
millions.
I would argue, then, that after we have performed the frequency count in a
distributed manner, there is no need to also perform the last step (assigning
ordered indices) in a distributed manner: we can assume that all the
(label -> frequency) pairs fit into the memory of one machine.
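Under that assumption the final step is easy to express locally. A minimal plain-Scala sketch, assuming the (label, frequency) pairs from the distributed count are already in memory on one machine; `LabelIndexer` and `assignIds` are hypothetical names, and breaking frequency ties alphabetically is my own assumption so that ids are deterministic.

```scala
object LabelIndexer {
  // Assign ids 0, 1, 2, ... to labels in order of descending frequency.
  // `counts` holds (label, frequency) pairs, e.g. the result of the
  // distributed groupBy/sum step once it is on a single machine.
  // Ties are broken alphabetically (an assumption, for deterministic ids).
  def assignIds(counts: Seq[(String, Int)]): Map[String, Long] =
    counts
      .sortBy { case (label, count) => (-count, label) }
      .zipWithIndex
      .map { case ((label, _), id) => label -> id.toLong }
      .toMap
}
```

Only the per-label counts are sorted here, so the cost is O(k log k) for k unique labels, which remains manageable even for a vocabulary of a few million words.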
So I would recommend gathering all data into one partition after getting
the counts; that way we guarantee a global ordering:
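```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides zipWithIndex on DataSet

fitData.map(s => (s, 1))               // emit (label, 1)
  .groupBy(0)                          // group by label
  .sum(1)                              // count occurrences per label
  .partitionByRange(x => 0)            // constant key: all counts go to one partition
  .sortPartition(1, Order.DESCENDING)  // sort that partition by frequency
  .zipWithIndex                        // assign ids 0, 1, 2, ... in sorted order
  .print()
```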
Of course we would need to clarify this restriction in the docstrings and
documentation.