Hi Hongshun,

> However, it will be tricky because SplitFetcherManager includes <E, SplitT
> extends SourceSplit>, while FutureCompletingBlockingQueue includes <T>.
> This means that SplitFetcherManager would have to be modified to <T, E,
> SplitT extends SourceSplit>, which would affect the compatibility of the
> SplitFetcherManager class. I'm afraid this change will influence other
> sources.

Although the FutureCompletingBlockingQueue class itself has a type parameter
<T>, in SourceReaderBase and SplitFetcherManager this <T> is actually
RecordsWithSplitIds<E>. So it looks like we can just let
SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.

Thanks,

Jiangjie (Becket) Qin

On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Becket,
>
> I agree with you and have tried to modify this FLIP [1], which includes
> these changes:
>
> 1. Mark the constructor of SingleThreadMultiplexSourceReaderBase as
>    @Deprecated.
> 2. Mark the constructor of SourceReaderBase as *@Deprecated* and provide a
>    new constructor without FutureCompletingBlockingQueue.
> 3. Mark the constructors of SplitFetcherManager and
>    SingleThreadFetcherManager as *@Deprecated* and provide new constructors
>    without FutureCompletingBlockingQueue. Mark SplitFetcherManager and
>    SingleThreadFetcherManager as *@PublicEvolving*.
> 4. SplitFetcherManager provides wrapper methods for
>    FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
>    Then we can use FutureCompletingBlockingQueue only in
>    SplitFetcherManager.
>
> However, it will be tricky because SplitFetcherManager includes <E, SplitT
> extends SourceSplit>, while FutureCompletingBlockingQueue includes <T>.
> This means that SplitFetcherManager would have to be modified to <T, E,
> SplitT extends SourceSplit>, which would affect the compatibility of the
> SplitFetcherManager class. I'm afraid this change will influence other
> sources.
>
> Looking forward to hearing from you.
>
> Best regards,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
>
> On Sat, Nov 11, 2023 at 10:55 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Hongshun and Martijn,
> >
> > Sorry for the late reply; I was traveling and am still catching up with
> > the emails. Please allow me to provide more context.
> >
> > 1.
> >    The original design of SplitFetcherManager and its subclasses was to
> >    make them public to the Source developers. The goal is to let us take
> >    care of the threading model, while the Source developers can just
> >    focus on the SplitReader implementation. Therefore, I think making
> >    SplitFetcherManager / SingleThreadFetcherManager public aligns with
> >    the original design. That is also why these classes are exposed in the
> >    constructor of SourceReaderBase.
> >
> > 2. For FutureCompletingBlockingQueue, in hindsight, it might be better
> >    not to expose it to the Source developers. They are unlikely to use it
> >    anywhere other than just constructing it. The reason that
> >    FutureCompletingBlockingQueue is currently exposed in the
> >    SourceReaderBase constructor is that both the SplitFetcherManager and
> >    SourceReaderBase need it. One way to hide the
> >    FutureCompletingBlockingQueue from the public API is to make
> >    SplitFetcherManager the only owner class of the queue, and expose some
> >    of its methods via SplitFetcherManager. This way, the SourceReaderBase
> >    can invoke the methods via SplitFetcherManager. I believe this also
> >    makes the code slightly cleaner.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang <loserwang1...@gmail.com>
> > wrote:
> >
> > > @Martijn, I agree with you.
> > >
> > > I also had two questions at the beginning:
> > >
> > > - Why is an Internal class exposed as a constructor param of a Public
> > >   class?
> > > - Should these classes be exposed as public?
> > >
> > > For the first question, I noticed that before the original Jira [1],
> > > all these classes were missing annotations, so it was not abnormal that
> > > FutureCompletingBlockingQueue and SingleThreadFetcherManager were
> > > constructor params of SingleThreadMultiplexSourceReaderBase.
> > > However, this Jira marked FutureCompletingBlockingQueue and
> > > SingleThreadFetcherManager as Internal, while marking
> > > SingleThreadMultiplexSourceReaderBase as Public. It was a good choice,
> > > but it overlooked that FutureCompletingBlockingQueue and
> > > SingleThreadFetcherManager had already been exposed by
> > > SingleThreadMultiplexSourceReaderBase.
> > > Thus, this problem occurs because we didn't clearly define the
> > > boundaries in the original design. We should pay more attention to
> > > this when creating a new class.
> > >
> > > For the second question, I think at least SplitFetcherManager
> > > should be Public. There are a few reasons:
> > >
> > > - Connector developers want to decide their own threading model, for
> > >   example whether to recycle idle fetchers, by overriding
> > >   SplitFetcherManager#maybeShutdownFinishedFetchers. Sometimes
> > >   developers want SplitFetcherManager to behave like a
> > >   FixedThreadPool, because each time a thread is recycled and then
> > >   recreated, the context resources need to be rebuilt. I met a related
> > >   issue in Flink CDC [2].
> > > - KafkaSourceFetcherManager [3] also extends
> > >   SingleThreadFetcherManager to commitOffsets. But now the Kafka
> > >   source is not in the Flink repository, so this is no longer allowed.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > >
> > > [2]
> > > https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418
> > >
> > > [3]
> > > https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52
> > >
> > > Looking forward to hearing from you.
> > >
> > > Best regards,
> > > Hongshun
> > >
> > > On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser
> > > <martijnvis...@apache.org> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'm looking at the original Jira that introduced these stability
> > > > designations [1] and I'm just curious if it was intended that these
> > > > Internal classes would be used directly, or if we just haven't
> > > > created the right abstractions? The reason for asking is that moving
> > > > something from Internal to a public designation is an easy fix, but
> > > > I want to make sure that it's also the right fix. If we are missing
> > > > good abstractions, then I would rather invest in those.
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > >
> > > > On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > >
> > > > > Thanks Hongshun for starting this discussion.
> > > > >
> > > > > +1 from my side.
> > > > >
> > > > > IIRC, @Jiangjie(Becket) also mentioned this in a FLINK-31324
> > > > > comment [1].
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > [1]
> > > > > https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> > > > >
> > > > > On Nov 8, 2023, at 5:42 PM, Hongshun Wang
> > > > > <loserwang1...@gmail.com> wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-389: Annotate
> > > > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > > > > > PublicEvolving [1].
> > > > > >
> > > > > > Though the SingleThreadFetcherManager is annotated as Internal,
> > > > > > it actually acts to some degree as a public API, which is widely
> > > > > > used in many connector projects: flink-cdc-connector
> > > > > > <https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93>,
> > > > > > flink-connector-mongodb
> > > > > > <https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58>,
> > > > > > and so on.
> > > > > >
> > > > > > Moreover, even the constructor of
> > > > > > SingleThreadMultiplexSourceReaderBase (which is PublicEvolving)
> > > > > > includes the params of SingleThreadFetcherManager and
> > > > > > FutureCompletingBlockingQueue. That means that the
> > > > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue
> > > > > > have already been exposed to users for a long time and are
> > > > > > widely used.
> > > > > >
> > > > > > Considering that all source implementations are using them de
> > > > > > facto, why not annotate SingleThreadFetcherManager and
> > > > > > FutureCompletingBlockingQueue as PublicEvolving, so that
> > > > > > developers will modify them more carefully to avoid any
> > > > > > potential issues? As shown in FLINK-31324 [2], FLINK-28853 [3]
> > > > > > changed the default constructor of SingleThreadFetcherManager.
> > > > > > However, the change had a wide impact.
> > > > > > Finally, the former constructor was added back and marked as
> > > > > > Deprecated.
> > > > > >
> > > > > > In conclusion, the goal of this FLIP is to annotate
> > > > > > SingleThreadFetcherManager (including its parent class) and
> > > > > > FutureCompletingBlockingQueue as PublicEvolving.
> > > > > >
> > > > > > Looking forward to hearing from you.
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > > > > >
> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-31324
> > > > > >
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-28853
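
For readers following the thread, here is a minimal sketch of the ownership
idea discussed above: the fetcher manager is the only owner of the element
queue, its queue element type is fixed to a records-with-split-ids type, and
the reader only calls wrapper methods on the manager. This is not Flink code.
All class names below (Split, RecordsBatch, ElementQueue, FetcherManager,
Reader) are hypothetical stand-ins for SourceSplit, RecordsWithSplitIds,
FutureCompletingBlockingQueue, SplitFetcherManager and SourceReaderBase, and
the queue is deliberately simplified and not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class QueueOwnershipSketch {

    /** Hypothetical stand-in for SourceSplit. */
    interface Split {
        String splitId();
    }

    static final class SimpleSplit implements Split {
        private final String id;
        SimpleSplit(String id) { this.id = id; }
        @Override public String splitId() { return id; }
    }

    /** Hypothetical stand-in for RecordsWithSplitIds<E>: records grouped by split id. */
    static final class RecordsBatch<E> {
        private final Map<String, List<E>> recordsBySplit;
        RecordsBatch(Map<String, List<E>> recordsBySplit) { this.recordsBySplit = recordsBySplit; }
        Map<String, List<E>> recordsBySplit() { return recordsBySplit; }
    }

    /** Hypothetical stand-in for FutureCompletingBlockingQueue<T>. Not thread-safe. */
    static final class ElementQueue<T> {
        private final Queue<T> elements = new ArrayDeque<>();
        private CompletableFuture<Void> availability = new CompletableFuture<>();

        void put(T element) {
            elements.add(element);
            availability.complete(null); // signal that data is available
        }

        T poll() {
            T next = elements.poll();
            // Reset the future once the queue has been drained.
            if (elements.isEmpty() && availability.isDone()) {
                availability = new CompletableFuture<>();
            }
            return next;
        }

        CompletableFuture<Void> getAvailabilityFuture() { return availability; }
    }

    /** Keeps two type parameters; the queue's <T> is fixed to RecordsBatch<E>. */
    static class FetcherManager<E, SplitT extends Split> {
        // The queue is created and owned here; it never appears in a constructor.
        private final ElementQueue<RecordsBatch<E>> elementsQueue = new ElementQueue<>();

        /** In a real design fetcher threads would enqueue; here it is called directly. */
        void enqueue(SplitT split, List<E> records) {
            elementsQueue.put(new RecordsBatch<>(
                    Collections.singletonMap(split.splitId(), records)));
        }

        /** Wrapper method: the reader polls through the manager. */
        RecordsBatch<E> poll() { return elementsQueue.poll(); }

        /** Wrapper method: the reader checks availability through the manager. */
        CompletableFuture<Void> getAvailabilityFuture() {
            return elementsQueue.getAvailabilityFuture();
        }
    }

    /** The reader depends only on the manager, never on the queue type. */
    static class Reader<E, SplitT extends Split> {
        private final FetcherManager<E, SplitT> fetcherManager;
        Reader(FetcherManager<E, SplitT> fetcherManager) { this.fetcherManager = fetcherManager; }
        RecordsBatch<E> pollNext() { return fetcherManager.poll(); }
        boolean isAvailable() { return fetcherManager.getAvailabilityFuture().isDone(); }
    }

    public static void main(String[] args) {
        FetcherManager<String, SimpleSplit> manager = new FetcherManager<>();
        Reader<String, SimpleSplit> reader = new Reader<>(manager);
        manager.enqueue(new SimpleSplit("split-0"), Arrays.asList("a", "b"));
        System.out.println(reader.pollNext().recordsBySplit());
    }
}
```

The point of the sketch is only the generics: because the queue's element
type is derived from E inside the manager, the manager does not need a third
type parameter, which is the compatibility concern raised in the thread.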