PatrickRen commented on pull request #15397: URL: https://github.com/apache/flink/pull/15397#issuecomment-810081612
Thanks for your review and the discussions! Here are some of my thoughts on the points above:

## Removing Kafka Assignment Tracking

Currently, the reason we do double-bookkeeping in the Kafka enumerator (```discoveredPartitions``` vs. ```pendingAssignments``` + ```assignedSplits```) is to isolate the worker thread from the coordinator thread.

- ```discoveredPartitions``` is accessed only by the **worker thread**, for partition detection and deduplication, while ```pendingAssignments``` + ```assignedSplits``` are accessed only by the **coordinator thread**, for split distribution.
- If we removed ```discoveredPartitions``` and let the worker thread infer it from ```pendingAssignments``` + ```assignedSplits```, we could introduce a race condition between the worker and coordinator threads, since they would then share the same data structures.

One possible way to avoid the double-bookkeeping:

- The worker thread only does partition discovery, not deduplication, and hands over ALL discovered partitions to the coordinator thread. The worker thread is then stateless, and we can remove ```discoveredPartitions```.
- The coordinator thread takes all partitions, deduplicates them against ```pendingAssignments``` + ```assignedSplits```, and makes the split assignment.

As for the type stored in the enumerator state, I agree with @StephanEwen's suggestion to use ```TopicPartition``` instead of ```KafkaPartitionSplit``` in the state. I'll update my code later.

## Removing Split Assignment Tracker from State

I agree with @StephanEwen's suggestion. This can make the code much cleaner.
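To make the stateless-worker proposal under "Removing Kafka Assignment Tracking" more concrete, here is a minimal sketch in plain Java with no Flink or Kafka dependencies. `TopicPartition` is a hypothetical stand-in record for Kafka's class, and `EnumeratorSketch`, `discoverOnce`, `assignPending`, `snapshotState` and the owner-selection rule are illustrative names, not the actual `KafkaSourceEnumerator` API. Two single-thread executors play the roles of the worker and coordinator threads.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in for Kafka's TopicPartition (topic name + partition index).
record TopicPartition(String topic, int partition) {}

// Sketch only: the worker thread is stateless and hands ALL discovered partitions
// to the coordinator thread, which owns every piece of assignment bookkeeping.
final class EnumeratorSketch implements AutoCloseable {
    private final ExecutorService workerThread = Executors.newSingleThreadExecutor();
    private final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
    private final Supplier<Set<TopicPartition>> listPartitions; // hypothetical metadata query
    private final int numReaders;

    // Accessed ONLY from coordinatorThread, so no locking is needed.
    private final Set<TopicPartition> assignedPartitions = new HashSet<>();
    private final Map<Integer, Set<TopicPartition>> pendingAssignments = new HashMap<>();

    EnumeratorSketch(Supplier<Set<TopicPartition>> listPartitions, int numReaders) {
        this.listPartitions = listPartitions;
        this.numReaders = numReaders;
    }

    // One discovery round: list partitions on the worker thread (no dedup there),
    // then deduplicate and schedule them on the coordinator thread.
    CompletableFuture<Void> discoverOnce() {
        return CompletableFuture.supplyAsync(listPartitions, workerThread)
                .thenAcceptAsync(this::handleDiscovered, coordinatorThread);
    }

    // Runs on coordinatorThread: skips partitions that are already pending or assigned.
    private void handleDiscovered(Set<TopicPartition> discovered) {
        for (TopicPartition tp : discovered) {
            if (assignedPartitions.contains(tp) || isPending(tp)) {
                continue;
            }
            // Hypothetical owner selection; any deterministic mapping would do here.
            int owner = Math.floorMod(tp.hashCode(), numReaders);
            pendingAssignments.computeIfAbsent(owner, k -> new HashSet<>()).add(tp);
        }
    }

    private boolean isPending(TopicPartition tp) {
        return pendingAssignments.values().stream().anyMatch(s -> s.contains(tp));
    }

    // Runs on coordinatorThread: hands a reader its pending partitions and marks them assigned.
    CompletableFuture<Set<TopicPartition>> assignPending(int readerId) {
        return CompletableFuture.supplyAsync(() -> {
            Set<TopicPartition> splits = pendingAssignments.remove(readerId);
            if (splits == null) {
                return Set.<TopicPartition>of();
            }
            assignedPartitions.addAll(splits);
            return Set.copyOf(splits);
        }, coordinatorThread);
    }

    // Runs on coordinatorThread: the checkpointed state is a plain set of TopicPartition.
    // In this sketch pending partitions are not checkpointed; they are rediscovered after restore.
    CompletableFuture<Set<TopicPartition>> snapshotState() {
        return CompletableFuture.supplyAsync(() -> Set.copyOf(assignedPartitions), coordinatorThread);
    }

    @Override
    public void close() {
        workerThread.shutdownNow();
        coordinatorThread.shutdownNow();
    }
}
```

Because the worker thread never reads the coordinator's sets, there is nothing to synchronize between the two threads; the trade-off is that the coordinator re-checks every discovered partition on each discovery round.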
Furthermore, we can also make these changes:

- Use ```DataOutputSerializer``` in ```SourceCoordinator.writeCheckpointBytes()``` to be symmetric with ```DataInputDeserializer```
- Rename ```SourceCoordinatorContext.snapshotState()``` to ```SourceCoordinatorContext.onCheckpoint()```, since the context has nothing to store in the state
- Remove ```SourceCoordinatorContext.restoreState()```

Again, thanks for your meticulous review! I'll update my code later.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
