Hi, I was recently looking into a Kafka connector issue (FLINK-4023 /
FLINK-4069), when it was pointed out that partition assignment will not be
deterministic if the partition discovery is simply moved to the open()
method. In the assignPartitions of FlinkKafkaConsumerBase a modulo on the
__index__ of the total list of topic-partitions subscribed to is used. It
is clear that calling it from the open() method in each task can produce
different lists and so some partitions can be consumed multiple times while
others are not consumed at all. In a related discussion it was suggested to
build this list in a deterministic way so that each task sees the
same index for the same topic-partition. This would work for the issues
above, but it highlighted for me another issue which relates to partition
assignment itself - hence starting a different thread.
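
To make the failure mode concrete, here is a minimal sketch of index-modulo
assignment. It is not the actual FlinkKafkaConsumerBase code: the class name,
the assign() helper and the "topic-partition" strings are made up for
illustration, and I am assuming each subtask keeps the entries whose list
index modulo the parallelism equals its own subtask index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexModuloAssignment {

    // Hypothetical helper, not the Flink implementation: a subtask keeps every
    // entry whose position in the list satisfies index % numSubtasks == subtaskIndex.
    public static List<String> assign(List<String> allPartitions, int numSubtasks, int subtaskIndex) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                mine.add(allPartitions.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // The same four topic-partitions, discovered in a different order by each subtask.
        List<String> seenBySubtask0 = Arrays.asList("a-0", "a-1", "b-0", "b-1");
        List<String> seenBySubtask1 = Arrays.asList("a-1", "a-0", "b-1", "b-0");

        // Both subtasks end up with a-0 and b-0; a-1 and b-1 are read by nobody.
        System.out.println("subtask 0: " + assign(seenBySubtask0, 2, 0));
        System.out.println("subtask 1: " + assign(seenBySubtask1, 2, 1));
    }
}
```

With two subtasks that see the list in different orders, both print
[a-0, b-0], so two partitions are consumed twice and two are never consumed.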

I don't understand at this point how Flink does co-group on multiple
topics, but having worked in the Kafka zone for a number of years, my
understanding is that ignoring the physical partition id, which is
deterministic at the Kafka cluster level, and using a transient list (even
if it is constructed deterministically) means that co-partitioning cannot
be exploited for a straight co-group and Flink always has to do its own
shuffle. I think using getPartition() on each topic-partition instead of
the list index in assignPartitions is necessary, even if it may result in an
unbalanced work distribution among Flink consumer instances. But it seems to me that in
Flink, the partitioning schemes that exist outside its runtime are ignored.
Is it because any source outside Flink's realm is treated as something to
be imported, with no partitioning assumed for simplicity/control, or is it
because this is expected to produce even load-balancing of work? What else am I missing?
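
As a rough sketch of what I mean by using getPartition() instead of the list
index: the TopicPartition class below is a hypothetical stand-in (only the
getPartition() accessor comes from the discussion above), the topic names are
invented, and the rule "partition id modulo parallelism" is my assumption of
the simplest possible scheme, not something Flink does today.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionIdAssignment {

    // Hypothetical stand-in for a topic-partition handle.
    public static final class TopicPartition {
        private final String topic;
        private final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        public int getPartition() {
            return partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Assumed rule: a subtask owns every topic-partition whose physical
    // Kafka partition id satisfies id % numSubtasks == subtaskIndex.
    public static List<String> assign(List<TopicPartition> all, int numSubtasks, int subtaskIndex) {
        List<String> mine = new ArrayList<>();
        for (TopicPartition tp : all) {
            if (tp.getPartition() % numSubtasks == subtaskIndex) {
                mine.add(tp.toString());
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Two co-partitioned topics with three partitions each, in arbitrary list order.
        List<TopicPartition> all = Arrays.asList(
                new TopicPartition("payments", 2), new TopicPartition("orders", 0),
                new TopicPartition("payments", 0), new TopicPartition("orders", 1),
                new TopicPartition("orders", 2), new TopicPartition("payments", 1));

        System.out.println("subtask 0: " + assign(all, 2, 0));
        System.out.println("subtask 1: " + assign(all, 2, 1));
    }
}
```

Here the result does not depend on the order of the list, and partition N of
both topics always lands on the same subtask, which is what a straight
co-group would need. The cost is visible too: with three partitions and two
subtasks, subtask 0 gets four topic-partitions and subtask 1 only two.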

Michal
