becketqin commented on issue #10487: [FLINK-15100][connector/source] Add a base 
implementation for SourceReader.
URL: https://github.com/apache/flink/pull/10487#issuecomment-604769006
 
 
   @StephanEwen Thanks a lot for the thorough review. Please see the replies to 
your questions below:
   
   1. Should boundedness be in the `SourceReaderOptions`?
   You are right, it should not. It was left over from an earlier patch and I forgot to remove it when updating the patch.
   
   2. Should `SourceReader` extend `Serializable`?
   No, it should not. With the `Source` acting as a factory, the readers will be created on the fly instead of being transmitted over the wire. And thanks for the suggestion about the IDE setting :)
   
   3. Should there be a configurable Queue size for `RecordsWithSplitIds`?
   I agree this is a compromise. It is hard for users to get such a count-based capacity right, and a count-based capacity does not impose a strict upper bound on the memory footprint.

       However, I am not sure whether what we do in the Kafka connector can be applied out of the box. A few concerns:
   
       - Having a queue size of one would push the batching work onto the SplitReader, which could be tricky to get right. This is less of a problem for Kafka, because the consumer already does the batching and users just set `max.poll.records`. But for other connectors there are things to consider. For example, what should the batch size be? Would that batch-size decision turn into the same discussion we are having about the queue capacity, i.e. a trade-off between performance and memory footprint? Should the SplitReader always wait for the batch to fill up before putting it into the queue, or should there also be a timeout for enqueuing a partial batch? And how would the `SplitReader` decide the batch size?
       - Kafka only has a single fetcher, so at most two threads synchronize on the queue, which is not too bad. But with our base reader implementation there could be multiple fetchers, so the lock contention could become severe.
       - An ideal solution might be a bytes-based capacity config. However, that requires the SplitReader to estimate the size of the records, which could also be hard in many cases.
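For illustration, a bytes-based handover could look roughly like the sketch below (hypothetical code, not part of this patch): the handover tracks an estimated in-flight byte total and blocks the fetcher threads once the budget is exhausted. As noted above, the weak point is the per-batch size estimate that the SplitReader would have to supply.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bytes-bounded handover between fetcher threads and the reader.
class BytesBoundedHandover<E> {
    private static final class Entry<E> {
        final E element;
        final long bytes;
        Entry(E element, long bytes) { this.element = element; this.bytes = bytes; }
    }

    private final Deque<Entry<E>> queue = new ArrayDeque<>();
    private final long maxBytes;
    private long currentBytes;

    BytesBoundedHandover(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Called by a fetcher thread; blocks while the byte budget is used up. */
    synchronized void put(E element, long estimatedBytes) throws InterruptedException {
        while (currentBytes >= maxBytes) {
            wait();
        }
        queue.addLast(new Entry<>(element, estimatedBytes));
        currentBytes += estimatedBytes;
        notifyAll();
    }

    /** Called by the reader thread; frees budget and wakes blocked fetchers. */
    synchronized E take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        Entry<E> e = queue.removeFirst();
        currentBytes -= e.bytes;
        notifyAll();
        return e.element;
    }
}
```

Note that the memory bound here is only as good as the `estimatedBytes` the caller passes in, which is exactly the estimation problem mentioned above.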
   
       For the above reasons, I chose the simplest option at this point. It is not perfect, but it gives users a chance to tune. And if we set the default queue capacity to 1, we pretty much get the same handshake behavior as in the current Kafka connector.
   
       I do have some ideas for improving the performance of this part with lock-free tricks and a composite queue. Maybe we can make that improvement in a follow-up patch. But even if we cannot find a satisfactory solution, the current approach still seems a reasonable compromise.
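To illustrate why a default capacity of 1 degenerates into a handshake: with a one-slot blocking queue, each `put()` blocks until the reader has drained the previous batch, so fetcher and reader proceed strictly in lock-step. This is a standalone sketch under that assumption, not this PR's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class HandshakeDemo {
    // Hands `batches` batches from one fetcher thread to the reader
    // through a capacity-1 queue and returns how many were received.
    static int handedOver(int batches) throws InterruptedException {
        BlockingQueue<Integer> handover = new ArrayBlockingQueue<>(1);
        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < batches; i++) {
                    handover.put(i); // blocks until the reader took batch i-1
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();
        int taken = 0;
        for (int i = 0; i < batches; i++) {
            handover.take();
            taken++;
        }
        fetcher.join();
        return taken;
    }
}
```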
   
   4. Regarding package of the Mock classes.
   The reason they are in flink-core is that the coordinator tests also use them, and we probably don't want flink-runtime to depend on flink-connectors. Therefore the mocks were left in flink-core.
