Hi Hongshun,
>
>
> However, it will be tricky because SplitFetcherManager is declared with
> <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue is
> declared with <T>. This means that SplitFetcherManager would have to be
> modified to <T, E, SplitT extends SourceSplit>, which would affect the
> compatibility of the SplitFetcherManager class. I'm afraid this change
> will affect other sources.

Although the FutureCompletingBlockingQueue class itself has a type
parameter <T>, in SourceReaderBase and SplitFetcherManager this <T> is
actually always RecordsWithSplitIds<E>. So it looks like we can just let
SplitFetcherManager.poll() return a RecordsWithSplitIds<E>, without adding
a new type parameter to SplitFetcherManager.
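
To make this concrete, here is a rough sketch of the idea. It uses
simplified stand-in types rather than the real Flink classes: ElementQueue
(a non-blocking placeholder for FutureCompletingBlockingQueue), the
enqueue() hook, and the trimmed-down interfaces are assumptions made only
for illustration, not the actual API.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class QueueOwnershipSketch {

    /** Simplified stand-in for Flink's SourceSplit. */
    interface SourceSplit {
        String splitId();
    }

    /** Simplified stand-in for Flink's RecordsWithSplitIds<E>. */
    interface RecordsWithSplitIds<E> {
        List<E> records();
    }

    /** Non-blocking placeholder for FutureCompletingBlockingQueue<T> (assumption). */
    static class ElementQueue<T> {
        private final Queue<T> elements = new ArrayDeque<>();

        void put(T element) {
            elements.add(element);
        }

        /** Returns the next element, or null if the queue is empty. */
        T poll() {
            return elements.poll();
        }
    }

    /**
     * Keeps its two existing type parameters. The queue's <T> is fixed to
     * RecordsWithSplitIds<E> internally, so no third parameter is needed.
     */
    static class SplitFetcherManager<E, SplitT extends SourceSplit> {
        private final ElementQueue<RecordsWithSplitIds<E>> elementsQueue =
                new ElementQueue<>();

        /** Hypothetical hook a fetcher would use to hand over a batch. */
        void enqueue(RecordsWithSplitIds<E> batch) {
            elementsQueue.put(batch);
        }

        /** Wrapper that lets the reader poll without seeing the queue. */
        RecordsWithSplitIds<E> poll() {
            return elementsQueue.poll();
        }
    }

    public static void main(String[] args) {
        SplitFetcherManager<String, SourceSplit> manager = new SplitFetcherManager<>();
        manager.enqueue(() -> Arrays.asList("a", "b"));

        // The caller only ever deals with RecordsWithSplitIds<E>.
        RecordsWithSplitIds<String> batch = manager.poll();
        System.out.println(batch.records());
    }
}
```

With this shape, SourceReaderBase would only call methods on
SplitFetcherManager, and the signature <E, SplitT extends SourceSplit>
stays unchanged for existing sources.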

Thanks,

Jiangjie (Becket) Qin

On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi Becket,
>
> I agree with you and have tried to modify the FLIP [1] to include these
> changes:
>
>    1. Mark the constructor of SingleThreadMultiplexSourceReaderBase as
>    @Deprecated.
>    2. Mark the constructor of SourceReaderBase as @Deprecated and provide
>    a new constructor without FutureCompletingBlockingQueue.
>    3. Mark the constructors of SplitFetcherManager and
>    SingleThreadFetcherManager as @Deprecated and provide new constructors
>    without FutureCompletingBlockingQueue. Mark SplitFetcherManager and
>    SingleThreadFetcherManager as @PublicEvolving.
>    4. SplitFetcherManager provides wrapper methods for
>    FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
>    Then we can use FutureCompletingBlockingQueue only in
>    SplitFetcherManager.
>
> However, it will be tricky because SplitFetcherManager is declared with
> <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue is
> declared with <T>. This means that SplitFetcherManager would have to be
> modified to <T, E, SplitT extends SourceSplit>, which would affect the
> compatibility of the SplitFetcherManager class. I'm afraid this change
> will affect other sources.
>
>
>
> Looking forward to hearing from you.
>
> Best regards,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
>
> On Sat, Nov 11, 2023 at 10:55 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Hongshun and Martijn,
> >
> > Sorry for the late reply; I was traveling and am still catching up on
> > emails. Please allow me to provide more context.
> >
> > 1. The original design of SplitFetcherManager and its subclasses was to
> > make them public to Source developers. The goal is to let us take care
> > of the threading model, while Source developers can just focus on the
> > SplitReader implementation. Therefore, I think making
> > SplitFetcherManager / SingleThreadFetcherManager public aligns with the
> > original design. That is also why these classes are exposed in the
> > constructor of SourceReaderBase.
> >
> > 2. For FutureCompletingBlockingQueue, in hindsight, it might have been
> > better not to expose it to Source developers. They are unlikely to use
> > it for anything other than constructing it. FutureCompletingBlockingQueue
> > is currently exposed in the SourceReaderBase constructor because both
> > SplitFetcherManager and SourceReaderBase need it. One way to hide
> > FutureCompletingBlockingQueue from the public API is to make
> > SplitFetcherManager the sole owner of the queue and expose some of the
> > queue's methods via SplitFetcherManager. This way, SourceReaderBase can
> > invoke those methods via SplitFetcherManager. I believe this also makes
> > the code slightly cleaner.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang <loserwang1...@gmail.com>
> > wrote:
> >
> > > @Martijn, I agree with you.
> > >
> > >
> > > I also had two questions at the beginning:
> > >
> > >    - Why is an Internal class exposed as a constructor param of a
> > >    Public class?
> > >    - Should these classes be exposed as public?
> > >
> > > For the first question, I noticed that before the original Jira [1],
> > > none of these classes had an annotation, so it was not unusual that
> > > FutureCompletingBlockingQueue and SingleThreadFetcherManager were
> > > constructor params of SingleThreadMultiplexSourceReaderBase. However,
> > > that Jira marked FutureCompletingBlockingQueue and
> > > SingleThreadFetcherManager as Internal, while marking
> > > SingleThreadMultiplexSourceReaderBase as Public. That was a good
> > > choice, but it overlooked that FutureCompletingBlockingQueue and
> > > SingleThreadFetcherManager had already been exposed by
> > > SingleThreadMultiplexSourceReaderBase. Thus, this problem occurred
> > > because we didn't clearly define the boundaries in the original design.
> > > We should pay more attention to this when creating a new class.
> > >
> > >
> > > For the second question, I think at least SplitFetcherManager
> > > should be Public. There are a few reasons:
> > >
> > >    - Connector developers want to decide their own threading model,
> > >    for example whether to recycle idle fetchers, by overriding
> > >    SplitFetcherManager#maybeShutdownFinishedFetchers. Sometimes
> > >    developers want SplitFetcherManager to behave like a
> > >    FixedThreadPool, because each time a thread is recycled and then
> > >    recreated, its context resources need to be rebuilt. I met a
> > >    related issue in Flink CDC [2].
> > >    - KafkaSourceFetcherManager [3] also extends
> > >    SingleThreadFetcherManager in order to commit offsets. But the
> > >    Kafka source is no longer in the Flink repository, so this is not
> > >    allowed any more.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > >
> > > [2]
> > > https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418
> > >
> > > [3]
> > > https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52
> > >
> > > Looking forward to hearing from you.
> > >
> > > Best regards,
> > > Hongshun
> > >
> > > On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser <martijnvis...@apache.org>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'm looking at the original Jira that introduced these stability
> > > > designations [1], and I'm curious whether it was intended that these
> > > > Internal classes would be used directly, or whether we just haven't
> > > > created the right abstractions. I'm asking because moving something
> > > > from Internal to a public designation is an easy fix, but I want to
> > > > make sure that it's also the right fix. If we are missing good
> > > > abstractions, then I would rather invest in those.
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > >
> > > > On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > >
> > > > > Thanks Hongshun for starting this discussion.
> > > > >
> > > > > +1 from my side.
> > > > >
> > > > > IIRC, @Jiangjie(Becket) also mentioned this in a FLINK-31324
> > > > > comment [1].
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > [1]
> > > > > https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> > > > >
> > > > >
> > > > >
> > > > > > On Nov 8, 2023, at 5:42 PM, Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-389: Annotate
> > > > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > > > > > PublicEvolving [1].
> > > > > >
> > > > > > Though SingleThreadFetcherManager is annotated as Internal, it
> > > > > > effectively acts as a public API and is widely used in many
> > > > > > connector projects: flink-cdc-connector
> > > > > > <https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93>,
> > > > > > flink-connector-mongodb
> > > > > > <https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58>,
> > > > > > and so on.
> > > > > >
> > > > > > Moreover, even the constructor of
> > > > > > SingleThreadMultiplexSourceReaderBase (which is PublicEvolving)
> > > > > > includes SingleThreadFetcherManager and
> > > > > > FutureCompletingBlockingQueue as params. That means that
> > > > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue have
> > > > > > already been exposed to users for a long time and are widely used.
> > > > > >
> > > > > > Considering that all source implementations are using them de
> > > > > > facto, why not annotate SingleThreadFetcherManager and
> > > > > > FutureCompletingBlockingQueue as PublicEvolving, so that
> > > > > > developers will modify them more carefully and avoid potential
> > > > > > issues? As shown in FLINK-31324 [2], FLINK-28853 [3] changed the
> > > > > > default constructor of SingleThreadFetcherManager. However, the
> > > > > > change had a wide impact. In the end, the former constructor was
> > > > > > added back and marked as Deprecated.
> > > > > >
> > > > > > In conclusion, the goal of this FLIP is to annotate
> > > > > > SingleThreadFetcherManager (including its parent class) and
> > > > > > FutureCompletingBlockingQueue as PublicEvolving.
> > > > > >
> > > > > > Looking forward to hearing from you.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > > > > >
> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-31324
> > > > > >
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-28853
> > > > >
> > > >
> > >
> >
>
