becketqin commented on issue #10487: [FLINK-15100][connector/source] Add a base implementation for SourceReader. URL: https://github.com/apache/flink/pull/10487#issuecomment-604769006

@StephanEwen Thanks a lot for the thorough review. Please see the replies to your questions below:

1. Should boundedness be in the `SourceReaderOptions`?
   You are right, it should not. It was left over from an earlier patch and I forgot to remove it when updating the patch.
2. Should `SourceReader` extend `Serializable`?
   No, it should not. With the `Source` acting as a factory, the readers will be created on the fly instead of transmitted over the wire. And thanks for the suggestion about the IDE setting :)
3. Should there be a configurable queue size for `RecordsWithSplitIds`?
   I agree this is a compromise. It is hard for users to get such a number-based capacity right, and a number-based capacity does not put a strict upper bound on the memory footprint. However, I am not sure that what we do in the Kafka connector can be applied out of the box. A few concerns:
   - A queue size of one would push the batching work into the `SplitReader`, which could be tricky to get right. This is less of a problem for Kafka because the consumer already does the batching and users just set `max.poll.records`. But for other connectors there are open questions. For example: what should the batch size be? Would choosing that batch size become the same performance-vs-memory trade-off we are discussing for the queue capacity? Should the `SplitReader` always wait for the batch to fill up before putting it into the queue, or should there also be a timeout for enqueuing the batch? And how does the `SplitReader` decide the batch size in the first place?
   - Kafka has only a single fetcher, so at most two threads synchronize on the queue, which is not too bad. But with our base reader implementation there can be multiple fetchers, so the lock contention could be severe.
   - An ideal solution might be a bytes-based capacity config. However, that requires the `SplitReader` to estimate the size of the records, which can also be hard in many cases.

   For the above reasons, I chose the simplest option at this point. It is not perfect, but it gives users a chance to tune. If we set the default queue capacity to 1, we pretty much get the same handshake as in the current Kafka connector. I do have some ideas to improve the performance of this part with lock-free tricks and a composite queue; maybe we can have such an improvement in a follow-up patch. But even if we cannot find a satisfactory solution, the current approach still seems a reasonable compromise.

4. Regarding the package of the mock classes: they are in `flink-core` because the coordinator tests also use them, and we probably don't want `flink-runtime` to depend on `flink-connectors`. Therefore the mocks were left in `flink-core`.
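To make the trade-off in point 3 concrete, here is a minimal sketch of the number-based handoff between a fetcher thread and the reader. All names here (`ElementCountHandoff`, `produce`, `poll`) are hypothetical, not the classes in this patch; each queue element is a whole batch of records, which is why the element count alone does not bound the memory footprint in bytes:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical sketch of an element-count-bounded handoff. Each element
 * is a batch of records, so the memory bound is roughly
 * queueCapacity * batchSize records rather than a byte limit.
 */
class ElementCountHandoff {
    private final BlockingQueue<List<String>> queue;

    ElementCountHandoff(int queueCapacity) {
        // Number-based capacity: easy to configure, but no strict
        // upper bound on the memory footprint in bytes.
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
    }

    /**
     * Called by a fetcher thread running the SplitReader. A real handoff
     * would use queue.put(batch) to block and back-pressure the fetcher;
     * offer() keeps this sketch free of checked exceptions.
     */
    boolean produce(List<String> batch) {
        return queue.offer(batch);
    }

    /** Called by the reader; returns null when nothing is buffered. */
    List<String> poll() {
        return queue.poll();
    }
}
```

With capacity 1 this degenerates to the handshake mentioned above, and with multiple fetchers all producers contend on the same internal lock.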
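For the bytes-based alternative, the rough shape would be something like the following (again hypothetical, not part of this patch). The admission check is easy; the hard part is the `sizeEstimator`, which many `SplitReader` implementations cannot provide cheaply or accurately:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.ToLongFunction;

/**
 * Hypothetical sketch of a bytes-bounded buffer. Batches are admitted
 * while they fit in an estimated byte budget instead of an element count.
 */
class BytesBoundedBuffer<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final ToLongFunction<T> sizeEstimator;
    private final long maxBytes;
    private long currentBytes;

    BytesBoundedBuffer(long maxBytes, ToLongFunction<T> sizeEstimator) {
        this.maxBytes = maxBytes;
        this.sizeEstimator = sizeEstimator;
    }

    /** Accepts the batch only if it fits in the remaining byte budget. */
    synchronized boolean offer(T batch) {
        long bytes = sizeEstimator.applyAsLong(batch);
        // An oversized batch is still admitted when the buffer is empty,
        // so a single large batch can never wedge the fetcher forever.
        if (currentBytes + bytes > maxBytes && !buffer.isEmpty()) {
            return false;
        }
        buffer.addLast(batch);
        currentBytes += bytes;
        return true;
    }

    synchronized T poll() {
        T batch = buffer.pollFirst();
        if (batch != null) {
            currentBytes -= sizeEstimator.applyAsLong(batch);
        }
        return batch;
    }

    synchronized long bytesBuffered() {
        return currentBytes;
    }
}
```

The whole design stands or falls on how good the estimator is, which is exactly why it is hard to offer out of the box.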
