Github user EAlexRojas commented on a diff in the pull request:
https://github.com/apache/flink/pull/5991#discussion_r190529981
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
* <p>This method is exposed for testing purposes.
*/
@VisibleForTesting
- void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>>
newPartitions) throws Exception {
- if (newPartitions.size() == 0) {
+ void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>>
newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
--- End diff ---
I thought about it, but my only concern is the case where we have both
partitions to add and partitions to remove:
`consumerCallBridge.assignPartitions()` takes the whole new list of
partitions, so in that case we would need to wait for the first assignment
(e.g. adding the new partitions) before doing the second assignment
(e.g. removing partitions) in order to keep the list of partitions consistent.
I think we should try to have only one call to
`consumerCallBridge.assignPartitions()`.
Maybe I could refactor the part where partitions are removed from the old
partitions into a separate private method like `removeFromOldPartitions()`?
What do you think?
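To illustrate the idea, here is a minimal, self-contained sketch of building the final assignment in one pass so that a single `consumerCallBridge.assignPartitions()` call suffices even when partitions are both added and removed. All names here are illustrative (not from the PR), and plain `String`s stand in for `TopicPartition` to keep the sketch runnable:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: compute the consistent final partition list up front,
// so the real KafkaConsumerThread could call
// consumerCallBridge.assignPartitions(finalAssignment) exactly once.
public class PartitionReassignSketch {

	// String stand-ins for TopicPartition keep this self-contained.
	static List<String> computeFinalAssignment(
			List<String> oldPartitions,
			List<String> newPartitions,
			Set<String> partitionsToBeRemoved) {
		List<String> finalAssignment = new ArrayList<>();
		// Keep the previously assigned partitions that are not scheduled for removal.
		for (String partition : oldPartitions) {
			if (!partitionsToBeRemoved.contains(partition)) {
				finalAssignment.add(partition);
			}
		}
		// Append the newly discovered partitions.
		finalAssignment.addAll(newPartitions);
		return finalAssignment;
	}

	public static void main(String[] args) {
		List<String> result = computeFinalAssignment(
			Arrays.asList("topic-0", "topic-1"),
			Arrays.asList("topic-2"),
			new HashSet<>(Arrays.asList("topic-1")));
		System.out.println(result); // [topic-0, topic-2]
	}
}
```

Because the add and remove steps are merged into one list before the (single) assignment call, there is no intermediate state where the consumer holds an inconsistent partition set.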
---