Hi All,

I think using plugins, as Stephan suggested, would be the best way to serve
different requirements for different scenarios, even after they are merged
into Flink core.

As far as I know, the pluggable shuffle service is ready for use.
The failover strategy does not support plugins yet, but it is in good shape
and would not need much effort to add that support.
In our experience implementing "best-effort" recovery, these two plugins
should be enough (a rough config sketch follows the list):
1. a shuffle service based on the current basic implementation, but
supporting reconnectable input/output connections and discarding records
when overloaded instead of causing back pressure
2. an individual failover strategy that restarts only the failed task
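
For illustration, here is a rough flink-conf.yaml sketch of how such plugins
might be wired in. The shuffle-service-factory.class option already exists
for the pluggable shuffle service; the plugin class name is made up, and the
failover-strategy value is hypothetical since that plugin support does not
exist yet:

  # hypothetical shuffle plugin: reconnectable partitions, drops records
  # instead of back-pressuring (the config key itself is real)
  shuffle-service-factory.class: com.example.ReconnectableShuffleServiceFactory
  # hypothetical value; today only built-in strategies can be configured here
  jobmanager.execution.failover-strategy: individual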

Besides, "at-lest-once" individual failover can also be supported in this
way, with a pluggable shuffle service which supports caching results [1]
and the the individual failover strategy. It can be helpful for scenarios
with higher data consistency demands.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

Thanks,
Zhu Zhu



Biao Liu <mmyy1...@gmail.com> wrote on Wed, Jul 24, 2019, 10:41 AM:

> Hi Stephan & Xiaogang,
>
> It's great to see this discussion active again!
>
> It makes sense to me to do private optimizations and trials through
> plugins. I understand that the community cannot satisfy everyone and every
> requirement due to limited resources, and the pluggable strategy is a good
> compromise. It might also help improve the pluggable strategy itself, since
> plugins are likely to surface reasonable requirements.
>
> Regarding the "at-most-once" or "best-effort" semantics, I think it is
> worth going further since we have heard these requirements several times.
> However, we need more investigation into implementing them on top of the
> pluggable shuffle service and scheduler (or perhaps more components). There
> could be a public discussion once we are ready. I hope it happens soon.
>
>
> On Wed, Jul 24, 2019 at 9:43 AM SHI Xiaogang <shixiaoga...@gmail.com>
> wrote:
>
> > Hi Stephan,
> >
> > I agree with you that the implementation of "at-most-once" or
> > "best-effort" recovery will benefit from the pluggable shuffle service and
> > the pluggable scheduler. Actually, we made some attempts in our private
> > repository, and it turned out to require quite a lot of work to implement
> > this with the existing network stack. We can start this work when the
> > pluggable shuffle service and pluggable scheduler are ready.
> >
> > The suggestion of an external implementation is a very good idea. That
> > way, we can implement both "at-most-once" and "best-effort" guarantees as
> > different checkpoint/failover strategies. If so, I think we should focus
> > on the components that change between strategies. These may include a
> > pluggable checkpoint barrier handler and a pluggable failover strategy.
> > We can list these components and discuss the implementation details then.
> >
> > What do you think, Biao Liu and Zhu Zhu?
> >
> > Regards,
> > Xiaogang
> >
> >
> > Stephan Ewen <se...@apache.org> wrote on Wed, Jul 24, 2019, 1:31 AM:
> >
> > > Hi all!
> > >
> > > This is an interesting discussion for sure.
> > >
> > > Concerning user requests for changing these modes, I also hear the
> > > following quite often:
> > >   - reduce the cost of checkpoint alignment (unaligned checkpoints) to
> > > make checkpoints fast/stable under high backpressure
> > >   - more fine-grained failover while maintaining exactly-once (even if
> > > costly)
> > >
> > > Adding "at most once" to the mix makes for quite a long list of big
> > > changes to the system.
> > >
> > > My feeling is that on such a core system, the community cannot push all
> > > of these efforts at the same time, especially because they touch
> > > overlapping areas of the system and need the same committers involved.
> > >
> > > On the other hand, the pluggable shuffle service and pluggable scheduler
> > > could make it possible to have an external implementation of that:
> > >   - a network stack that supports "reconnects" of failed tasks with
> > > continuing tasks
> > >   - a scheduling strategy that restarts tasks individually, even within
> > > pipelined regions
> > >
> > > I think contributors/committers could implement this separately from the
> > > Flink core. The feature could be trial-run through the community
> > > packages. If it gains a lot of traction, the community could decide to
> > > put in the effort to merge it into the core.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Tue, Jun 11, 2019 at 2:10 PM SHI Xiaogang <shixiaoga...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > It definitely requires a massive effort to allow at-most-once delivery
> > > > in Flink. But as the feature is urgently demanded by many Flink users, I
> > > > think every effort we make here is worthwhile. Actually, the inability
> > > > to support at-most-once delivery has become a major obstacle for Storm
> > > > users looking to move to Flink, and it is undesirable for us to run
> > > > different stream processing systems for different scenarios.
> > > >
> > > > I agree with Zhu Zhu that the guarantee we provide is the very first
> > > > thing to be discussed. Recovering from checkpoints will lead to
> > > > duplicated records, thus breaking the guarantee of at-most-once delivery.
> > > >
> > > > A method to achieve the at-most-once guarantee is to completely disable
> > > > checkpointing and let sources read only those records posted after they
> > > > start. This requires sources to allow configuring reads from the latest
> > > > records, which luckily is supported by many message queues, including
> > > > Kafka. As Flink relies on the sources' ability to roll back to achieve
> > > > exactly-once and at-least-once delivery, I think it is acceptable for
> > > > Flink to rely on the sources' ability to read the latest records to
> > > > achieve at-most-once delivery. This method does not require any
> > > > modification to the existing checkpointing mechanism. Besides, as there
> > > > is no need to restore from checkpoints, failed tasks can recover at the
> > > > fastest possible speed.
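> > > >
> > > > To make this concrete, here is a minimal sketch of such a job, assuming
> > > > the universal Kafka connector; the topic, group id and broker address
> > > > are made up, and checkpointing is simply never enabled:
> > > >
> > > > import java.util.Properties;
> > > > import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> > > > import org.apache.flink.api.common.serialization.SimpleStringSchema;
> > > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> > > >
> > > > public class AtMostOnceJob {
> > > >     public static void main(String[] args) throws Exception {
> > > >         StreamExecutionEnvironment env =
> > > >             StreamExecutionEnvironment.getExecutionEnvironment();
> > > >         // No env.enableCheckpointing(...): nothing to restore, no rollback.
> > > >         // Restart failed executions so the job keeps running after failures.
> > > >         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000L));
> > > >
> > > >         Properties props = new Properties();
> > > >         props.setProperty("bootstrap.servers", "localhost:9092");
> > > >         props.setProperty("group.id", "at-most-once-demo");
> > > >
> > > >         FlinkKafkaConsumer<String> source =
> > > >             new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
> > > >         // After a restart, read only records published from now on.
> > > >         source.setStartFromLatest();
> > > >
> > > >         env.addSource(source).print();
> > > >         env.execute("at-most-once sketch");
> > > >     }
> > > > }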
> > > >
> > > > Concerning the implementation effort, I think we can benefit from some
> > > > ongoing work, including the shuffle services and fine-grained recovery.
> > > > For example, currently exceptions in network connections lead to
> > > > failures of both downstream and upstream tasks. To achieve at-most-once
> > > > delivery, we should decouple intermediate results from tasks, reporting
> > > > exceptions of intermediate results to the job master and letting the
> > > > failover strategy determine the actions to take. Some of this work has
> > > > already been done as part of the fine-grained recovery effort, which can
> > > > be extended to allow at-most-once delivery in Flink.
> > > >
> > > > But before starting the discussion on implementation details, as said
> > > > before, we need to determine the guarantee we provide in scenarios where
> > > > timely recovery is needed.
> > > > * What do you think of the at-most-once guarantee achieved by the
> > > > proposed method?
> > > > * Do we need checkpointing to reduce the amount of lost data?
> > > > * Do we need deduplication to guarantee at-most-once delivery, or just
> > > > provide best-effort delivery?
> > > >
> > > > Regards,
> > > > Xiaogang Shi
> > > >
> > > >
> > > > Piotr Nowojski <pi...@ververica.com> wrote on Tue, Jun 11, 2019, 5:31 PM:
> > > >
> > > > > Hi Xiaogang,
> > > > >
> > > > > It sounds interesting and definitely a useful feature. However, the
> > > > > questions for me would be: how useful is it, how much effort would it
> > > > > require, and is it worth it? We simply cannot do all things at once,
> > > > > and currently the people who could review/drive/mentor this effort are
> > > > > pretty much strained :( One would have to investigate answers to those
> > > > > questions and prioritise it against other ongoing efforts before I
> > > > > could vote +1 for this.
> > > > >
> > > > > A couple of things to consider:
> > > > > - would it be only a job manager/failure region recovery feature?
> > > > > - would it require changes in the CheckpointBarrierHandler and
> > > > > CheckpointCoordinator classes?
> > > > > - with `at-most-once` semantics, theoretically speaking, we could just
> > > > > drop the current `CheckpointBarrier` handling/injecting code and avoid
> > > > > all of the checkpoint alignment issues - we could just checkpoint all
> > > > > of the tasks independently of one another. However, maybe that could
> > > > > be a follow-up optimisation step?
> > > > >
> > > > > Piotrek
> > > > >
> > > > > > On 11 Jun 2019, at 10:53, Zili Chen <wander4...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Xiaogang,
> > > > > >
> > > > > > It is an interesting topic.
> > > > > >
> > > > > > Notice that there is some effort to build a mature ML library for
> > > > > > Flink these days; some ML use cases could also trade off correctness
> > > > > > for timeliness or throughput. Exactly-once delivery certainly makes
> > > > > > Flink stand out, but an at-most-once option would adapt Flink to
> > > > > > more scenarios.
> > > > > >
> > > > > > Best,
> > > > > > tison.
> > > > > >
> > > > > >
> > > > > > SHI Xiaogang <shixiaoga...@gmail.com> wrote on Tue, Jun 11, 2019, 4:33 PM:
> > > > > >
> > > > > >> Flink offers a fault-tolerance mechanism to guarantee at-least-once
> > > > > >> and exactly-once message delivery in case of failures. The mechanism
> > > > > >> works well in practice and makes Flink stand out among stream
> > > > > >> processing systems.
> > > > > >>
> > > > > >> But the guarantee of at-least-once and exactly-once delivery does
> > > > > >> not come without a price. It typically requires restarting multiple
> > > > > >> tasks and falling back to the place where the last checkpoint was
> > > > > >> taken. (Fine-grained recovery can help alleviate the cost, but it
> > > > > >> still takes some effort to recover jobs.)
> > > > > >>
> > > > > >> In some scenarios, users prefer quick recovery and are willing to
> > > > > >> trade off correctness. For example, in some online recommendation
> > > > > >> systems, timeliness is far more important than consistency. In such
> > > > > >> cases, we can restart only the failed tasks individually and do not
> > > > > >> need to perform any rollback. Though some messages delivered to the
> > > > > >> failed tasks may be lost, the other tasks can continue to provide
> > > > > >> service to users.
> > > > > >>
> > > > > >> Many of our users are asking for at-most-once delivery in Flink.
> > > > > >> What do you think of the proposal? Any feedback is appreciated.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Xiaogang Shi
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
