> hybrid sounds to me more like the source would constantly switch back and
> forth

Initially, the hybrid source focuses on a sequenced chain of sources.

But in the future, it would be cool if hybrid sources could intelligently
switch back and forth between a historical data source (like Iceberg) and
a live data source (like Kafka). E.g.,
- if the Flink job is lagging behind Kafka retention, automatically switch
to the Iceberg source
- once the job has caught up, switch back to the Kafka source

That would simplify operations by removing the need to switch manually.
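
To make the idea concrete, here is a rough sketch of what such a switching
policy could look like. It is a toy model only: SwitchPolicySketch,
ActiveSource, and the two thresholds are hypothetical names for illustration
and are not part of Flink, FLIP-150, or the PR. It also assumes the job can
measure its lag against the head of the live stream.

```java
import java.time.Duration;

/** Toy model of a back-and-forth switching policy; not a Flink API. */
public class SwitchPolicySketch {

    /** The two kinds of source the job could read from (hypothetical). */
    public enum ActiveSource { HISTORICAL, LIVE }

    /**
     * Picks the source to read from next.
     *
     * @param current   source currently being read
     * @param lag       how far the job trails the head of the live stream
     * @param retention how long the live system keeps data (assumed known)
     * @param caughtUp  lag at or below which the job counts as caught up
     */
    public static ActiveSource choose(
            ActiveSource current, Duration lag, Duration retention, Duration caughtUp) {
        if (lag.compareTo(retention) >= 0) {
            // The data the job needs has aged out of the live system.
            return ActiveSource.HISTORICAL;
        }
        if (lag.compareTo(caughtUp) <= 0) {
            // Close enough to the head of the stream to read live data.
            return ActiveSource.LIVE;
        }
        // In between: keep the current source to avoid flapping.
        return current;
    }

    public static void main(String[] args) {
        Duration retention = Duration.ofDays(7);
        Duration caughtUp = Duration.ofMinutes(5);
        ActiveSource active = ActiveSource.LIVE;
        // Simulated lag readings: falls behind retention, then catches up.
        Duration[] lags = {
            Duration.ofHours(1), Duration.ofDays(8), Duration.ofDays(2), Duration.ofMinutes(1)
        };
        for (Duration lag : lags) {
            active = choose(active, lag, retention, caughtUp);
            System.out.println("lag=" + lag + " -> " + active);
        }
    }
}
```

The gap between the two thresholds is deliberate: without it, a job hovering
around a single threshold would flip between the two sources repeatedly.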


On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <ar...@apache.org> wrote:

> Sorry for joining the party so late, but it's such an interesting FLIP with
> a huge impact that I wanted to add my 2 cents. [1]
> I'm mirroring a basic question from the PR review to this thread because
> it's about the name:
>
> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> similar.
> Hybrid has the connotation of 2 for me (maybe because I'm a non-native) and
> does not carry the concatenation concept as well (hybrid sounds to me more
> like the source would constantly switch back and forth).
>
> Could we take a few minutes to think if this is the most intuitive name for
> new users? I'm especially hoping that natives might give some ideas (or
> declare that Hybrid is perfect).
>
> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
>
> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> > > Converter function relies on the specific enumerator capabilities to
> > > set the new start position (e.g.
> > > fileSourceEnumerator.getEndTimestamp() and
> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> >
> > I guess the premise is that a converter is for a specific tuple of
> > (upstream source, downstream source). We don't have to define generic
> > EndStateT and SwitchableEnumerator interfaces. That should work.
> >
> > The benefit of defining EndStateT and SwitchableEnumerator interfaces is
> > probably promoting uniformity across sources that support
> > hybrid/switchable source.
> >
> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi Steven,
> > >
> > > Thank you for the thorough review of the PR and for bringing this back
> > > to the mailing list.
> > >
> > > All,
> > >
> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > > deviates from the original proposal [1]. The goal would be to update
> > > the FLIP soon and bring it to a vote, as previously suggested offline
> > > by Nicholas.
> > >
> > > A few minor issues in the PR are outstanding and I'm working on test
> > > coverage for the recovery behavior, which should be completed soon.
> > >
> > > The dynamic position transfer needs to be concluded before we can move
> > > forward however.
> > >
> > > There have been various ideas, including the special
> > > "SwitchableEnumerator" interface, using enumerator checkpoint state or
> > > an enumerator interface extension to extract the end state.
> > >
> > > One goal in the FLIP is to "Reuse the existing Source connectors built
> > > with FLIP-27 without any change." and I think it is important to honor
> > > that goal given that fixed start positions do not require interface
> > > changes.
> > >
> > > Based on the feedback the following might be a good solution for
> > > runtime position transfer:
> > >
> > > * User supplies the optional converter function (not applicable for
> > > fixed positions).
> > > * Instead of relying on the enumerator checkpoint state [2], the
> > > converter function will be supplied with the current and next
> > > enumerator (source.createEnumerator).
> > > * Converter function relies on the specific enumerator capabilities to
> > > set the new start position (e.g.
> > > fileSourceEnumerator.getEndTimestamp() and
> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> > > * HybridSourceSplitEnumerator starts new underlying enumerator
> > >
> > > With this approach, there is no need to augment FLIP-27 interfaces and
> > > custom source capabilities are easier to integrate. Removing the
> > > mandate to rely on enumerator checkpoint state also avoids potential
> > > upgrade/compatibility issues.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > > [2]
> > > https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > >
> > >
> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > >
> > > > I discussed the PR with Thomas offline. Thomas, please correct me if I
> > > > missed anything.
> > > >
> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> > > > converter.
> > > > * Current PR uses the enumerator checkpoint state type as the input
> > > > for the converter
> > > > * FLIP-150 defines a new EndStateT interface.
> > > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > > > transition EndStateT doesn't have to be included in the upstream
> > > > source checkpoint state.
> > > >
> > > > Let's look at two use cases:
> > > > 1) static cutover time at 5 pm. File source reads all data between
> > > > 9 am and 5 pm, then Kafka source starts with an initial position of
> > > > 5 pm. In this case, there is no need for a converter or EndStateT
> > > > since the starting time for the Kafka source is known and fixed.
> > > > 2) dynamic cutover time at 1 hour before now. This is useful when the
> > > > bootstrap of historic data takes a long time (like days or weeks) and
> > > > we don't know the exact time of cutover when a job is launched.
> > > > Instead, we are instructing the file source to stop when it gets
> > > > close to live data. In this case, hybrid source construction will
> > > > specify a relative time (now - 1 hour), and the EndStateT (of the
> > > > file source) will be resolved to an absolute time for cutover. We
> > > > probably don't need to include EndStateT (end timestamp) in the file
> > > > source checkpoint state. Hence, the separate EndStateT is probably
> > > > more desirable.
> > > >
> > > > We also discussed the converter for the Kafka source. Kafka source
> > > > supports different OffsetsInitializer impls (including
> > > > TimestampOffsetsInitializer). To support the dynamic cutover time
> > > > (use case #2 above), we can plug in a
> > > > SupplierTimestampOffsetInitializer, where the starting timestamp is
> > > > not set during source/job construction. Rather it is a supplier model
> > > > where the starting timestamp value is set to the resolved absolute
> > > > timestamp during switch.
> > > >
> > > > Thanks,
> > > > Steven
> > > >
> > > >
> > > >
> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <t...@apache.org> wrote:
> > > >
> > > > > Hi Nicholas,
> > > > >
> > > > > Thanks for taking a look at the PR!
> > > > >
> > > > > 1. Regarding switching mechanism:
> > > > >
> > > > > There has been previous discussion in this thread regarding the
> > > > > pros and cons of how the switching can be exposed to the user.
> > > > >
> > > > > With fixed start positions, no special switching interface to
> > > > > transfer information between enumerators is required. Sources are
> > > > > configured as they would be when used standalone and just plugged
> > > > > into HybridSource. I expect that to be a common use case. You can
> > > > > find an example for this in the ITCase:
> > > > >
> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > > > >
> > > > > For dynamic start position, the checkpoint state is used to
> > > > > transfer information from old to new enumerator. An example for
> > > > > that can be found here:
> > > > >
> > > > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > > > >
> > > > > That may look verbose, but the code to convert from one state to
> > > > > another can be factored out into a utility and the function becomes
> > > > > a one-liner.
> > > > >
> > > > > For common sources like files and Kafka we can potentially (later)
> > > > > implement the conversion logic as part of the respective connector's
> > > > > checkpoint and split classes.
> > > > >
> > > > > I hope that with the PR up for review, we can soon reach a conclusion
> > > > > on how we want to expose this to the user.
> > > > >
> > > > > Following is an example for Files -> Files -> Kafka that I'm using
> > > > > for e2e testing. It exercises both ways of setting the start
> > > > > position.
> > > > >
> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > > > >
> > > > >
> > > > > 2. Regarding the events used to implement the actual switch between
> > > > > enumerator and readers: I updated the PR with javadoc to clarify the
> > > > > intent. Please let me know if that helps or let's continue to discuss
> > > > > those details on the PR?
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Thomas
> > > > >
> > > > >
> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programg...@163.com>
> > > > > wrote:
> > > > > >
> > > > > > Hi Thomas,
> > > > > >
> > > > > >    Sorry for the late reply on your POC. I have reviewed the base
> > > > > > abstract implementation of your pull request:
> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > > > > mechanism, this level of abstraction is not concise enough, which
> > > > > > doesn't make connector contribution easier. In theory, it is
> > > > > > necessary to introduce a set of interfaces to support the switching
> > > > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
> > > > > > interfaces are needed for connector extensibility.
> > > > > >    In other words, the whole switching process of the above-mentioned
> > > > > > PR is different from that described in FLIP-150. In the above
> > > > > > implementation, the source reading switch is executed after
> > > > > > receiving the SwitchSourceEvent, which could be before the
> > > > > > SourceReaderFinishEvent is sent. This timeline of source reading
> > > > > > switching could be discussed here.
> > > > > >    @Stephan @Becket, if you are available, please help to review the
> > > > > > abstract implementation, and compare it with the interfaces
> > > > > > mentioned in FLIP-150.
> > > > > >
> > > > > > Thanks,
> > > > > > Nicholas Jiang
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Sent from:
> > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > > >
> > >
> >
>
