Hi Panagiotis,

How about to introduce a config option to control which error handling
plugins should be used? It is more flexible for deployments. Additionally,
it can also enable users to explicitly specify the order that the plugins
take effects.

Thanks,
Zhu

Gen Luo <luogen...@gmail.com> 于2023年3月27日周一 15:02写道:
>
> Thanks for the summary!
>
> Also +1 to support custom restart strategies in a different FLIP,
> as long as we can make sure that the plugin interface won't be
> changed when the restart strategy interface is introduced.
>
> To achieve this, maybe we should think well how the handler
> would cooperate with the restart strategy, like would it executes b
> efore the strategy (e.g. some strategy may use the tag), or after
> it (e.g. some metric reporting handler may use the handling result).
> Though we can implement in one way, and extend if the other is
> really necessary by someone.
>
> Besides, instead of using either of the names, shall we just make
> them two subclasses named FailureEnricher and FailureListener?
> The former executes synchronously and can modify the context,
> while the latter executes asynchronously and has a read-only view
> of context. In this way we can make sure a handler behaves in
> the expected way.
>
>
> On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> > +1 to support custom restart strategies in a different FLIP.
> >
> > It's fine to have a different plugin for custom restart strategy.
> > If so, since we do not treat the FLIP-304 plugin as a common failure
> > handler, but instead mainly targets to add labels to errors, I would
> > +1 for the name `FailureEnricher`.
> >
> > Thanks,
> > Zhu
> >
> > David Morávek <d...@apache.org> 于2023年3月23日周四 15:51写道:
> > >
> > > >
> > > > One additional remark on introducing it as an async operation: We would
> > > > need a new configuration parameter to define the timeout for such a
> > > > listener call, wouldn't we?
> > > >
> > >
> > > This could be left up to the implementor to handle.
> > >
> > > What about adding an extra method getNamespace() to the Listener
> > interface
> > > > which returns an Optional<String>.
> > > >
> > >
> > > I'd avoid mixing an additional concept into this. We can simply have a
> > new
> > > method that returns a set of keys the listener can output. We can
> > validate
> > > this at the JM startup time and fail fast (since it's a configuration
> > > error) if there is an overlap. If the listener outputs the key that is
> > not
> > > allowed to, I wouldn't be afraid to call into a fatal error handler since
> > > it's an invalid implementation.
> > >
> > > Best,
> > > D.
> > >
> > > On Thu, Mar 23, 2023 at 8:34 AM Matthias Pohl
> > > <matthias.p...@aiven.io.invalid> wrote:
> > >
> > > > Sounds good. Two points I want to add:
> > > >
> > > >    - Listener execution should be independent — however we need a way
> > to
> > > > > enforce a Label key/key-prefix is only assigned to a single Listener,
> > > > > thinking of a validation step both at Listener init and runtime
> > stages
> > > > >
> > > > What about adding an extra method getNamespace() to the Listener
> > interface
> > > > which returns an Optional<String>. Therefore, the implementation/the
> > user
> > > > can decide depending on the use case whether it's necessary to have
> > > > separate namespaces for the key/value pairs or not. On the Flink side,
> > we
> > > > would just merge the different maps considering their namespaces.
> > > >
> > > > A flaw of this approach is that if a user decides to use the same
> > namespace
> > > > for multiple listeners, how is an error in one of the listeners
> > represented
> > > > in the outcome? We would have to overwrite either the successful
> > listener's
> > > > result or the failed ones. I wanted to share it, anyway.
> > > >
> > > > One additional remark on introducing it as an async operation: We would
> > > > need a new configuration parameter to define the timeout for such a
> > > > listener call, wouldn't we?
> > > >
> > > > Matthias
> > > >
> > > > On Wed, Mar 22, 2023 at 4:56 PM Panagiotis Garefalakis <
> > pga...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > >
> > > > > Thanks for the valuable comments!
> > > > > Excited to see this is an area of interest for the community!
> > > > >
> > > > > Summarizing some of the main points raised along with my thoughts:
> > > > >
> > > > >    - Labels (Key/Value) pairs are more expressive than Tags
> > (Strings) so
> > > > >    using the former is a good idea — I am also debating if we want to
> > > > > return
> > > > >    multiple KV pairs per Listener (one could argue that we could
> > split
> > > > the
> > > > >    logic in multiple Listeners to support that)
> > > > >    - An immutable context along with data returned using the
> > interface
> > > > >    method implementations is a better approach than a mutable
> > Collection
> > > > >    - Listener execution should be independent — however we need a
> > way to
> > > > >    enforce a Label key/key-prefix is only assigned to a single
> > Listener,
> > > > >    thinking of a validation step both at Listener init and runtime
> > stages
> > > > >    - We want to perform async Listener operations as sync could
> > block the
> > > > >    main thread — exposing an ioExecutor pool through the context
> > could be
> > > > > an
> > > > >    elegant solution here
> > > > >    - Make sure Listener errors are not failing jobs — make sure to
> > log
> > > > and
> > > > >    keep the job alive
> > > > >    - We need better naming / public interface separation/description
> > > > >
> > > > >         -  Even though custom restart strategies share some
> > properties
> > > > with
> > > > > Listeners, they would probably need a separate interface with a
> > different
> > > > > return type anyway (restart strategy not labels) and in general they
> > are
> > > > > different and complex enough to justify their own FLIP (that can
> > also be
> > > > a
> > > > > follow-up).
> > > > >
> > > > >
> > > > > What do people think? I am planning to modify the FLIP to reflect
> > these
> > > > > changes if they make sense to everyone.
> > > > >
> > > > > Cheers,
> > > > > Panagiotis
> > > > >
> > > > > On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh <hlteo...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Thank you Panagiotis for proposing this. From the size of the
> > thread,
> > > > > this
> > > > > > is a much needed feature in Flink!
> > > > > > Some thoughts, to extend those already adeptly summarised by Piotr,
> > > > > > Matthias and Jing.
> > > > > >
> > > > > > - scope of FLIP: +1 to scoping this FLIP to observability around a
> > > > > > restart. That would include adding metadata + exposing metadata to
> > > > > external
> > > > > > systems. IMO, introducing a new restart strategy solves different
> > > > > problems,
> > > > > > is much larger scope and should be covered in a separate FLIP.
> > > > > >
> > > > > > - failure handling: At the moment, we propose transitioning the
> > Flink
> > > > job
> > > > > > to a terminal FAILED state when JobListener fails, when the job
> > could
> > > > > have
> > > > > > transitioned to RESTARTING->RUNNING. If we are keeping in line
> > with the
> > > > > > scope to add metadata/observability around job restarts, we should
> > not
> > > > be
> > > > > > affecting the running of the Flink job itself. Could I propose we
> > > > instead
> > > > > > log WARN/ERROR.
> > > > > >
> > > > > > - immutable context: +1 to keeping the contract clear via return
> > types.
> > > > > > - async operation: +1 to adding ioexecutor to context, however,
> > given
> > > > we
> > > > > > don’t want to block the actual job restart on adding metadata /
> > calling
> > > > > > external services, should we consider returning and letting futures
> > > > > > complete independently?
> > > > > >
> > > > > > - independent vs ordered execution: Should we consider making the
> > order
> > > > > of
> > > > > > execution deterministic (use a List instead of Set)?
> > > > > >
> > > > > >
> > > > > > Once again, thank you for working on this.
> > > > > >
> > > > > > Regards,
> > > > > > Hong
> > > > > >
> > > > > >
> > > > > > > On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID>
> > > > wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Thanks Panagiotis for this FLIP and thanks for all valuable
> > > > > discussions.
> > > > > > > I'd like to share my two cents:
> > > > > > >
> > > > > > > - FailureListenerContext#addTag and
> > FailureListenerContext#getTags.
> > > > It
> > > > > > > seems that we have to call getTags() and then do remove
> > activities if
> > > > > we
> > > > > > > want to delete any tags (according to the javadoc in the FLIP).
> > It
> > > > is
> > > > > > > inconsistent for me too. Either offer addTag(), deleteTag(), and
> > let
> > > > > > > getTags() return immutable collection, or offer getTags() only to
> > > > > return
> > > > > > > mutable collection.
> > > > > > >
> > > > > > > - label vs tag. Label is a great idea +1. AFAIC, tag could be a
> > > > special
> > > > > > > case of label, i.e. key="tag". It is convenient to offer the
> > xxxTag()
> > > > > > > method if the user only needs one label. I would love to have
> > both of
> > > > > > them.
> > > > > > > Another thought is that tag implicitly contains the meaning of
> > > > > > "immutable".
> > > > > > >
> > > > > > > - +1 for a separate FLIP of customized restart strategy.
> > Attention
> > > > > should
> > > > > > > be taken to make sure it works well with Flink built-in
> > > > restartStrategy
> > > > > > in
> > > > > > > order to have the single source of truth.
> > > > > > >
> > > > > > > - execution order. The default independent execution should be
> > fine.
> > > > > > > According to the FailureListener interface definition in the
> > FLIP,
> > > > > users
> > > > > > > should be able to easily build a listener chain[1] to offer
> > > > sequential
> > > > > > > execution, e.g. public FailureListener(FailureListener
> > nextListener).
> > > > > > > Another option is to modify the interface or provide another
> > > > interface
> > > > > > > alongside the current one to extend the method to support
> > > > > ListenerChain,
> > > > > > > i.e. void onFailure(Throwable cause, FailureListenerContext
> > context,
> > > > > > > ListenerChain listenerChain). Users can also mix them up.
> > > > > > >
> > > > > > > - naming. Afaiu, the pluggable extension is not limited to
> > failure
> > > > > > > enrichment. Conceptually it can do everything for the given
> > failure,
> > > > > e.g.
> > > > > > > start counting metric as the FLIP described, calling an external
> > > > > system,
> > > > > > > sending notification to slack channel, etc. you name it. It
> > sounds to
> > > > > me
> > > > > > > more like a FailureActionListener - it can trigger actions based
> > on
> > > > > > > failure. Failure enrichment is one type of action.
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Jing
> > > > > > >
> > > > > > > [1]
> > https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl
> > > > > > > <matthias.p...@aiven.io.invalid> wrote:
> > > > > > >
> > > > > > >> Thanks for the proposal, Panagiotis. A lot of good points have
> > been
> > > > > > already
> > > > > > >> shared. I just want to add my view on some of the items:
> > > > > > >>
> > > > > > >> - independent execution vs ordered execution: I prefer the
> > listeners
> > > > > > being
> > > > > > >> processed independently from each other because it adds less
> > > > > complexity
> > > > > > >> code-wise. The use case Piotr described (where you want to reuse
> > > > some
> > > > > > other
> > > > > > >> classifier) is the only one I can think of where we actually
> > need
> > > > > > >> classifiers depending on each other. Supporting such a use case
> > > > right
> > > > > > from
> > > > > > >> the start feels a bit over-engineered and could be covered in a
> > > > > > follow-up
> > > > > > >> FLIP if we really come to that point where such a feature is
> > > > requested
> > > > > > by
> > > > > > >> users.
> > > > > > >>
> > > > > > >> - key/value pairs instead of plain labels: I think that's a good
> > > > idea.
> > > > > > >> key/value pairs are more expressive. +1
> > > > > > >>
> > > > > > >> - extending the FLIP to cover restart strategy: I understand
> > Gen's
> > > > > > concern
> > > > > > >> about introducing too many different types of plugins. But I
> > would
> > > > > still
> > > > > > >> favor not extending the FLIP in this regard. A pluggable restart
> > > > > > strategy
> > > > > > >> sounds reasonable. But an error classifier and a restart
> > strategy
> > > > are
> > > > > > still
> > > > > > >> different enough to justify separate plugins, IMHO. And
> > therefore, I
> > > > > > would
> > > > > > >> think that covering the restart strategy in a separate FLIP is
> > the
> > > > > > better
> > > > > > >> option for the sake of simplicity.
> > > > > > >>
> > > > > > >> - immutable context: Passing in an immutable context and
> > returning
> > > > > data
> > > > > > >> through the interface method's return value sounds like a better
> > > > > > approach
> > > > > > >> to harden the contract of the interface. +1 for that proposal
> > > > > > >>
> > > > > > >> - async operation: I think David is right. An async interface
> > makes
> > > > > the
> > > > > > >> listener implementations more robust when it comes to heavy IO
> > > > > > operations.
> > > > > > >> The ioExecutor can be passed through the context object. +1
> > > > > > >>
> > > > > > >> Matthias
> > > > > > >>
> > > > > > >> On Tue, Mar 21, 2023 at 2:09 PM David Morávek <
> > > > > david.mora...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> *@Piotr*
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>> I was thinking about actually defining the order of the
> > > > > > >>>> classifiers/handlers and not allowing them to be asynchronous.
> > > > > > >>>> Asynchronousity would create some problems: when to actually
> > > > return
> > > > > > the
> > > > > > >>>> error to the user? After all async responses will get back?
> > > > Before,
> > > > > > but
> > > > > > >>>> without classified exception? It would also add implementation
> > > > > > >> complexity
> > > > > > >>>> and I think we can always expand the API with async version
> > in the
> > > > > > >> future
> > > > > > >>>> if needed.
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> As long as the classifiers need to talk to an external system,
> > we
> > > > by
> > > > > > >>> definition need to allow them to be asynchronous to unblock the
> > > > main
> > > > > > >> thread
> > > > > > >>> for handling other RPCs. Exposing ioExecutor via the context
> > > > proposed
> > > > > > >> above
> > > > > > >>> would be great.
> > > > > > >>>
> > > > > > >>> After all async responses will get back
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> This would be the same if we trigger them synchronously one by
> > one,
> > > > > > with
> > > > > > >> a
> > > > > > >>> caveat that synchronous execution might take significantly
> > longer
> > > > and
> > > > > > >>> introduce unnecessary downtime to a job.
> > > > > > >>>
> > > > > > >>> D.
> > > > > > >>>
> > > > > > >>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com>
> > wrote:
> > > > > > >>>
> > > > > > >>>> Hi Piotr,
> > > > > > >>>>
> > > > > > >>>> It's fine to me to have a separate FLIP to extend this
> > > > > > >> `FailureListener`
> > > > > > >>>> to support custom restart strategy.
> > > > > > >>>>
> > > > > > >>>> What I was a bit concerned is that if we just treat the
> > > > > > >> `FailureListener`
> > > > > > >>>> as an error classifier which is not crucial to Flink framework
> > > > > > process,
> > > > > > >>>> we may design it to run asynchronously and not trigger Flink
> > > > > failures.
> > > > > > >>>> This may be a blocker if later we want to enable it to support
> > > > > custom
> > > > > > >>>> restart strategy.
> > > > > > >>>>
> > > > > > >>>> Thanks,
> > > > > > >>>> Zhu
> > > > > > >>>>
> > > > > > >>>> Dian Fu <dian0511...@gmail.com> 于2023年3月21日周二 19:53写道:
> > > > > > >>>>>
> > > > > > >>>>> Hi Panagiotis,
> > > > > > >>>>>
> > > > > > >>>>> Thanks for the proposal. This is a very valuable feature and
> > will
> > > > > be
> > > > > > >> a
> > > > > > >>>> good
> > > > > > >>>>> add-on for Flink.
> > > > > > >>>>>
> > > > > > >>>>> I also think that it will be great if we can consider how to
> > make
> > > > > it
> > > > > > >>>>> possible for users to customize the failure handling in this
> > > > FLIP.
> > > > > > >> It's
> > > > > > >>>>> highly related to the problem we want to address in this
> > FLIP and
> > > > > > >> could
> > > > > > >>>>> avoid refactoring the interfaces proposed in this FLIP too
> > > > quickly.
> > > > > > >>>>>
> > > > > > >>>>> Currently it treats all kinds of exceptions the same.
> > However,
> > > > some
> > > > > > >>> kinds
> > > > > > >>>>> of exceptions are actually not recoverable at all. It could
> > let
> > > > > users
> > > > > > >>> to
> > > > > > >>>>> customize the failure handling logic to fail fast for certain
> > > > known
> > > > > > >>>>> unrecoverable exceptions and finally make these kinds of
> > jobs get
> > > > > > >>> noticed
> > > > > > >>>>> and recoveried more quickly.
> > > > > > >>>>>
> > > > > > >>>>> Regards,
> > > > > > >>>>> Dian
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On Tue, Mar 21, 2023 at 4:36 PM Gen Luo <luogen...@gmail.com
> > >
> > > > > wrote:
> > > > > > >>>>>
> > > > > > >>>>>> Hi Panagiotis,
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks for the proposal.
> > > > > > >>>>>>
> > > > > > >>>>>> It's useful to enrich the information so that users can be
> > more
> > > > > > >>>>>> clear why the job is failing, especially platform
> > developers who
> > > > > > >>>>>> need to provide the information to their end users.
> > > > > > >>>>>> And for the very FLIP, I'd prefer the naming
> > `FailureEnricher`
> > > > > > >>>>>> proposed by David, as the plugin doesn't really handle the
> > > > > failure.
> > > > > > >>>>>>
> > > > > > >>>>>> However, like Zhu and Lijie said, I also joined a discussion
> > > > > > >>>>>> recently about customized failure handling, e.g. counting
> > the
> > > > > > >>>>>> failure rate of pipeline regions separately, and failing
> > the job
> > > > > > >>>>>> when a specific error occurs, and so on.
> > > > > > >>>>>> I suppose a custom restart strategy, or I'd call it a custom
> > > > > > >>>>>> failure "handler", is indeed necessary. It can also enrich
> > the
> > > > > > >>>>>> information as the current proposed handler does.
> > > > > > >>>>>>
> > > > > > >>>>>> To avoid adding too many plugin interfaces which may confuse
> > > > users
> > > > > > >>>>>> and make the ExecutionFailureHandler more complex,
> > > > > > >>>>>> I think it'd be better to consider the requirements at the
> > same
> > > > > > >> time.
> > > > > > >>>>>>
> > > > > > >>>>>> IMO, we can add a handler interface, then make the current
> > > > restart
> > > > > > >>>>>> strategy and the enricher both types of the handler. The
> > > > handlers
> > > > > > >>>>>> execute in sequence, and the failure is considered
> > unrecoverable
> > > > > if
> > > > > > >>>>>> any of the handlers decides.
> > > > > > >>>>>> In this way, users can also implement a handler using the
> > > > enriched
> > > > > > >>>>>> information provided by the previous handlers, e.g. fail
> > the job
> > > > > > >> and
> > > > > > >>>>>> send a notification if too many failures are caused by the
> > end
> > > > > > >> users.
> > > > > > >>>>>>
> > > > > > >>>>>> Best,
> > > > > > >>>>>> Gen
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <
> > > > > huweihua....@gmail.com
> > > > > > >>>
> > > > > > >>>> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hi Panagiotis,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks for your proposal. It is valuable to analyze the
> > reason
> > > > > > >> for
> > > > > > >>>>>>> failure with the user plug-in.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Making the context immutable could make the contract
> > stronger.
> > > > > > >>>>>>> Letting the listener return an enriching result may be a
> > better
> > > > > > >>> way.
> > > > > > >>>>>>>
> > > > > > >>>>>>> IIUC, listeners could do two things, enrich more
> > information
> > > > > > >>>>>> (tags/labels)
> > > > > > >>>>>>> to FailureHandlingResult, and push data out of Flink
> > (metrics
> > > > or
> > > > > > >>>>>>> something).
> > > > > > >>>>>>> IMO, we could split these two types into Listener and
> > Advisor
> > > > > > >>> (maybe
> > > > > > >>>>>>> other names). The Listener just pushes the data out and
> > returns
> > > > > > >>>> nothing
> > > > > > >>>>>> to
> > > > > > >>>>>>> Flink, so we can run these async and don't have to wait for
> > > > > > >>>> Listener's
> > > > > > >>>>>>> result.
> > > > > > >>>>>>> The Advisor returns rich information to the
> > > > FailureHadingResult,
> > > > > > >>>> and it
> > > > > > >>>>>>> should
> > > > > > >>>>>>> have a lighter logic.
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> Supporting a custom restart strategy is also valuable. In
> > this
> > > > > > >>>> design, we
> > > > > > >>>>>>> use
> > > > > > >>>>>>> RestartStrategy to construct a FailureHandingResult, and
> > then
> > > > > > >> pass
> > > > > > >>>> it to
> > > > > > >>>>>>> Listener.
> > > > > > >>>>>>> My question is, should we change the restart strategy
> > interface
> > > > > > >> to
> > > > > > >>>>>> support
> > > > > > >>>>>>> the
> > > > > > >>>>>>> custom restart strategy, or keep the current restart
> > strategy
> > > > and
> > > > > > >>>> let the
> > > > > > >>>>>>> later
> > > > > > >>>>>>> Listener enrich the restartable information to
> > > > > > >>> FailureHandingResult?
> > > > > > >>>> The
> > > > > > >>>>>>> latter
> > > > > > >>>>>>> may cause some confusion when we use a custom restart
> > strategy.
> > > > > > >>>>>>> The default flink restart strategy also runs but does not
> > take
> > > > > > >>>> effect.
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> Best,
> > > > > > >>>>>>> Weihua
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang <
> > > > > > >>>> wangdachui9...@gmail.com>
> > > > > > >>>>>>> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hi Panagiotis,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thanks for driving this.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> +1 for supporting custom restart strategy, we did receive
> > such
> > > > > > >>>> requests
> > > > > > >>>>>>>> from the user mailing list [1][2].
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Besides, in current design, the plugin will only do some
> > > > > > >>>> statistical
> > > > > > >>>>>> and
> > > > > > >>>>>>>> classification work, and will not affect the
> > > > > > >>>> *FailureHandlingResult*.
> > > > > > >>>>>>> Just
> > > > > > >>>>>>>> listening, no handling, it doesn't quite match the title.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> [1]
> > > > > > >>>>
> > https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
> > > > > > >>>>>>>> [2]
> > > > > > >>>>
> > https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Best,
> > > > > > >>>>>>>> Lijie
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Zhu Zhu <reed...@gmail.com> 于2023年3月20日周一 21:39写道:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi Panagiotis,
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks for creating this proposal! It's good to enable
> > Flink
> > > > > > >> to
> > > > > > >>>>>> handle
> > > > > > >>>>>>>>> different errors in different ways, through a pluggable
> > way.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> There are requests for flexible restart strategies from
> > time
> > > > > > >> to
> > > > > > >>>> time,
> > > > > > >>>>>>> for
> > > > > > >>>>>>>>> different strategies of restart backoff time, or to
> > suppress
> > > > > > >>>>>> restarting
> > > > > > >>>>>>>>> on certain errors. Therefore, I think it's better that
> > the
> > > > > > >>>> proposed
> > > > > > >>>>>>>>> failure handling plugin can also support custom restart
> > > > > > >>>> strategies.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Maybe we can call it FailureHandlingAdvisor which
> > provides
> > > > > > >> more
> > > > > > >>>>>>>>> information (labels) and gives advice (restart backoff
> > time,
> > > > > > >>>> whether
> > > > > > >>>>>>>>> to restart)? I do not have a strong opinion though, any
> > > > > > >>>> explanatory
> > > > > > >>>>>>>>> name would be good.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> To avoid unexpected mutation, how about to make the
> > context
> > > > > > >>>> immutable
> > > > > > >>>>>>>>> and let the plugin return an immutable result? i.e.
> > remove
> > > > > > >> the
> > > > > > >>>>>> setters
> > > > > > >>>>>>>>> from the context, and let the plugin method return a
> > result
> > > > > > >>> which
> > > > > > >>>>>>>>> contains `labels`, `canRestart` and `restartBackoffTime`.
> > > > > > >> Flink
> > > > > > >>>>>> should
> > > > > > >>>>>>>>> apply the result to the context before invoking the next
> > > > > > >>> plugin,
> > > > > > >>>> so
> > > > > > >>>>>>>>> that the next plugin will see the updated context.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> The plugin should avoid taking too much time to return
> > the
> > > > > > >>>> result,
> > > > > > >>>>>>>> because
> > > > > > >>>>>>>>> it will block the RPC and result in instability.
> > However, it
> > > > > > >>> can
> > > > > > >>>>>> still
> > > > > > >>>>>>>>> perform heavy actions in a different thread. The context
> > can
> > > > > > >>>> provide
> > > > > > >>>>>> an
> > > > > > >>>>>>>>> `ioExecutor` to the plugins for reuse.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks,
> > > > > > >>>>>>>>> Zhu
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Shammon FY <zjur...@gmail.com> 于2023年3月20日周一 20:21写道:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Hi Panagiotis
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Thank you for your answer. I agree that
> > `FailureListener`
> > > > > > >>>> could be
> > > > > > >>>>>>>>>> stateless, then I have some thoughts as follows
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 1. I see that listeners and tag collections are
> > associated.
> > > > > > >>>> When
> > > > > > >>>>>>>>> JobManager
> > > > > > >>>>>>>>>> fails and restarts, how can the new listener be
> > associated
> > > > > > >>>> with the
> > > > > > >>>>>>> tag
> > > > > > >>>>>>>>>> collection before failover? Is the listener loading
> > order?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 2. The tag collection may be too large, resulting in the
> > > > > > >>>> JobManager
> > > > > > >>>>>>>> OOM,
> > > > > > >>>>>>>>> do
> > > > > > >>>>>>>>>> we need to provide a management class that supports some
> > > > > > >>>>>> obsolescence
> > > > > > >>>>>>>>>> strategies instead of a direct Collection?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 3. Is it possible to provide a more complex data
> > structure
> > > > > > >>>> than a
> > > > > > >>>>>>>> simple
> > > > > > >>>>>>>>>> string collection for tags in listeners, such as
> > key-value?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Best,
> > > > > > >>>>>>>>>> Shammon FY
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <
> > > > > > >>> xbjt...@gmail.com>
> > > > > > >>>>>>> wrote:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> Hi,Panagiotis
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Thank you for kicking off this discussion. Overall, the
> > > > > > >>>> proposed
> > > > > > >>>>>>>>> feature of
> > > > > > >>>>>>>>>>> this FLIP makes sense to me. We have also discussed
> > > > > > >> similar
> > > > > > >>>>>>>>> requirements
> > > > > > >>>>>>>>>>> with our users and developers, and I believe it will
> > help
> > > > > > >>>> many
> > > > > > >>>>>>> users.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> In terms of FLIP content, I have some thoughts:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> (1) For the FailureListenerContextget interface, the
> > > > > > >>> methods
> > > > > > >>>>>>>>>>> FailureListenerContext#addTag and
> > > > > > >>>> FailureListenerContextgetTags
> > > > > > >>>>>>> looks
> > > > > > >>>>>>>>> very
> > > > > > >>>>>>>>>>> inconsistent because they imply specific implementation
> > > > > > >>>> details,
> > > > > > >>>>>>> and
> > > > > > >>>>>>>>> not
> > > > > > >>>>>>>>>>> all FailureListeners need to handle them, we shouldn't
> > > > > > >> put
> > > > > > >>>> them
> > > > > > >>>>>> in
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>> interface. Minor: The comment "UDF loading" in the
> > > > > > >>>>>>>> getUserClassLoader()
> > > > > > >>>>>>>>>>> method looks like a typo, IIUC it should return the
> > > > > > >>>> classloader
> > > > > > >>>>>> of
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>> current job.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> (2) Regarding the implementation in
> > > > > > >>>>>>>>> ExecutionFailureHandler#handleFailure,
> > > > > > >>>>>>>>>>> some custom listeners may have heavy IO operations,
> > such
> > > > > > >> as
> > > > > > >>>>>>> reporting
> > > > > > >>>>>>>>> to
> > > > > > >>>>>>>>>>> their monitoring system. The current logic appears to
> > be
> > > > > > >>>>>> processing
> > > > > > >>>>>>>> in
> > > > > > >>>>>>>>> the
> > > > > > >>>>>>>>>>> JobMaster's main thread, and it is recommended not to
> > do
> > > > > > >>> this
> > > > > > >>>>>> kind
> > > > > > >>>>>>> of
> > > > > > >>>>>>>>>>> processing in the main thread.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> (3) The results of FailureListener's processing and the
> > > > > > >>>>>>>>>>> FailureHandlingResult returned by
> > ExecutionFailureHandler
> > > > > > >>>> are not
> > > > > > >>>>>>>>> related.
> > > > > > >>>>>>>>>>> I think these two are closely related, the motivation
> > of
> > > > > > >>> this
> > > > > > >>>>>> FLIP
> > > > > > >>>>>>> is
> > > > > > >>>>>>>>> to
> > > > > > >>>>>>>>>>> make current failure handling more flexible. From this
> > > > > > >>>>>> perspective,
> > > > > > >>>>>>>>>>> different listeners should have the opportunity to
> > affect
> > > > > > >>> the
> > > > > > >>>>>> job's
> > > > > > >>>>>>>>> failure
> > > > > > >>>>>>>>>>> handling flow. For example, a Flink job is configured
> > > > > > >> with
> > > > > > >>> a
> > > > > > >>>>>>>>>>> RestartStrategy with huge numbers retry , but the Kafka
> > > > > > >>>> topic of
> > > > > > >>>>>>>>> Source has
> > > > > > >>>>>>>>>>> been deleted, the job will failover continuously. In
> > this
> > > > > > >>>> case,
> > > > > > >>>>>> the
> > > > > > >>>>>>>>> user
> > > > > > >>>>>>>>>>> should have their listener to determine whether this
> > > > > > >>> failure
> > > > > > >>>> is
> > > > > > >>>>>>>>> recoverable
> > > > > > >>>>>>>>>>> or unrecoverable, and then wrap the processing result
> > > > > > >> into
> > > > > > >>>>>>>>>>> FailureHandlingResult.unrecoverable(xx) and pass it to
> > > > > > >>>> JobMaster,
> > > > > > >>>>>>>> this
> > > > > > >>>>>>>>>>> approach will be more flexible.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> (4) All FLIPs have an important section named Public
> > > > > > >>>> Interfaces.
> > > > > > >>>>>>>>> Current
> > > > > > >>>>>>>>>>> FLIP mixes the interface section and the implementation
> > > > > > >>>> section
> > > > > > >>>>>>>>> together.
> > > > > > >>>>>>>>>>> It is better for us to refer to the FLIP template[1]
> > and
> > > > > > >>>> separate
> > > > > > >>>>>>>> them,
> > > > > > >>>>>>>>>>> this will make the entire FLIP clearer.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> In addition, regarding the FLIP process, there is a
> > small
> > > > > > >>>>>>> suggestion:
> > > > > > >>>>>>>>> The
> > > > > > >>>>>>>>>>> community generally creates a JIRA issue after the FLIP
> > > > > > >>> vote
> > > > > > >>>> is
> > > > > > >>>>>>>> passed,
> > > > > > >>>>>>>>>>> instead of during the FLIP preparation phase because
> > the
> > > > > > >>>> FLIP may
> > > > > > >>>>>>> be
> > > > > > >>>>>>>>>>> rejected. Although this FLIP is very reasonable, it's
> > > > > > >>> better
> > > > > > >>>> to
> > > > > > >>>>>>>> follow
> > > > > > >>>>>>>>> the
> > > > > > >>>>>>>>>>> process.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Leonard
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> [1]
> > > > > > >>>>>>>
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> On Mon, Mar 20, 2023 at 7:04 PM David Morávek <
> > > > > > >>>> d...@apache.org>
> > > > > > >>>>>>>> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> however listeners can use previous state
> > > > > > >> (tags/labels)
> > > > > > >>> to
> > > > > > >>>>>> make
> > > > > > >>>>>>>>>>> decisions
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> That sounds like a very fragile contract. We should
> > > > > > >>> either
> > > > > > >>>>>> allow
> > > > > > >>>>>>>>> passing
> > > > > > >>>>>>>>>>>> tags between listeners and then need to define
> > ordering
> > > > > > >>> or
> > > > > > >>>> make
> > > > > > >>>>>>> all
> > > > > > >>>>>>>>> of
> > > > > > >>>>>>>>>>> them
> > > > > > >>>>>>>>>>>> independent. I prefer the latter because it allows us
> > > > > > >> to
> > > > > > >>>>>>>> parallelize
> > > > > > >>>>>>>>>>> things
> > > > > > >>>>>>>>>>>> if needed (if all listeners trigger an RCP to the
> > > > > > >>> external
> > > > > > >>>>>>> system,
> > > > > > >>>>>>>>> for
> > > > > > >>>>>>>>>>>> example).
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Can you expand on why we need more than one classifier
> > > > > > >> to
> > > > > > >>>> be
> > > > > > >>>>>> able
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>>> output
> > > > > > >>>>>>>>>>>> the same tag?
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> system ones come first and then the ones loaded from
> > > > > > >> the
> > > > > > >>>> plugin
> > > > > > >>>>>>>>> manager
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Since they're returned as a Set, the order is
> > > > > > >> completely
> > > > > > >>>>>>>>>>> non-deterministic,
> > > > > > >>>>>>>>>>>> no matter in which order they're loaded.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> just communicating with external monitoring/alerting
> > > > > > >>>> systems
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> That makes the need for pushing things out of the main
> > > > > > >>>> thread
> > > > > > >>>>>>> even
> > > > > > >>>>>>>>>>>> stronger. This almost sounds like we need to return a
> > > > > > >>>>>>>>> CompletableFuture
> > > > > > >>>>>>>>>>> for
> > > > > > >>>>>>>>>>>> the per-throwable classification because an external
> > > > > > >>> system
> > > > > > >>>>>> might
> > > > > > >>>>>>>>> take a
> > > > > > >>>>>>>>>>>> significant time to respond. We need to unblock the
> > > > > > >> main
> > > > > > >>>> thread
> > > > > > >>>>>>> for
> > > > > > >>>>>>>>> other
> > > > > > >>>>>>>>>>>> RPCs.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Also, in the proposal, this happens in the failure
> > > > > > >>>> handler. If
> > > > > > >>>>>>>>> that's the
> > > > > > >>>>>>>>>>>> case, this might block the job from being restarted
> > (if
> > > > > > >>> the
> > > > > > >>>>>>> restart
> > > > > > >>>>>>>>>>>> strategy allows for another restart), which would be
> > > > > > >>> great
> > > > > > >>>> to
> > > > > > >>>>>>> avoid
> > > > > > >>>>>>>>>>> because
> > > > > > >>>>>>>>>>>> it can introduce extra downtime.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> This raises another question: what should happen if
> > the
> > > > > > >>>>>>>>> classification
> > > > > > >>>>>>>>>>>> fails? Crashing the job (which is what's currently
> > > > > > >>>> proposed)
> > > > > > >>>>>>> seems
> > > > > > >>>>>>>>> very
> > > > > > >>>>>>>>>>>> dangerous if this might depend on an external system.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Thats a valid point, passing the JobGraph containing
> > > > > > >> all
> > > > > > >>>> the
> > > > > > >>>>>>> above
> > > > > > >>>>>>>>>>>>> information is also something to consider
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> We should avoid passing JG around because it's mutable
> > > > > > >>>> (which
> > > > > > >>>>>> we
> > > > > > >>>>>>>>> must fix
> > > > > > >>>>>>>>>>>> in the long term), and letting users change it might
> > > > > > >> have
> > > > > > >>>>>>>>> consequences.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>> D.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis
> > > > > > >> <
> > > > > > >>>>>>>>>>> pga...@apache.org>
> > > > > > >>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Hey David, Shammon,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thanks for the valuable comments!
> > > > > > >>>>>>>>>>>>> I am glad you find this proposal useful, some
> > > > > > >> thoughts:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> @Shammon
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 1. How about adding more job information in
> > > > > > >>>>>>>>> FailureListenerContext? For
> > > > > > >>>>>>>>>>>>>> example, job vertext, subtask, taskmanager
> > > > > > >> location.
> > > > > > >>>> And
> > > > > > >>>>>> then
> > > > > > >>>>>>>>> user
> > > > > > >>>>>>>>>>> can
> > > > > > >>>>>>>>>>>> do
> > > > > > >>>>>>>>>>>>>> more statistics according to different dimensions.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thats a valid point, passing the JobGraph containing
> > > > > > >>> all
> > > > > > >>>> the
> > > > > > >>>>>>>> above
> > > > > > >>>>>>>>>>>>> information
> > > > > > >>>>>>>>>>>>> is also something to consider, I was mostly trying to
> > > > > > >>> be
> > > > > > >>>>>>>>> conservative:
> > > > > > >>>>>>>>>>>>> i.e., passingly only the information we need, and
> > > > > > >>> extend
> > > > > > >>>> as
> > > > > > >>>>>> we
> > > > > > >>>>>>>> see
> > > > > > >>>>>>>>> fit
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 2. Users may want to save results in listener, and
> > > > > > >> then
> > > > > > >>>> they
> > > > > > >>>>>>> can
> > > > > > >>>>>>>>> get
> > > > > > >>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> historical results even jabmanager failover. Can we
> > > > > > >>>>>> provide a
> > > > > > >>>>>>>>> unified
> > > > > > >>>>>>>>>>>>>> implementation for data storage requirements?
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> The idea is to store only the output of the Listeners
> > > > > > >>>> (tags)
> > > > > > >>>>>>> and
> > > > > > >>>>>>>>> treat
> > > > > > >>>>>>>>>>>> them
> > > > > > >>>>>>>>>>>>> as stateless.
> > > > > > >>>>>>>>>>>>> Tags are be stored along with HistoryEntries, and
> > > > > > >> will
> > > > > > >>> be
> > > > > > >>>>>>>> available
> > > > > > >>>>>>>>>>>> through
> > > > > > >>>>>>>>>>>>> the HistoryServer
> > > > > > >>>>>>>>>>>>> even after a JM dies.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> @David
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 1) Should we also consider adding labels? The
> > > > > > >>>> combination of
> > > > > > >>>>>>> tags
> > > > > > >>>>>>>>> and
> > > > > > >>>>>>>>>>>>>> labels seems to be what most systems offer;
> > > > > > >>> sometimes,
> > > > > > >>>> they
> > > > > > >>>>>>>> offer
> > > > > > >>>>>>>>>>>> labels
> > > > > > >>>>>>>>>>>>>> only (key=value pairs) because tags can be
> > > > > > >>> implemented
> > > > > > >>>>>> using
> > > > > > >>>>>>>>> those,
> > > > > > >>>>>>>>>>> but
> > > > > > >>>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>> the other way around.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Indeed changing tags to k:v labels could be more
> > > > > > >>>> expressive,
> > > > > > >>>>>> I
> > > > > > >>>>>>>>> like it!
> > > > > > >>>>>>>>>>>>> Let's see what others think.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined
> > > > > > >>> models
> > > > > > >>>>>>>>> ("listeners")
> > > > > > >>>>>>>>>>>> are
> > > > > > >>>>>>>>>>>>>> going to be, it would be great to keep the
> > > > > > >>>> interfaces/data
> > > > > > >>>>>>>>> structures
> > > > > > >>>>>>>>>>>>>> immutable so we can push things over to the I/O
> > > > > > >>>> threads.
> > > > > > >>>>>>> Also,
> > > > > > >>>>>>>> it
> > > > > > >>>>>>>>>>>> sounds
> > > > > > >>>>>>>>>>>>>> off to call the main interface a Listener since
> > > > > > >> it's
> > > > > > >>>>>> supposed
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>>>> enhance
> > > > > > >>>>>>>>>>>>>> the original throwable with additional metadata.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> The idea was for the name to be generic as there
> > > > > > >> could
> > > > > > >>> be
> > > > > > >>>>>>>> Listener
> > > > > > >>>>>>>>>>>>> implementations
> > > > > > >>>>>>>>>>>>> just communicating with external monitoring/alerting
> > > > > > >>>> systems
> > > > > > >>>>>>> and
> > > > > > >>>>>>>> no
> > > > > > >>>>>>>>>>>>> metadata output
> > > > > > >>>>>>>>>>>>> -- but lets rethink that. For immutability, see
> > > > > > >> below:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> 3) You're proposing to support a set of listeners.
> > > > > > >>> Since
> > > > > > >>>>>> you're
> > > > > > >>>>>>>>> passing
> > > > > > >>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> mutable context around, which includes tags set by
> > > > > > >>> the
> > > > > > >>>>>>> previous
> > > > > > >>>>>>>>>>>> listener,
> > > > > > >>>>>>>>>>>>>> do you expect users to make any assumptions about
> > > > > > >> the
> > > > > > >>>> order
> > > > > > >>>>>>> in
> > > > > > >>>>>>>>> which
> > > > > > >>>>>>>>>>>>>> listeners are executed?
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> In the existing proposal we are not making any
> > > > > > >>>> assumptions
> > > > > > >>>>>>> about
> > > > > > >>>>>>>>> the
> > > > > > >>>>>>>>>>>> order
> > > > > > >>>>>>>>>>>>> of listeners,
> > > > > > >>>>>>>>>>>>> (system ones come first and then the ones loaded from
> > > > > > >>> the
> > > > > > >>>>>>> plugin
> > > > > > >>>>>>>>>>> manager)
> > > > > > >>>>>>>>>>>>> however listeners can use previous state
> > > > > > >> (tags/labels)
> > > > > > >>> to
> > > > > > >>>>>> make
> > > > > > >>>>>>>>>>> decisions:
> > > > > > >>>>>>>>>>>>> e.g., wont assign *UNKNOWN* failureType when we have
> > > > > > >>>> already
> > > > > > >>>>>>> seen
> > > > > > >>>>>>>>> *USER
> > > > > > >>>>>>>>>>>> *or
> > > > > > >>>>>>>>>>>>> the other way around -- when we have seen *UNKNOWN*
> > > > > > >>>> remove in
> > > > > > >>>>>>>>> favor of
> > > > > > >>>>>>>>>>>>> *USER*
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Cheers,
> > > > > > >>>>>>>>>>>>> Panagiotis
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> On Sun, Mar 19, 2023 at 10:42 AM David Morávek <
> > > > > > >>>>>>> d...@apache.org>
> > > > > > >>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Hi Panagiotis,
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> This is an excellent proposal and something
> > > > > > >> everyone
> > > > > > >>>> trying
> > > > > > >>>>>>> to
> > > > > > >>>>>>>>>>> provide
> > > > > > >>>>>>>>>>>>>> "Flink as a service" needs to solve at some point.
> > > > > > >> I
> > > > > > >>>> have a
> > > > > > >>>>>>>>> couple of
> > > > > > >>>>>>>>>>>>>> questions:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> If I understand the proposal correctly, this is
> > > > > > >> just
> > > > > > >>>> about
> > > > > > >>>>>>>> adding
> > > > > > >>>>>>>>>>> tags
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>> the Throwable by running a tuple of (Throwable,
> > > > > > >>>>>>> FailureContext)
> > > > > > >>>>>>>>>>>> through a
> > > > > > >>>>>>>>>>>>>> user-defined model.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 1) Should we also consider adding labels? The
> > > > > > >>>> combination
> > > > > > >>>>>> of
> > > > > > >>>>>>>>> tags and
> > > > > > >>>>>>>>>>>>>> labels seems to be what most systems offer;
> > > > > > >>> sometimes,
> > > > > > >>>> they
> > > > > > >>>>>>>> offer
> > > > > > >>>>>>>>>>>> labels
> > > > > > >>>>>>>>>>>>>> only (key=value pairs) because tags can be
> > > > > > >>> implemented
> > > > > > >>>>>> using
> > > > > > >>>>>>>>> those,
> > > > > > >>>>>>>>>>> but
> > > > > > >>>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>> the other way around.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 2) Since we can not predict how heavy user-defined
> > > > > > >>>> models
> > > > > > >>>>>>>>>>> ("listeners")
> > > > > > >>>>>>>>>>>>> are
> > > > > > >>>>>>>>>>>>>> going to be, it would be great to keep the
> > > > > > >>>> interfaces/data
> > > > > > >>>>>>>>> structures
> > > > > > >>>>>>>>>>>>>> immutable so we can push things over to the I/O
> > > > > > >>>> threads.
> > > > > > >>>>>>> Also,
> > > > > > >>>>>>>> it
> > > > > > >>>>>>>>>>>> sounds
> > > > > > >>>>>>>>>>>>>> off to call the main interface a Listener since
> > > > > > >> it's
> > > > > > >>>>>> supposed
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>>>> enhance
> > > > > > >>>>>>>>>>>>>> the original throwable with additional metadata.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I'd propose something along the lines of (we should
> > > > > > >>>> have
> > > > > > >>>>>>> better
> > > > > > >>>>>>>>>>> names,
> > > > > > >>>>>>>>>>>>> this
> > > > > > >>>>>>>>>>>>>> is just to outline the idea):
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> interface FailureEnricher {
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>  ThrowableWithTagsAndLabels
> > > > > > >> enrichFailure(Throwable
> > > > > > >>>> cause,
> > > > > > >>>>>>>>>>>>>> ImmutableContextualMetadataAboutTheThrowable
> > > > > > >>> context);
> > > > > > >>>>>>>>>>>>>> }
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> The names should change; this is just to outline
> > > > > > >> the
> > > > > > >>>> idea.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 3) You're proposing to support a set of listeners.
> > > > > > >>>> Since
> > > > > > >>>>>>> you're
> > > > > > >>>>>>>>>>> passing
> > > > > > >>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> mutable context around, which includes tags set by
> > > > > > >>> the
> > > > > > >>>>>>> previous
> > > > > > >>>>>>>>>>>> listener,
> > > > > > >>>>>>>>>>>>>> do you expect users to make any assumptions about
> > > > > > >> the
> > > > > > >>>> order
> > > > > > >>>>>>> in
> > > > > > >>>>>>>>> which
> > > > > > >>>>>>>>>>>>>> listeners are executed?
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> *@Shammon*
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Users may want to save results in listener, and
> > > > > > >> then
> > > > > > >>>> they
> > > > > > >>>>>> can
> > > > > > >>>>>>>>> get the
> > > > > > >>>>>>>>>>>>>>> historical results even jabmanager failover. Can
> > > > > > >> we
> > > > > > >>>>>>> provide a
> > > > > > >>>>>>>>>>> unified
> > > > > > >>>>>>>>>>>>>>> implementation for data storage requirements?
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I think we should explicitly state that all
> > > > > > >>>> "listeners" are
> > > > > > >>>>>>>>> treated
> > > > > > >>>>>>>>>>> as
> > > > > > >>>>>>>>>>>>>> stateless. I don't see any strong reason for
> > > > > > >>>> snapshotting
> > > > > > >>>>>>> them.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>> D.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <
> > > > > > >>>>>>> zjur...@gmail.com>
> > > > > > >>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Hi Panagiotis
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Thank you for starting this discussion. I think
> > > > > > >>> this
> > > > > > >>>> FLIP
> > > > > > >>>>>>> is
> > > > > > >>>>>>>>>>> valuable
> > > > > > >>>>>>>>>>>>> and
> > > > > > >>>>>>>>>>>>>>> can help user to analyze the causes of job
> > > > > > >> failover
> > > > > > >>>>>> better!
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I have two comments as follows
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 1. How about adding more job information in
> > > > > > >>>>>>>>> FailureListenerContext?
> > > > > > >>>>>>>>>>>> For
> > > > > > >>>>>>>>>>>>>>> example, job vertext, subtask, taskmanager
> > > > > > >>> location.
> > > > > > >>>> And
> > > > > > >>>>>>> then
> > > > > > >>>>>>>>> user
> > > > > > >>>>>>>>>>>> can
> > > > > > >>>>>>>>>>>>> do
> > > > > > >>>>>>>>>>>>>>> more statistics according to different
> > > > > > >> dimensions.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> 2. Users may want to save results in listener,
> > > > > > >> and
> > > > > > >>>> then
> > > > > > >>>>>>> they
> > > > > > >>>>>>>>> can
> > > > > > >>>>>>>>>>> get
> > > > > > >>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> historical results even jabmanager failover. Can
> > > > > > >> we
> > > > > > >>>>>>> provide a
> > > > > > >>>>>>>>>>> unified
> > > > > > >>>>>>>>>>>>>>> implementation for data storage requirements?
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>>>> shammon FY
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> On Saturday, March 18, 2023, Panagiotis
> > > > > > >>> Garefalakis <
> > > > > > >>>>>>>>>>>> pga...@apache.org
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Hi everyone,
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> This FLIP [1] proposes a pluggable interface
> > > > > > >> for
> > > > > > >>>>>> failure
> > > > > > >>>>>>>>> handling
> > > > > > >>>>>>>>>>>>>>> allowing
> > > > > > >>>>>>>>>>>>>>>> users to implement custom failure logic using
> > > > > > >> the
> > > > > > >>>>>> plugin
> > > > > > >>>>>>>>>>> framework.
> > > > > > >>>>>>>>>>>>>>>> Motivated by existing proposals [2] and tickets
> > > > > > >>>> [3],
> > > > > > >>>>>> this
> > > > > > >>>>>>>>> enables
> > > > > > >>>>>>>>>>>>>>> use-cases
> > > > > > >>>>>>>>>>>>>>>> like: assigning particular types to failures
> > > > > > >>> (e.g.,
> > > > > > >>>>>> User
> > > > > > >>>>>>> or
> > > > > > >>>>>>>>>>>> System),
> > > > > > >>>>>>>>>>>>>>>> emitting custom metrics per type (e.g.,
> > > > > > >>>> application or
> > > > > > >>>>>>>>> platform),
> > > > > > >>>>>>>>>>>>> even
> > > > > > >>>>>>>>>>>>>>>> exposing errors to downstream consumers (e.g.,
> > > > > > >>>>>>> notification
> > > > > > >>>>>>>>>>>> systems).
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Thanks to Piotr and Anton for the initial
> > > > > > >> reviews
> > > > > > >>>> and
> > > > > > >>>>>>>>>>> discussions!
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> For anyone interested, the starting point would
> > > > > > >>> be
> > > > > > >>>> the
> > > > > > >>>>>>> FLIP
> > > > > > >>>>>>>>> [1]
> > > > > > >>>>>>>>>>>> that
> > > > > > >>>>>>>>>>>>> I
> > > > > > >>>>>>>>>>>>>>>> created,
> > > > > > >>>>>>>>>>>>>>>> describing the motivation and the proposed
> > > > > > >>> changes
> > > > > > >>>>>> (part
> > > > > > >>>>>>> of
> > > > > > >>>>>>>>> the
> > > > > > >>>>>>>>>>>> core,
> > > > > > >>>>>>>>>>>>>>>> runtime and web).
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> The intuition behind this FLIP is being able to
> > > > > > >>>> execute
> > > > > > >>>>>>>>> custom
> > > > > > >>>>>>>>>>>> logic
> > > > > > >>>>>>>>>>>>> on
> > > > > > >>>>>>>>>>>>>>>> failures by exposing a FailureListener
> > > > > > >> interface.
> > > > > > >>>>>>>>> Implementation
> > > > > > >>>>>>>>>>> by
> > > > > > >>>>>>>>>>>>>> users
> > > > > > >>>>>>>>>>>>>>>> can be simply loaded to the system as Jar
> > > > > > >> files.
> > > > > > >>>>>>>>> FailureListeners
> > > > > > >>>>>>>>>>>> may
> > > > > > >>>>>>>>>>>>>>> also
> > > > > > >>>>>>>>>>>>>>>> decide to assign failure tags to errors
> > > > > > >>> (expressed
> > > > > > >>>> as
> > > > > > >>>>>>>>> strings),
> > > > > > >>>>>>>>>>>>>>>> that will then be exposed as metadata by the
> > > > > > >>>> UI/Rest
> > > > > > >>>>>>>>> interfaces.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Feedback is always appreciated! Looking forward
> > > > > > >>> to
> > > > > > >>>> your
> > > > > > >>>>>>>>> thoughts!
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> [1]
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%
> > > > > > >>>>>>>>>>>>>>>> 3A+Pluggable+failure+handling+for+Apache+Flink
> > > > > > >>>>>>>>>>>>>>>> [2]
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>
> > https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67
> > > > > > >>>>>>>>>>>>>>>> Hmjgy0-hRDeuFnrMgT4
> > > > > > >>>>>>>>>>>>>>>> [3]
> > > > > > >>>> https://issues.apache.org/jira/browse/FLINK-20833
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Cheers,
> > > > > > >>>>>>>>>>>>>>>> Panagiotis
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> >
> >

Reply via email to