Hi Panagiotis

Thank you for your answer. I agree that `FailureListener` could be
stateless, then I have some thoughts as follows

1. I see that listeners and tag collections are associated. When JobManager
fails and restarts, how can the new listener be associated with the tag
collection before failover? Is the listener loading order?

2. The tag collection may be too large, resulting in the JobManager OOM, do
we need to provide a management class that supports some obsolescence
strategies instead of a direct Collection?

3. Is it possible to provide a more complex data structure than a simple
string collection for tags in listeners, such as key-value?

Best,
Shammon FY


On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi,Panagiotis
>
>
> Thank you for kicking off this discussion. Overall, the proposed feature of
> this FLIP makes sense to me. We have also discussed similar requirements
> with our users and developers, and I believe it will help many users.
>
>
> In terms of FLIP content, I have some thoughts:
>
> (1) For the FailureListenerContextget interface, the methods
> FailureListenerContext#addTag and FailureListenerContextgetTags looks very
> inconsistent because they imply specific implementation details, and not
> all FailureListeners need to handle them, we shouldn't put them in the
> interface. Minor: The comment "UDF loading" in the getUserClassLoader()
> method looks like a typo, IIUC it should return the classloader of the
> current job.
>
> (2) Regarding the implementation in ExecutionFailureHandler#handleFailure,
> some custom listeners may have heavy IO operations, such as reporting to
> their monitoring system. The current logic appears to be processing in the
> JobMaster's main thread, and it is recommended not to do this kind of
> processing in the main thread.
>
> (3) The results of FailureListener's processing and the
> FailureHandlingResult returned by ExecutionFailureHandler are not related.
> I think these two are closely related, the motivation of this FLIP is to
> make current failure handling more flexible. From this perspective,
> different listeners should have the opportunity to affect the job's failure
> handling flow. For example, a Flink job is configured with a
> RestartStrategy with huge numbers retry , but the Kafka topic of Source has
> been deleted, the job will failover continuously. In this case, the user
> should have their listener to determine whether this failure is recoverable
> or unrecoverable, and then wrap the processing result into
> FailureHandlingResult.unrecoverable(xx) and pass it to JobMaster, this
> approach will be more flexible.
>
> (4) All FLIPs have an important section named Public Interfaces. Current
> FLIP mixes the interface section and the implementation section together.
> It is better for us to refer to the FLIP template[1] and separate them,
> this will make the entire FLIP clearer.
>
> In addition, regarding the FLIP process, there is a small suggestion: The
> community generally creates a JIRA issue after the FLIP vote is passed,
> instead of during the FLIP preparation phase because the FLIP may be
> rejected. Although this FLIP is very reasonable, it's better to follow the
> process.
>
> Best,
>
> Leonard
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>
> On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org> wrote:
>
> > >
> > > however listeners can use previous state (tags/labels) to make
> decisions
> >
> >
> > That sounds like a very fragile contract. We should either allow passing
> > tags between listeners and then need to define ordering or make all of
> them
> > independent. I prefer the latter because it allows us to parallelize
> things
> > if needed (if all listeners trigger an RCP to the external system, for
> > example).
> >
> > Can you expand on why we need more than one classifier to be able to
> output
> > the same tag?
> >
> > system ones come first and then the ones loaded from the plugin manager
> >
> >
> > Since they're returned as a Set, the order is completely
> non-deterministic,
> > no matter in which order they're loaded.
> >
> > just communicating with external monitoring/alerting systems
> > >
> >
> > That makes the need for pushing things out of the main thread even
> > stronger. This almost sounds like we need to return a CompletableFuture
> for
> > the per-throwable classification because an external system might take a
> > significant time to respond. We need to unblock the main thread for other
> > RPCs.
> >
> > Also, in the proposal, this happens in the failure handler. If that's the
> > case, this might block the job from being restarted (if the restart
> > strategy allows for another restart), which would be great to avoid
> because
> > it can introduce extra downtime.
> >
> > This raises another question: what should happen if the classification
> > fails? Crashing the job (which is what's currently proposed) seems very
> > dangerous if this might depend on an external system.
> >
> > Thats a valid point, passing the JobGraph containing all the above
> > > information is also something to consider
> > >
> >
> > We should avoid passing JG around because it's mutable (which we must fix
> > in the long term), and letting users change it might have consequences.
> >
> > Best,
> > D.
> >
> > On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <
> pga...@apache.org>
> > wrote:
> >
> > > Hey David, Shammon,
> > >
> > > Thanks for the valuable comments!
> > > I am glad you find this proposal useful, some thoughts:
> > >
> > > @Shammon
> > >
> > > 1. How about adding more job information in FailureListenerContext? For
> > > > example, job vertext, subtask, taskmanager location. And then user
> can
> > do
> > > > more statistics according to different dimensions.
> > >
> > >
> > > Thats a valid point, passing the JobGraph containing all the above
> > > information
> > > is also something to consider, I was mostly trying to be conservative:
> > > i.e., passingly only the information we need, and extend as we see fit
> > >
> > > 2. Users may want to save results in listener, and then they can get
> the
> > > > historical results even jabmanager failover. Can we provide a unified
> > > > implementation for data storage requirements?
> > >
> > >
> > > The idea is to store only the output of the Listeners (tags) and treat
> > them
> > > as stateless.
> > > Tags are be stored along with HistoryEntries, and will be available
> > through
> > > the HistoryServer
> > > even after a JM dies.
> > >
> > > @David
> > >
> > > 1) Should we also consider adding labels? The combination of tags and
> > > > labels seems to be what most systems offer; sometimes, they offer
> > labels
> > > > only (key=value pairs) because tags can be implemented using those,
> but
> > > not
> > > > the other way around.
> > >
> > >
> > > Indeed changing tags to k:v labels could be more expressive, I like it!
> > > Let's see what others think.
> > >
> > > 2) Since we can not predict how heavy user-defined models ("listeners")
> > are
> > > > going to be, it would be great to keep the interfaces/data structures
> > > > immutable so we can push things over to the I/O threads. Also, it
> > sounds
> > > > off to call the main interface a Listener since it's supposed to
> > enhance
> > > > the original throwable with additional metadata.
> > >
> > >
> > > The idea was for the name to be generic as there could be Listener
> > > implementations
> > > just communicating with external monitoring/alerting systems and no
> > > metadata output
> > > -- but lets rethink that. For immutability, see below:
> > >
> > > 3) You're proposing to support a set of listeners. Since you're passing
> > the
> > > > mutable context around, which includes tags set by the previous
> > listener,
> > > > do you expect users to make any assumptions about the order in which
> > > > listeners are executed?
> > >
> > >
> > > In the existing proposal we are not making any assumptions about the
> > order
> > > of listeners,
> > > (system ones come first and then the ones loaded from the plugin
> manager)
> > > however listeners can use previous state (tags/labels) to make
> decisions:
> > > e.g., wont assign *UNKNOWN* failureType when we have already seen *USER
> > *or
> > > the other way around -- when we have seen *UNKNOWN* remove in favor of
> > > *USER*
> > >
> > >
> > > Cheers,
> > > Panagiotis
> > >
> > > On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org>
> wrote:
> > >
> > > > Hi Panagiotis,
> > > >
> > > > This is an excellent proposal and something everyone trying to
> provide
> > > > "Flink as a service" needs to solve at some point. I have a couple of
> > > > questions:
> > > >
> > > > If I understand the proposal correctly, this is just about adding
> tags
> > to
> > > > the Throwable by running a tuple of (Throwable, FailureContext)
> > through a
> > > > user-defined model.
> > > >
> > > > 1) Should we also consider adding labels? The combination of tags and
> > > > labels seems to be what most systems offer; sometimes, they offer
> > labels
> > > > only (key=value pairs) because tags can be implemented using those,
> but
> > > not
> > > > the other way around.
> > > >
> > > > 2) Since we can not predict how heavy user-defined models
> ("listeners")
> > > are
> > > > going to be, it would be great to keep the interfaces/data structures
> > > > immutable so we can push things over to the I/O threads. Also, it
> > sounds
> > > > off to call the main interface a Listener since it's supposed to
> > enhance
> > > > the original throwable with additional metadata.
> > > >
> > > > I'd propose something along the lines of (we should have better
> names,
> > > this
> > > > is just to outline the idea):
> > > >
> > > > interface FailureEnricher {
> > > >
> > > >   ThrowableWithTagsAndLabels enrichFailure(Throwable cause,
> > > > ImmutableContextualMetadataAboutTheThrowable context);
> > > > }
> > > >
> > > > The names should change; this is just to outline the idea.
> > > >
> > > > 3) You're proposing to support a set of listeners. Since you're
> passing
> > > the
> > > > mutable context around, which includes tags set by the previous
> > listener,
> > > > do you expect users to make any assumptions about the order in which
> > > > listeners are executed?
> > > >
> > > > *@Shammon*
> > > >
> > > > Users may want to save results in listener, and then they can get the
> > > > > historical results even jabmanager failover. Can we provide a
> unified
> > > > > implementation for data storage requirements?
> > > >
> > > >
> > > > I think we should explicitly state that all "listeners" are treated
> as
> > > > stateless. I don't see any strong reason for snapshotting them.
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com>
> wrote:
> > > >
> > > > > Hi Panagiotis
> > > > >
> > > > > Thank you for starting this discussion. I think this FLIP is
> valuable
> > > and
> > > > > can help user to analyze the causes of job failover better!
> > > > >
> > > > > I have two comments as follows
> > > > >
> > > > > 1. How about adding more job information in FailureListenerContext?
> > For
> > > > > example, job vertext, subtask, taskmanager location. And then user
> > can
> > > do
> > > > > more statistics according to different dimensions.
> > > > >
> > > > > 2. Users may want to save results in listener, and then they can
> get
> > > the
> > > > > historical results even jabmanager failover. Can we provide a
> unified
> > > > > implementation for data storage requirements?
> > > > >
> > > > >
> > > > > Best,
> > > > > shammon FY
> > > > >
> > > > >
> > > > > On Saturday, March 18, 2023, Panagiotis Garefalakis <
> > pga...@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > This FLIP [1] proposes a pluggable interface for failure handling
> > > > > allowing
> > > > > > users to implement custom failure logic using the plugin
> framework.
> > > > > > Motivated by existing proposals [2] and tickets [3], this enables
> > > > > use-cases
> > > > > > like: assigning particular types to failures (e.g., User or
> > System),
> > > > > > emitting custom metrics per type (e.g., application or platform),
> > > even
> > > > > > exposing errors to downstream consumers (e.g., notification
> > systems).
> > > > > >
> > > > > > Thanks to Piotr and Anton for the initial reviews and
> discussions!
> > > > > >
> > > > > > For anyone interested, the starting point would be the FLIP [1]
> > that
> > > I
> > > > > > created,
> > > > > > describing the motivation and the proposed changes (part of the
> > core,
> > > > > > runtime and web).
> > > > > >
> > > > > > The intuition behind this FLIP is being able to execute custom
> > logic
> > > on
> > > > > > failures by exposing a FailureListener interface. Implementation
> by
> > > > users
> > > > > > can be simply loaded to the system as Jar files. FailureListeners
> > may
> > > > > also
> > > > > > decide to assign failure tags to errors (expressed as strings),
> > > > > > that will then be exposed as metadata by the UI/Rest interfaces.
> > > > > >
> > > > > > Feedback is always appreciated! Looking forward to your thoughts!
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%
> > > > > > 3A+Pluggable+failure+handling+for+Apache+Flink
> > > > > > [2]
> > > > > > https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67
> > > > > > Hmjgy0-hRDeuFnrMgT4
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-20833
> > > > > >
> > > > > > Cheers,
> > > > > > Panagiotis
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to