Hi Panagiotis,

Thanks for driving this.

+1 for supporting custom restart strategies; we have received such requests
on the user mailing list [1][2].

Besides, in the current design the plugin only does statistical and
classification work and does not affect the *FailureHandlingResult*. It only
listens and does no handling, which doesn't quite match the title.

[1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
[2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx

Best,
Lijie

On Mon, Mar 20, 2023 at 9:39 PM Zhu Zhu <reed...@gmail.com> wrote:

> Hi Panagiotis,
>
> Thanks for creating this proposal! It's good to enable Flink to handle
> different errors in different ways through a pluggable mechanism.
>
> There are requests for flexible restart strategies from time to time, for
> different strategies of restart backoff time, or to suppress restarting
> on certain errors. Therefore, I think it's better that the proposed
> failure handling plugin can also support custom restart strategies.
>
> Maybe we can call it FailureHandlingAdvisor which provides more
> information (labels) and gives advice (restart backoff time, whether
> to restart)? I do not have a strong opinion though, any explanatory
> name would be good.
>
> To avoid unexpected mutation, how about making the context immutable
> and letting the plugin return an immutable result? That is, remove the
> setters from the context, and let the plugin method return a result which
> contains `labels`, `canRestart` and `restartBackoffTime`. Flink should
> apply the result to the context before invoking the next plugin, so
> that the next plugin will see the updated context.
>
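A minimal sketch of the immutable context/result idea above, in Java. All
type names (FailureResult, FailureContext, FailureHandlingAdvisor,
AdvisorChain) are hypothetical and not part of Flink, and the merge policy
(labels are unioned, canRestart is AND-ed, the latest backoff wins) is only
an assumption for illustration:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical names throughout; none of these types exist in Flink.
final class FailureResult {
    final Map<String, String> labels;
    final boolean canRestart;
    final Duration restartBackoffTime;

    FailureResult(Map<String, String> labels, boolean canRestart, Duration restartBackoffTime) {
        // Defensive copy so later changes to the caller's map cannot leak in.
        this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
        this.canRestart = canRestart;
        this.restartBackoffTime = restartBackoffTime;
    }
}

final class FailureContext {
    final Throwable cause;
    final FailureResult current;

    FailureContext(Throwable cause, FailureResult current) {
        this.cause = cause;
        this.current = current;
    }

    // Returns a new context with the plugin's result applied; there are no setters.
    // Assumed merge policy: union labels, AND canRestart, latest backoff wins.
    FailureContext apply(FailureResult update) {
        Map<String, String> merged = new LinkedHashMap<>(current.labels);
        merged.putAll(update.labels);
        return new FailureContext(
                cause,
                new FailureResult(
                        merged,
                        current.canRestart && update.canRestart,
                        update.restartBackoffTime));
    }
}

interface FailureHandlingAdvisor {
    FailureResult advise(FailureContext context);
}

final class AdvisorChain {
    // Applies each result before invoking the next advisor, so that the
    // next advisor sees the updated context.
    static FailureContext run(
            Throwable cause, FailureResult initial, List<FailureHandlingAdvisor> advisors) {
        FailureContext context = new FailureContext(cause, initial);
        for (FailureHandlingAdvisor advisor : advisors) {
            context = context.apply(advisor.advise(context));
        }
        return context;
    }
}
```

With this shape, a later advisor can read the labels set by an earlier one
through `context.current.labels`, but it cannot modify them in place.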
> The plugin should avoid taking too much time to return the result, because
> it will block the RPC and result in instability. However, it can still
> perform heavy actions in a different thread. The context can provide an
> `ioExecutor` to the plugins for reuse.
>
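A rough Java sketch of the threading point above: the plugin method returns
right away and hands the slow work to an executor supplied by the context.
PluginContext, ReportingPlugin and the `ioExecutor()` accessor are
hypothetical names used only for illustration:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names; a sketch of the threading idea only, not a Flink API.
interface PluginContext {
    // Assumed accessor for a shared executor meant for blocking I/O.
    Executor ioExecutor();
}

final class ReportingPlugin {
    final AtomicInteger reported = new AtomicInteger();

    // Returns immediately with a cheap label; the slow report is only
    // scheduled here and runs later on the I/O executor.
    String onFailure(Throwable cause, PluginContext context) {
        context.ioExecutor().execute(() -> report(cause));
        return cause.getClass().getSimpleName();
    }

    private void report(Throwable cause) {
        // Stand-in for a slow call to an external monitoring system.
        reported.incrementAndGet();
    }
}
```

The calling thread only pays for scheduling the task, so the RPC is not
blocked by however long the external report takes.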
> Thanks,
> Zhu
>
> On Mon, Mar 20, 2023 at 8:21 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> > Hi Panagiotis
> >
> > Thank you for your answer. I agree that `FailureListener` could be
> > stateless. I have some further thoughts:
> >
> > 1. I see that listeners and tag collections are associated. When the
> > JobManager fails and restarts, how can the new listener be associated
> > with the tag collection from before the failover? Is it by the listener
> > loading order?
> >
> > 2. The tag collection may grow too large and cause a JobManager OOM. Do
> > we need to provide a management class that supports some eviction
> > strategies instead of a plain Collection?
> >
> > 3. Is it possible to provide a more complex data structure than a simple
> > string collection for tags in listeners, such as key-value pairs?
> >
> > Best,
> > Shammon FY
> >
> >
> > On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com> wrote:
> >
> > > Hi Panagiotis,
> > >
> > >
> > > Thank you for kicking off this discussion. Overall, the proposed
> > > feature of this FLIP makes sense to me. We have also discussed similar
> > > requirements with our users and developers, and I believe it will help
> > > many users.
> > >
> > >
> > > In terms of FLIP content, I have some thoughts:
> > >
> > > (1) For the FailureListenerContext interface, the methods
> > > FailureListenerContext#addTag and FailureListenerContext#getTags look
> > > very inconsistent because they imply specific implementation details,
> > > and not all FailureListeners need to handle them; we shouldn't put them
> > > in the interface. Minor: the comment "UDF loading" on the
> > > getUserClassLoader() method looks like a typo; IIUC it should return
> > > the classloader of the current job.
> > >
> > > (2) Regarding the implementation in
> > > ExecutionFailureHandler#handleFailure, some custom listeners may have
> > > heavy IO operations, such as reporting to their monitoring system. The
> > > current logic appears to run in the JobMaster's main thread, and it is
> > > recommended not to do this kind of processing in the main thread.
> > >
> > > (3) In the current proposal, the results of FailureListener's
> > > processing and the FailureHandlingResult returned by
> > > ExecutionFailureHandler are not related. I think these two are closely
> > > related: the motivation of this FLIP is to make the current failure
> > > handling more flexible. From this perspective, different listeners
> > > should have the opportunity to affect the job's failure handling flow.
> > > For example, suppose a Flink job is configured with a RestartStrategy
> > > with a huge number of retries, but the Kafka topic of the source has
> > > been deleted; the job will fail over continuously. In this case, the
> > > user's listener should determine whether the failure is recoverable or
> > > unrecoverable, wrap the processing result into
> > > FailureHandlingResult.unrecoverable(xx), and pass it to the JobMaster.
> > > This approach will be more flexible.
> > >
> > > (4) All FLIPs have an important section named Public Interfaces. The
> > > current FLIP mixes the interface section and the implementation
> > > section together. It is better for us to refer to the FLIP template[1]
> > > and separate them; this will make the entire FLIP clearer.
> > >
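To make point (3) above concrete, here is a small Java sketch of a
classifier that decides whether a failure is worth retrying. Everything in
it is hypothetical: FailureDecision, MissingTopicException and
UnrecoverableFailureClassifier are illustration-only names, and how such a
decision would be mapped onto FailureHandlingResult is deliberately left
out:

```java
// Hypothetical types for illustration; FailureHandlingResult is not modelled.
enum FailureDecision { RECOVERABLE, UNRECOVERABLE }

// Stand-in for whatever exception a source throws when its topic is gone.
final class MissingTopicException extends RuntimeException {
    MissingTopicException(String topic) {
        super("Topic no longer exists: " + topic);
    }
}

final class UnrecoverableFailureClassifier {
    // Walks the cause chain, since the interesting exception is often
    // wrapped by the framework before it reaches the failure handler.
    static FailureDecision classify(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof MissingTopicException) {
                return FailureDecision.UNRECOVERABLE;
            }
        }
        return FailureDecision.RECOVERABLE;
    }
}
```

A failure handler could consult such a decision before asking the restart
strategy, so that a job with a large retry budget does not keep restarting
on an error that cannot heal by itself.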
> > > In addition, regarding the FLIP process, there is a small suggestion:
> > > the community generally creates a JIRA issue after the FLIP vote has
> > > passed, instead of during the FLIP preparation phase, because the FLIP
> > > may be rejected. Although this FLIP is very reasonable, it's better to
> > > follow the process.
> > >
> > > Best,
> > >
> > > Leonard
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> > >
> > > On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org> wrote:
> > >
> > > > > however listeners can use previous state (tags/labels) to make
> > > > > decisions
> > > >
> > > >
> > > > That sounds like a very fragile contract. We should either allow
> > > > passing tags between listeners, and then need to define ordering, or
> > > > make all of them independent. I prefer the latter because it allows
> > > > us to parallelize things if needed (if all listeners trigger an RPC
> > > > to the external system, for example).
> > > >
> > > > Can you expand on why we need more than one classifier to be able to
> > > > output the same tag?
> > > >
> > > > > system ones come first and then the ones loaded from the plugin
> > > > > manager
> > > >
> > > > Since they're returned as a Set, the order is completely
> > > > non-deterministic, no matter in which order they're loaded.
> > > >
> > > > > just communicating with external monitoring/alerting systems
> > > >
> > > >
> > > > That makes the need for pushing things out of the main thread even
> > > > stronger. This almost sounds like we need to return a
> > > > CompletableFuture for the per-throwable classification because an
> > > > external system might take a significant time to respond. We need to
> > > > unblock the main thread for other RPCs.
> > > >
> > > > Also, in the proposal, this happens in the failure handler. If
> > > > that's the case, this might block the job from being restarted (if
> > > > the restart strategy allows for another restart), which would be
> > > > great to avoid because it can introduce extra downtime.
> > > >
> > > > This raises another question: what should happen if the
> > > > classification fails? Crashing the job (which is what's currently
> > > > proposed) seems very dangerous if this might depend on an external
> > > > system.
> > > >
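One possible shape for the asynchronous classification discussed above, as
a hedged Java sketch: the classifier runs on a separate executor, and both
a timeout and a classifier exception degrade to "no labels" instead of
failing the job. AsyncClassification and its parameters are hypothetical
names, and treating an empty map as the fallback is an assumption:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper; not part of Flink.
final class AsyncClassification {
    static CompletableFuture<Map<String, String>> classify(
            Throwable cause,
            Function<Throwable, Map<String, String>> classifier,
            Executor ioExecutor,
            long timeoutMillis) {
        return CompletableFuture
                // Run the possibly slow classifier off the calling thread.
                .supplyAsync(() -> classifier.apply(cause), ioExecutor)
                // Assumed fallback: give up with no labels after the timeout.
                .completeOnTimeout(
                        Collections.<String, String>emptyMap(),
                        timeoutMillis,
                        TimeUnit.MILLISECONDS)
                // A failing classifier yields no labels rather than an error.
                .exceptionally(error -> Collections.<String, String>emptyMap());
    }
}
```

Note that `completeOnTimeout` (available since Java 9) only completes the
future; it does not interrupt a classifier that is still running on the
executor.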
> > > > > That's a valid point, passing the JobGraph containing all the
> > > > > above information is also something to consider
> > > >
> > > > We should avoid passing the JobGraph around because it's mutable
> > > > (which we must fix in the long term), and letting users change it
> > > > might have consequences.
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <pga...@apache.org> wrote:
> > > >
> > > > > Hey David, Shammon,
> > > > >
> > > > > Thanks for the valuable comments!
> > > > > I am glad you find this proposal useful, some thoughts:
> > > > >
> > > > > @Shammon
> > > > >
> > > > > > 1. How about adding more job information to
> > > > > > FailureListenerContext? For example, job vertex, subtask, and
> > > > > > TaskManager location. Users can then compute statistics along
> > > > > > different dimensions.
> > > > >
> > > > >
> > > > > That's a valid point, passing the JobGraph containing all the above
> > > > > information is also something to consider. I was mostly trying to
> > > > > be conservative, i.e., passing only the information we need and
> > > > > extending as we see fit.
> > > > >
> > > > > > 2. Users may want to save results in a listener so that they can
> > > > > > get the historical results even after a JobManager failover. Can
> > > > > > we provide a unified implementation for data storage
> > > > > > requirements?
> > > > >
> > > > >
> > > > > The idea is to store only the output of the Listeners (tags) and
> > > > > treat the Listeners as stateless. Tags are stored along with
> > > > > HistoryEntries and will be available through the HistoryServer
> > > > > even after a JM dies.
> > > > >
> > > > > @David
> > > > >
> > > > > > 1) Should we also consider adding labels? The combination of tags
> > > > > > and labels seems to be what most systems offer; sometimes, they
> > > > > > offer labels only (key=value pairs) because tags can be
> > > > > > implemented using those, but not the other way around.
> > > > >
> > > > >
> > > > > Indeed, changing tags to k:v labels could be more expressive, I
> > > > > like it! Let's see what others think.
> > > > >
> > > > > > 2) Since we cannot predict how heavy user-defined models
> > > > > > ("listeners") are going to be, it would be great to keep the
> > > > > > interfaces/data structures immutable so we can push things over
> > > > > > to the I/O threads. Also, it sounds off to call the main
> > > > > > interface a Listener since it's supposed to enhance the original
> > > > > > throwable with additional metadata.
> > > > >
> > > > >
> > > > > The idea was for the name to be generic, as there could be Listener
> > > > > implementations that just communicate with external
> > > > > monitoring/alerting systems and produce no metadata output -- but
> > > > > let's rethink that. For immutability, see below:
> > > > >
> > > > > > 3) You're proposing to support a set of listeners. Since you're
> > > > > > passing the mutable context around, which includes tags set by
> > > > > > the previous listener, do you expect users to make any
> > > > > > assumptions about the order in which listeners are executed?
> > > > >
> > > > >
> > > > > In the existing proposal we are not making any assumptions about
> > > > > the order of listeners (system ones come first and then the ones
> > > > > loaded from the plugin manager); however, listeners can use previous
> > > > > state (tags/labels) to make decisions: e.g., we won't assign the
> > > > > *UNKNOWN* failureType when we have already seen *USER*, or the
> > > > > other way around -- when we have seen *UNKNOWN*, remove it in favor
> > > > > of *USER*.
> > > > >
> > > > >
> > > > > Cheers,
> > > > > Panagiotis
> > > > >
> > > > > On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org> wrote:
> > > > >
> > > > > > Hi Panagiotis,
> > > > > >
> > > > > > This is an excellent proposal and something everyone trying to
> > > > > > provide "Flink as a service" needs to solve at some point. I have
> > > > > > a couple of questions:
> > > > > >
> > > > > > If I understand the proposal correctly, this is just about adding
> > > > > > tags to the Throwable by running a tuple of (Throwable,
> > > > > > FailureContext) through a user-defined model.
> > > > > >
> > > > > > 1) Should we also consider adding labels? The combination of tags
> > > > > > and labels seems to be what most systems offer; sometimes, they
> > > > > > offer labels only (key=value pairs) because tags can be
> > > > > > implemented using those, but not the other way around.
> > > > > >
> > > > > > 2) Since we cannot predict how heavy user-defined models
> > > > > > ("listeners") are going to be, it would be great to keep the
> > > > > > interfaces/data structures immutable so we can push things over
> > > > > > to the I/O threads. Also, it sounds off to call the main
> > > > > > interface a Listener since it's supposed to enhance the original
> > > > > > throwable with additional metadata.
> > > > > >
> > > > > > I'd propose something along the lines of (we should have better
> > > > > > names, this is just to outline the idea):
> > > > > >
> > > > > > interface FailureEnricher {
> > > > > >
> > > > > >   ThrowableWithTagsAndLabels enrichFailure(Throwable cause,
> > > > > > ImmutableContextualMetadataAboutTheThrowable context);
> > > > > > }
> > > > > >
> > > > > > The names should change; this is just to outline the idea.
> > > > > >
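For illustration, the outline above can be filled in as a small Java
sketch. The three names from the outline are kept as placeholders; the
fields, the FailureTypeEnricher class and its classification rule (JVM
errors are SYSTEM, everything else USER) are assumptions made up for this
example:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Placeholder types mirroring the names in the outline; the fields are
// assumptions made for illustration only.
final class ImmutableContextualMetadataAboutTheThrowable {
    final String jobName;

    ImmutableContextualMetadataAboutTheThrowable(String jobName) {
        this.jobName = jobName;
    }
}

final class ThrowableWithTagsAndLabels {
    final Throwable cause;
    final Set<String> tags;
    final Map<String, String> labels;

    ThrowableWithTagsAndLabels(Throwable cause, Set<String> tags, Map<String, String> labels) {
        this.cause = cause;
        // Unmodifiable copies keep the result safe to hand to other threads.
        this.tags = Collections.unmodifiableSet(new LinkedHashSet<>(tags));
        this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
    }
}

interface FailureEnricher {
    ThrowableWithTagsAndLabels enrichFailure(
            Throwable cause, ImmutableContextualMetadataAboutTheThrowable context);
}

// Hypothetical enricher: JVM errors are labelled SYSTEM, everything else USER.
final class FailureTypeEnricher implements FailureEnricher {
    @Override
    public ThrowableWithTagsAndLabels enrichFailure(
            Throwable cause, ImmutableContextualMetadataAboutTheThrowable context) {
        String type = (cause instanceof Error) ? "SYSTEM" : "USER";
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("failureType", type);
        labels.put("job", context.jobName);
        return new ThrowableWithTagsAndLabels(cause, Collections.singleton(type), labels);
    }
}
```

Because the enricher neither mutates its inputs nor keeps state, several
enrichers could be run independently of each other and in any order.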
> > > > > > 3) You're proposing to support a set of listeners. Since you're
> > > > > > passing the mutable context around, which includes tags set by
> > > > > > the previous listener, do you expect users to make any
> > > > > > assumptions about the order in which listeners are executed?
> > > > > >
> > > > > > *@Shammon*
> > > > > >
> > > > > > > Users may want to save results in a listener so that they can
> > > > > > > get the historical results even after a JobManager failover.
> > > > > > > Can we provide a unified implementation for data storage
> > > > > > > requirements?
> > > > > >
> > > > > >
> > > > > > I think we should explicitly state that all "listeners" are
> > > > > > treated as stateless. I don't see any strong reason for
> > > > > > snapshotting them.
> > > > > >
> > > > > > Best,
> > > > > > D.
> > > > > >
> > > > > > On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Panagiotis
> > > > > > >
> > > > > > > Thank you for starting this discussion. I think this FLIP is
> > > > > > > valuable and can help users better analyze the causes of job
> > > > > > > failover!
> > > > > > >
> > > > > > > I have two comments as follows
> > > > > > >
> > > > > > > 1. How about adding more job information to
> > > > > > > FailureListenerContext? For example, job vertex, subtask, and
> > > > > > > TaskManager location. Users can then compute statistics along
> > > > > > > different dimensions.
> > > > > > >
> > > > > > > 2. Users may want to save results in a listener so that they
> > > > > > > can get the historical results even after a JobManager
> > > > > > > failover. Can we provide a unified implementation for data
> > > > > > > storage requirements?
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > shammon FY
> > > > > > >
> > > > > > >
> > > > > > > On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > This FLIP [1] proposes a pluggable interface for failure
> > > > > > > > handling, allowing users to implement custom failure logic
> > > > > > > > using the plugin framework. Motivated by existing proposals
> > > > > > > > [2] and tickets [3], this enables use cases like: assigning
> > > > > > > > particular types to failures (e.g., User or System), emitting
> > > > > > > > custom metrics per type (e.g., application or platform), even
> > > > > > > > exposing errors to downstream consumers (e.g., notification
> > > > > > > > systems).
> > > > > > > >
> > > > > > > > Thanks to Piotr and Anton for the initial reviews and
> > > > > > > > discussions!
> > > > > > > >
> > > > > > > > For anyone interested, the starting point would be the FLIP
> > > > > > > > [1] that I created, describing the motivation and the
> > > > > > > > proposed changes (part of the core, runtime and web).
> > > > > > > >
> > > > > > > > The intuition behind this FLIP is being able to execute
> > > > > > > > custom logic on failures by exposing a FailureListener
> > > > > > > > interface. Implementations by users can simply be loaded into
> > > > > > > > the system as Jar files. FailureListeners may also decide to
> > > > > > > > assign failure tags to errors (expressed as strings), which
> > > > > > > > will then be exposed as metadata by the UI/REST interfaces.
> > > > > > > >
> > > > > > > > Feedback is always appreciated! Looking forward to your
> > > > > > > > thoughts!
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
> > > > > > > > [2]
> > > > > > > > https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
> > > > > > > > [3] https://issues.apache.org/jira/browse/FLINK-20833
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Panagiotis
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>
