PatrickRen commented on pull request #15397:
URL: https://github.com/apache/flink/pull/15397#issuecomment-810081612


   Thanks for your review and the discussion! Here are some of my thoughts on the points raised above:
   
   ## Removing Kafka Assignment Tracking
   Currently, the reason we do double-bookkeeping in the Kafka enumerator (```discoveredPartitions``` vs. ```pendingAssignments``` + ```assignedSplits```) is to isolate the worker thread from the coordinator thread.
   
   - ```discoveredPartitions``` is only accessed by the **worker thread**, for partition discovery and deduplication, while ```pendingAssignments``` + ```assignedSplits``` are only accessed by the **coordinator thread**, for split distribution.
   - If we remove ```discoveredPartitions``` and let the worker thread infer it from ```pendingAssignments``` + ```assignedSplits```, a race condition could be introduced between the worker and coordinator threads, since they would be sharing the same data structure.
   
   One possible way to avoid double-bookkeeping:
   - The worker thread only does partition discovery, without deduplication, and hands over ALL discovered partitions to the coordinator thread. The worker thread thus becomes stateless, and we can remove ```discoveredPartitions```.
   - The coordinator thread takes all partitions, deduplicates them against ```pendingAssignments``` + ```assignedSplits```, and makes the split assignment.
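
A minimal sketch of this split of responsibilities, assuming a separate worker executor and a single-threaded coordinator executor. `TopicPartitionId`, `StatelessDiscoveryEnumerator`, and the mocked metadata map (topic name to partition count) are hypothetical names for illustration only; in Flink, the worker-to-coordinator handover would go through something like `SplitEnumeratorContext#callAsync` rather than a raw `CompletableFuture`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for Kafka's TopicPartition (topic name + partition id). */
record TopicPartitionId(String topic, int partition) {}

/** Hypothetical enumerator skeleton: only the coordinator thread touches its sets. */
class StatelessDiscoveryEnumerator {
    // Coordinator-thread-only bookkeeping; the worker thread never reads or writes these.
    final Set<TopicPartitionId> pendingAssignments = new HashSet<>();
    final Set<TopicPartitionId> assignedSplits = new HashSet<>();

    /** Worker thread: list every partition from (mocked) metadata; no deduplication, no state. */
    static Set<TopicPartitionId> discoverPartitions(Map<String, Integer> partitionCounts) {
        Set<TopicPartitionId> all = new HashSet<>();
        partitionCounts.forEach((topic, count) -> {
            for (int p = 0; p < count; p++) {
                all.add(new TopicPartitionId(topic, p));
            }
        });
        return all;
    }

    /** Coordinator thread: drop partitions already pending or assigned, queue the rest. */
    Set<TopicPartitionId> handleDiscovered(Set<TopicPartitionId> discovered) {
        Set<TopicPartitionId> newPartitions = new HashSet<>(discovered);
        newPartitions.removeAll(pendingAssignments);
        newPartitions.removeAll(assignedSplits);
        pendingAssignments.addAll(newPartitions);
        return newPartitions;
    }

    /** Coordinator thread: mark every pending partition as assigned (reader selection omitted). */
    void assignPending() {
        assignedSplits.addAll(pendingAssignments);
        pendingAssignments.clear();
    }

    /** Discovery runs on the worker executor; its result is handed to the coordinator executor. */
    CompletableFuture<Set<TopicPartitionId>> discoverAndHandle(
            Map<String, Integer> partitionCounts, Executor worker, Executor coordinator) {
        return CompletableFuture
                .supplyAsync(() -> discoverPartitions(partitionCounts), worker)
                .thenApplyAsync(this::handleDiscovered, coordinator);
    }
}
```

Because the worker only returns a fresh set, every read and write of `pendingAssignments` and `assignedSplits` stays on the coordinator thread (as long as `assignPending` is also run on the coordinator executor), so no separate `discoveredPartitions` is needed.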
   
   About the type stored in the enumerator state, I agree with @StephanEwen's suggestion to use ```TopicPartition``` instead of ```KafkaPartitionSplit``` in the state. I'll update my code later.
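
A minimal sketch of an enumerator state that holds only topic partitions, with a hand-written serializer. `PartitionRef` is a hypothetical stand-in for Kafka's `TopicPartition` so the example has no dependencies, and the byte layout (a count, then topic/partition pairs) is an assumption for illustration, not the connector's actual format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for Kafka's TopicPartition. */
record PartitionRef(String topic, int partition) {}

/** Hypothetical enumerator state: just the partitions, no split objects. */
record EnumStateSketch(Set<PartitionRef> assignedPartitions) {}

/** Hypothetical serializer: writes a count, then one (topic, partition) pair per entry. */
final class EnumStateSketchSerializer {
    static byte[] serialize(EnumStateSketch state) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.assignedPartitions().size());
            for (PartitionRef tp : state.assignedPartitions()) {
                out.writeUTF(tp.topic());
                out.writeInt(tp.partition());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static EnumStateSketch deserialize(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int size = in.readInt();
            Set<PartitionRef> partitions = new HashSet<>();
            for (int i = 0; i < size; i++) {
                // Read fields in exactly the order they were written.
                String topic = in.readUTF();
                int partition = in.readInt();
                partitions.add(new PartitionRef(topic, partition));
            }
            return new EnumStateSketch(partitions);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```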
   
   ## Removing Split Assignment Tracker from State
   I agree with @StephanEwen's suggestion. This can make the code much cleaner. Furthermore, we could also make these changes:
   - Use ```DataOutputSerializer``` in ```SourceCoordinator.writeCheckpointBytes()``` to be symmetric with ```DataInputDeserializer```
   - Rename ```SourceCoordinatorContext.snapshotState()``` to ```SourceCoordinatorContext.onCheckpoint()```, since the context has nothing to store in the state
   - Remove ```SourceCoordinatorContext.restoreState()```
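
A sketch of the resulting checkpoint shape, assuming the coordinator checkpoint is just a version header plus the enumerator's bytes, and the context only receives an `onCheckpoint` notification. It uses `java.io.DataOutputStream`/`DataInputStream` as stand-ins for Flink's `DataOutputSerializer`/`DataInputDeserializer` so it runs without Flink; all class names, the placeholder hook body, and the byte layout are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.LongFunction;

/** Hypothetical context: notified of checkpoints, but contributes no bytes to them. */
class CoordinatorContextSketch {
    long lastCheckpointId = -1L;

    void onCheckpoint(long checkpointId) {
        // Placeholder bookkeeping; what the real hook should do is not specified here.
        lastCheckpointId = checkpointId;
    }
}

/** Hypothetical coordinator: checkpoint = version header + enumerator bytes. */
class CoordinatorCheckpointSketch {
    static final int VERSION = 1;

    private final CoordinatorContextSketch context = new CoordinatorContextSketch();
    private final LongFunction<byte[]> enumeratorSnapshot;

    CoordinatorCheckpointSketch(LongFunction<byte[]> enumeratorSnapshot) {
        this.enumeratorSnapshot = enumeratorSnapshot;
    }

    CoordinatorContextSketch context() {
        return context;
    }

    byte[] checkpoint(long checkpointId) {
        context.onCheckpoint(checkpointId);
        return writeCheckpointBytes(enumeratorSnapshot.apply(checkpointId));
    }

    /** Writer side: version, payload length, payload. */
    static byte[] writeCheckpointBytes(byte[] enumeratorBytes) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeInt(enumeratorBytes.length);
            out.write(enumeratorBytes);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reader side mirrors the writer field by field; there is no context state to restore. */
    static byte[] readEnumeratorBytes(byte[] checkpoint) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(checkpoint))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalStateException("Unsupported checkpoint version: " + version);
            }
            byte[] enumeratorBytes = new byte[in.readInt()];
            in.readFully(enumeratorBytes);
            return enumeratorBytes;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writing and reading the same fields in the same order is the symmetry the first bullet asks for, and with no context state in the bytes, a `restoreState()` on the context would have nothing to do.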
   
   Again, thanks for your meticulous review! I'll update my code later.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

