Hi Panagiotis,

Thanks for the proposal.

It's useful to enrich the information so that users can be more
clear why the job is failing, especially platform developers who
need to provide the information to their end users.
And for the very FLIP, I'd prefer the naming `FailureEnricher`
proposed by David, as the plugin doesn't really handle the failure.

However, like Zhu and Lijie said, I also joined a discussion
recently about customized failure handling, e.g. counting the
failure rate of pipeline regions separately, and failing the job
when a specific error occurs, and so on.
I suppose a custom restart strategy, or I'd call it a custom
failure "handler", is indeed necessary. It can also enrich the
information as the current proposed handler does.

To avoid adding too many plugin interfaces which may confuse users
and make the ExecutionFailureHandler more complex,
I think it'd be better to consider the requirements at the same time.

IMO, we can add a handler interface, then make the current restart
strategy and the enricher both types of the handler. The handlers
execute in sequence, and the failure is considered unrecoverable if
any of the handlers decides.
In this way, users can also implement a handler using the enriched
information provided by the previous handlers, e.g. fail the job and
send a notification if too many failures are caused by the end users.

Best,
Gen


On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <huweihua....@gmail.com> wrote:

> Hi Panagiotis,
>
> Thanks for your proposal. It is valuable to analyze the reason for
> failure with the user plug-in.
>
> Making the context immutable could make the contract stronger.
> Letting the listener return an enriching result may be a better way.
>
> IIUC, listeners could do two things, enrich more information (tags/labels)
> to FailureHandlingResult, and push data out of Flink (metrics or
> something).
>  IMO, we could split these two types into Listener and Advisor (maybe
> other names). The Listener just pushes the data out and returns nothing to
>  Flink, so we can run these async and don't have to wait for Listener's
> result.
>  The Advisor returns rich information to the FailureHadingResult, and it
> should
>  have a lighter logic.
>
>
> Supporting a custom restart strategy is also valuable. In this design, we
> use
> RestartStrategy to construct a FailureHandingResult, and then pass it to
> Listener.
> My question is, should we change the restart strategy interface to support
> the
> custom restart strategy, or keep the current restart strategy and let the
> later
> Listener enrich the restartable information to FailureHandingResult? The
> latter
> may cause some confusion when we use a custom restart strategy.
> The default flink restart strategy also runs but does not take effect.
>
>
> Best,
> Weihua
>
>
> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang <wangdachui9...@gmail.com>
> wrote:
>
> > Hi Panagiotis,
> >
> > Thanks for driving this.
> >
> > +1 for supporting custom restart strategy, we did receive such requests
> > from the user mailing list [1][2].
> >
> > Besides, in current design, the plugin will only do some statistical and
> > classification work, and will not affect the *FailureHandlingResult*.
> Just
> > listening, no handling, it doesn't quite match the title.
> >
> > [1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
> > [2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx
> >
> > Best,
> > Lijie
> >
> > Zhu Zhu <reed...@gmail.com> 于2023年3月20日周一 21:39写道:
> >
> > > Hi Panagiotis,
> > >
> > > Thanks for creating this proposal! It's good to enable Flink to handle
> > > different errors in different ways, through a pluggable way.
> > >
> > > There are requests for flexible restart strategies from time to time,
> for
> > > different strategies of restart backoff time, or to suppress restarting
> > > on certain errors. Therefore, I think it's better that the proposed
> > > failure handling plugin can also support custom restart strategies.
> > >
> > > Maybe we can call it FailureHandlingAdvisor which provides more
> > > information (labels) and gives advice (restart backoff time, whether
> > > to restart)? I do not have a strong opinion though, any explanatory
> > > name would be good.
> > >
> > > To avoid unexpected mutation, how about to make the context immutable
> > > and let the plugin return an immutable result? i.e. remove the setters
> > > from the context, and let the plugin method return a result which
> > > contains `labels`, `canRestart` and `restartBackoffTime`. Flink should
> > > apply the result to the context before invoking the next plugin, so
> > > that the next plugin will see the updated context.
> > >
> > > The plugin should avoid taking too much time to return the result,
> > because
> > > it will block the RPC and result in instability. However, it can still
> > > perform heavy actions in a different thread. The context can provide an
> > > `ioExecutor` to the plugins for reuse.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Shammon FY <zjur...@gmail.com> 于2023年3月20日周一 20:21写道:
> > > >
> > > > Hi Panagiotis
> > > >
> > > > Thank you for your answer. I agree that `FailureListener` could be
> > > > stateless, then I have some thoughts as follows
> > > >
> > > > 1. I see that listeners and tag collections are associated. When
> > > JobManager
> > > > fails and restarts, how can the new listener be associated with the
> tag
> > > > collection before failover? Is the listener loading order?
> > > >
> > > > 2. The tag collection may be too large, resulting in the JobManager
> > OOM,
> > > do
> > > > we need to provide a management class that supports some obsolescence
> > > > strategies instead of a direct Collection?
> > > >
> > > > 3. Is it possible to provide a more complex data structure than a
> > simple
> > > > string collection for tags in listeners, such as key-value?
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > >
> > > > On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com>
> wrote:
> > > >
> > > > > Hi,Panagiotis
> > > > >
> > > > >
> > > > > Thank you for kicking off this discussion. Overall, the proposed
> > > feature of
> > > > > this FLIP makes sense to me. We have also discussed similar
> > > requirements
> > > > > with our users and developers, and I believe it will help many
> users.
> > > > >
> > > > >
> > > > > In terms of FLIP content, I have some thoughts:
> > > > >
> > > > > (1) For the FailureListenerContextget interface, the methods
> > > > > FailureListenerContext#addTag and FailureListenerContextgetTags
> looks
> > > very
> > > > > inconsistent because they imply specific implementation details,
> and
> > > not
> > > > > all FailureListeners need to handle them, we shouldn't put them in
> > the
> > > > > interface. Minor: The comment "UDF loading" in the
> > getUserClassLoader()
> > > > > method looks like a typo, IIUC it should return the classloader of
> > the
> > > > > current job.
> > > > >
> > > > > (2) Regarding the implementation in
> > > ExecutionFailureHandler#handleFailure,
> > > > > some custom listeners may have heavy IO operations, such as
> reporting
> > > to
> > > > > their monitoring system. The current logic appears to be processing
> > in
> > > the
> > > > > JobMaster's main thread, and it is recommended not to do this kind
> of
> > > > > processing in the main thread.
> > > > >
> > > > > (3) The results of FailureListener's processing and the
> > > > > FailureHandlingResult returned by ExecutionFailureHandler are not
> > > related.
> > > > > I think these two are closely related, the motivation of this FLIP
> is
> > > to
> > > > > make current failure handling more flexible. From this perspective,
> > > > > different listeners should have the opportunity to affect the job's
> > > failure
> > > > > handling flow. For example, a Flink job is configured with a
> > > > > RestartStrategy with huge numbers retry , but the Kafka topic of
> > > Source has
> > > > > been deleted, the job will failover continuously. In this case, the
> > > user
> > > > > should have their listener to determine whether this failure is
> > > recoverable
> > > > > or unrecoverable, and then wrap the processing result into
> > > > > FailureHandlingResult.unrecoverable(xx) and pass it to JobMaster,
> > this
> > > > > approach will be more flexible.
> > > > >
> > > > > (4) All FLIPs have an important section named Public Interfaces.
> > > Current
> > > > > FLIP mixes the interface section and the implementation section
> > > together.
> > > > > It is better for us to refer to the FLIP template[1] and separate
> > them,
> > > > > this will make the entire FLIP clearer.
> > > > >
> > > > > In addition, regarding the FLIP process, there is a small
> suggestion:
> > > The
> > > > > community generally creates a JIRA issue after the FLIP vote is
> > passed,
> > > > > instead of during the FLIP preparation phase because the FLIP may
> be
> > > > > rejected. Although this FLIP is very reasonable, it's better to
> > follow
> > > the
> > > > > process.
> > > > >
> > > > > Best,
> > > > >
> > > > > Leonard
> > > > >
> > > > > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> > > > >
> > > > > On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org>
> > wrote:
> > > > >
> > > > > > >
> > > > > > > however listeners can use previous state (tags/labels) to make
> > > > > decisions
> > > > > >
> > > > > >
> > > > > > That sounds like a very fragile contract. We should either allow
> > > passing
> > > > > > tags between listeners and then need to define ordering or make
> all
> > > of
> > > > > them
> > > > > > independent. I prefer the latter because it allows us to
> > parallelize
> > > > > things
> > > > > > if needed (if all listeners trigger an RCP to the external
> system,
> > > for
> > > > > > example).
> > > > > >
> > > > > > Can you expand on why we need more than one classifier to be able
> > to
> > > > > output
> > > > > > the same tag?
> > > > > >
> > > > > > system ones come first and then the ones loaded from the plugin
> > > manager
> > > > > >
> > > > > >
> > > > > > Since they're returned as a Set, the order is completely
> > > > > non-deterministic,
> > > > > > no matter in which order they're loaded.
> > > > > >
> > > > > > just communicating with external monitoring/alerting systems
> > > > > > >
> > > > > >
> > > > > > That makes the need for pushing things out of the main thread
> even
> > > > > > stronger. This almost sounds like we need to return a
> > > CompletableFuture
> > > > > for
> > > > > > the per-throwable classification because an external system might
> > > take a
> > > > > > significant time to respond. We need to unblock the main thread
> for
> > > other
> > > > > > RPCs.
> > > > > >
> > > > > > Also, in the proposal, this happens in the failure handler. If
> > > that's the
> > > > > > case, this might block the job from being restarted (if the
> restart
> > > > > > strategy allows for another restart), which would be great to
> avoid
> > > > > because
> > > > > > it can introduce extra downtime.
> > > > > >
> > > > > > This raises another question: what should happen if the
> > > classification
> > > > > > fails? Crashing the job (which is what's currently proposed)
> seems
> > > very
> > > > > > dangerous if this might depend on an external system.
> > > > > >
> > > > > > Thats a valid point, passing the JobGraph containing all the
> above
> > > > > > > information is also something to consider
> > > > > > >
> > > > > >
> > > > > > We should avoid passing JG around because it's mutable (which we
> > > must fix
> > > > > > in the long term), and letting users change it might have
> > > consequences.
> > > > > >
> > > > > > Best,
> > > > > > D.
> > > > > >
> > > > > > On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <
> > > > > pga...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey David, Shammon,
> > > > > > >
> > > > > > > Thanks for the valuable comments!
> > > > > > > I am glad you find this proposal useful, some thoughts:
> > > > > > >
> > > > > > > @Shammon
> > > > > > >
> > > > > > > 1. How about adding more job information in
> > > FailureListenerContext? For
> > > > > > > > example, job vertext, subtask, taskmanager location. And then
> > > user
> > > > > can
> > > > > > do
> > > > > > > > more statistics according to different dimensions.
> > > > > > >
> > > > > > >
> > > > > > > Thats a valid point, passing the JobGraph containing all the
> > above
> > > > > > > information
> > > > > > > is also something to consider, I was mostly trying to be
> > > conservative:
> > > > > > > i.e., passingly only the information we need, and extend as we
> > see
> > > fit
> > > > > > >
> > > > > > > 2. Users may want to save results in listener, and then they
> can
> > > get
> > > > > the
> > > > > > > > historical results even jabmanager failover. Can we provide a
> > > unified
> > > > > > > > implementation for data storage requirements?
> > > > > > >
> > > > > > >
> > > > > > > The idea is to store only the output of the Listeners (tags)
> and
> > > treat
> > > > > > them
> > > > > > > as stateless.
> > > > > > > Tags are be stored along with HistoryEntries, and will be
> > available
> > > > > > through
> > > > > > > the HistoryServer
> > > > > > > even after a JM dies.
> > > > > > >
> > > > > > > @David
> > > > > > >
> > > > > > > 1) Should we also consider adding labels? The combination of
> tags
> > > and
> > > > > > > > labels seems to be what most systems offer; sometimes, they
> > offer
> > > > > > labels
> > > > > > > > only (key=value pairs) because tags can be implemented using
> > > those,
> > > > > but
> > > > > > > not
> > > > > > > > the other way around.
> > > > > > >
> > > > > > >
> > > > > > > Indeed changing tags to k:v labels could be more expressive, I
> > > like it!
> > > > > > > Let's see what others think.
> > > > > > >
> > > > > > > 2) Since we can not predict how heavy user-defined models
> > > ("listeners")
> > > > > > are
> > > > > > > > going to be, it would be great to keep the interfaces/data
> > > structures
> > > > > > > > immutable so we can push things over to the I/O threads.
> Also,
> > it
> > > > > > sounds
> > > > > > > > off to call the main interface a Listener since it's supposed
> > to
> > > > > > enhance
> > > > > > > > the original throwable with additional metadata.
> > > > > > >
> > > > > > >
> > > > > > > The idea was for the name to be generic as there could be
> > Listener
> > > > > > > implementations
> > > > > > > just communicating with external monitoring/alerting systems
> and
> > no
> > > > > > > metadata output
> > > > > > > -- but lets rethink that. For immutability, see below:
> > > > > > >
> > > > > > > 3) You're proposing to support a set of listeners. Since you're
> > > passing
> > > > > > the
> > > > > > > > mutable context around, which includes tags set by the
> previous
> > > > > > listener,
> > > > > > > > do you expect users to make any assumptions about the order
> in
> > > which
> > > > > > > > listeners are executed?
> > > > > > >
> > > > > > >
> > > > > > > In the existing proposal we are not making any assumptions
> about
> > > the
> > > > > > order
> > > > > > > of listeners,
> > > > > > > (system ones come first and then the ones loaded from the
> plugin
> > > > > manager)
> > > > > > > however listeners can use previous state (tags/labels) to make
> > > > > decisions:
> > > > > > > e.g., wont assign *UNKNOWN* failureType when we have already
> seen
> > > *USER
> > > > > > *or
> > > > > > > the other way around -- when we have seen *UNKNOWN* remove in
> > > favor of
> > > > > > > *USER*
> > > > > > >
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Panagiotis
> > > > > > >
> > > > > > > On Sun, Mar 19, 2023 at 10:42 AM David Morávek <
> d...@apache.org>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Panagiotis,
> > > > > > > >
> > > > > > > > This is an excellent proposal and something everyone trying
> to
> > > > > provide
> > > > > > > > "Flink as a service" needs to solve at some point. I have a
> > > couple of
> > > > > > > > questions:
> > > > > > > >
> > > > > > > > If I understand the proposal correctly, this is just about
> > adding
> > > > > tags
> > > > > > to
> > > > > > > > the Throwable by running a tuple of (Throwable,
> FailureContext)
> > > > > > through a
> > > > > > > > user-defined model.
> > > > > > > >
> > > > > > > > 1) Should we also consider adding labels? The combination of
> > > tags and
> > > > > > > > labels seems to be what most systems offer; sometimes, they
> > offer
> > > > > > labels
> > > > > > > > only (key=value pairs) because tags can be implemented using
> > > those,
> > > > > but
> > > > > > > not
> > > > > > > > the other way around.
> > > > > > > >
> > > > > > > > 2) Since we can not predict how heavy user-defined models
> > > > > ("listeners")
> > > > > > > are
> > > > > > > > going to be, it would be great to keep the interfaces/data
> > > structures
> > > > > > > > immutable so we can push things over to the I/O threads.
> Also,
> > it
> > > > > > sounds
> > > > > > > > off to call the main interface a Listener since it's supposed
> > to
> > > > > > enhance
> > > > > > > > the original throwable with additional metadata.
> > > > > > > >
> > > > > > > > I'd propose something along the lines of (we should have
> better
> > > > > names,
> > > > > > > this
> > > > > > > > is just to outline the idea):
> > > > > > > >
> > > > > > > > interface FailureEnricher {
> > > > > > > >
> > > > > > > >   ThrowableWithTagsAndLabels enrichFailure(Throwable cause,
> > > > > > > > ImmutableContextualMetadataAboutTheThrowable context);
> > > > > > > > }
> > > > > > > >
> > > > > > > > The names should change; this is just to outline the idea.
> > > > > > > >
> > > > > > > > 3) You're proposing to support a set of listeners. Since
> you're
> > > > > passing
> > > > > > > the
> > > > > > > > mutable context around, which includes tags set by the
> previous
> > > > > > listener,
> > > > > > > > do you expect users to make any assumptions about the order
> in
> > > which
> > > > > > > > listeners are executed?
> > > > > > > >
> > > > > > > > *@Shammon*
> > > > > > > >
> > > > > > > > Users may want to save results in listener, and then they can
> > > get the
> > > > > > > > > historical results even jabmanager failover. Can we
> provide a
> > > > > unified
> > > > > > > > > implementation for data storage requirements?
> > > > > > > >
> > > > > > > >
> > > > > > > > I think we should explicitly state that all "listeners" are
> > > treated
> > > > > as
> > > > > > > > stateless. I don't see any strong reason for snapshotting
> them.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > D.
> > > > > > > >
> > > > > > > > On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <
> zjur...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Panagiotis
> > > > > > > > >
> > > > > > > > > Thank you for starting this discussion. I think this FLIP
> is
> > > > > valuable
> > > > > > > and
> > > > > > > > > can help user to analyze the causes of job failover better!
> > > > > > > > >
> > > > > > > > > I have two comments as follows
> > > > > > > > >
> > > > > > > > > 1. How about adding more job information in
> > > FailureListenerContext?
> > > > > > For
> > > > > > > > > example, job vertext, subtask, taskmanager location. And
> then
> > > user
> > > > > > can
> > > > > > > do
> > > > > > > > > more statistics according to different dimensions.
> > > > > > > > >
> > > > > > > > > 2. Users may want to save results in listener, and then
> they
> > > can
> > > > > get
> > > > > > > the
> > > > > > > > > historical results even jabmanager failover. Can we
> provide a
> > > > > unified
> > > > > > > > > implementation for data storage requirements?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > shammon FY
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Saturday, March 18, 2023, Panagiotis Garefalakis <
> > > > > > pga...@apache.org
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > This FLIP [1] proposes a pluggable interface for failure
> > > handling
> > > > > > > > > allowing
> > > > > > > > > > users to implement custom failure logic using the plugin
> > > > > framework.
> > > > > > > > > > Motivated by existing proposals [2] and tickets [3], this
> > > enables
> > > > > > > > > use-cases
> > > > > > > > > > like: assigning particular types to failures (e.g., User
> or
> > > > > > System),
> > > > > > > > > > emitting custom metrics per type (e.g., application or
> > > platform),
> > > > > > > even
> > > > > > > > > > exposing errors to downstream consumers (e.g.,
> notification
> > > > > > systems).
> > > > > > > > > >
> > > > > > > > > > Thanks to Piotr and Anton for the initial reviews and
> > > > > discussions!
> > > > > > > > > >
> > > > > > > > > > For anyone interested, the starting point would be the
> FLIP
> > > [1]
> > > > > > that
> > > > > > > I
> > > > > > > > > > created,
> > > > > > > > > > describing the motivation and the proposed changes (part
> of
> > > the
> > > > > > core,
> > > > > > > > > > runtime and web).
> > > > > > > > > >
> > > > > > > > > > The intuition behind this FLIP is being able to execute
> > > custom
> > > > > > logic
> > > > > > > on
> > > > > > > > > > failures by exposing a FailureListener interface.
> > > Implementation
> > > > > by
> > > > > > > > users
> > > > > > > > > > can be simply loaded to the system as Jar files.
> > > FailureListeners
> > > > > > may
> > > > > > > > > also
> > > > > > > > > > decide to assign failure tags to errors (expressed as
> > > strings),
> > > > > > > > > > that will then be exposed as metadata by the UI/Rest
> > > interfaces.
> > > > > > > > > >
> > > > > > > > > > Feedback is always appreciated! Looking forward to your
> > > thoughts!
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%
> > > > > > > > > > 3A+Pluggable+failure+handling+for+Apache+Flink
> > > > > > > > > > [2]
> > > > > > > > > >
> > https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67
> > > > > > > > > > Hmjgy0-hRDeuFnrMgT4
> > > > > > > > > > [3] https://issues.apache.org/jira/browse/FLINK-20833
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Panagiotis
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> >
>

Reply via email to