Hi Panagiotis,

Thanks for driving this.

+1 for supporting custom restart strategies; we did receive such requests on the user mailing list [1][2].

Besides, in the current design the plugin only does some statistical and classification work and does not affect the *FailureHandlingResult*. It only listens and does no handling, which doesn't quite match the title.

[1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
[2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx

Best,
Lijie

Zhu Zhu <reed...@gmail.com> wrote on Mon, Mar 20, 2023 at 21:39:

> Hi Panagiotis,
>
> Thanks for creating this proposal! It's good to enable Flink to handle
> different errors in different ways, in a pluggable way.
>
> There are requests for flexible restart strategies from time to time, for
> different restart backoff times, or to suppress restarting
> on certain errors. Therefore, I think it's better that the proposed
> failure handling plugin also supports custom restart strategies.
>
> Maybe we can call it FailureHandlingAdvisor, which provides more
> information (labels) and gives advice (restart backoff time, whether
> to restart)? I do not have a strong opinion though; any explanatory
> name would be good.
>
> To avoid unexpected mutation, how about making the context immutable
> and letting the plugin return an immutable result? I.e., remove the setters
> from the context, and let the plugin method return a result which
> contains `labels`, `canRestart` and `restartBackoffTime`. Flink should
> apply the result to the context before invoking the next plugin, so
> that the next plugin will see the updated context.
>
> The plugin should avoid taking too much time to return the result, because
> it will block the RPC and result in instability. However, it can still
> perform heavy actions in a different thread. The context can provide an
> `ioExecutor` to the plugins for reuse.
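
A minimal sketch of the immutable context/result idea described above, in Java. All type and method names here (`FailureContext`, `FailureHandlingAdvice`, `FailureHandlingAdvisor`, `AdvisorChain`, `withAdvice`) are hypothetical and not part of Flink or the FLIP. The merge policy (labels are merged, `canRestart` stays false once any plugin vetoes, the latest backoff wins) is also only an assumption made for illustration.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

/** Immutable result returned by a single plugin (hypothetical type). */
final class FailureHandlingAdvice {
    final Map<String, String> labels;
    final boolean canRestart;
    final Duration restartBackoffTime;

    FailureHandlingAdvice(Map<String, String> labels, boolean canRestart, Duration restartBackoffTime) {
        this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
        this.canRestart = canRestart;
        this.restartBackoffTime = restartBackoffTime;
    }
}

/** Immutable context handed to each plugin; it has no setters (hypothetical type). */
final class FailureContext {
    final Map<String, String> labels;
    final boolean canRestart;
    final Duration restartBackoffTime;
    final Executor ioExecutor; // plugins may reuse this for heavy work off the RPC thread

    FailureContext(Map<String, String> labels, boolean canRestart,
                   Duration restartBackoffTime, Executor ioExecutor) {
        this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
        this.canRestart = canRestart;
        this.restartBackoffTime = restartBackoffTime;
        this.ioExecutor = ioExecutor;
    }

    /** Returns a NEW context with the advice applied; this instance is left unchanged. */
    FailureContext withAdvice(FailureHandlingAdvice advice) {
        Map<String, String> merged = new LinkedHashMap<>(labels);
        merged.putAll(advice.labels);
        // Assumed policy: a single veto is enough to suppress the restart.
        return new FailureContext(
                merged, canRestart && advice.canRestart, advice.restartBackoffTime, ioExecutor);
    }
}

/** The plugin contract: read an immutable context, return an immutable result. */
interface FailureHandlingAdvisor {
    FailureHandlingAdvice advise(Throwable cause, FailureContext context);
}

final class AdvisorChain {
    /** Applies each plugin's result before invoking the next one, so later plugins see it. */
    static FailureContext run(List<FailureHandlingAdvisor> advisors, Throwable cause,
                              FailureContext initial) {
        FailureContext context = initial;
        for (FailureHandlingAdvisor advisor : advisors) {
            context = context.withAdvice(advisor.advise(cause, context));
        }
        return context;
    }
}
```

Because every step produces a new `FailureContext`, a plugin cannot mutate what an earlier plugin saw, while a later plugin still observes the accumulated labels.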
>
> Thanks,
> Zhu
>
> Shammon FY <zjur...@gmail.com> wrote on Mon, Mar 20, 2023 at 20:21:
> >
> > Hi Panagiotis
> >
> > Thank you for your answer. I agree that `FailureListener` could be
> > stateless. I have some thoughts as follows:
> >
> > 1. I see that listeners and tag collections are associated. When the JobManager
> > fails and restarts, how can the new listener be associated with the tag
> > collection from before the failover? Is it by the listener loading order?
> >
> > 2. The tag collection may grow too large and cause a JobManager OOM. Do
> > we need to provide a management class that supports some eviction
> > strategies instead of a plain Collection?
> >
> > 3. Is it possible to provide a richer data structure than a simple
> > string collection for tags in listeners, such as key-value pairs?
> >
> > Best,
> > Shammon FY
> >
> > On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > >
> > > Hi Panagiotis,
> > >
> > > Thank you for kicking off this discussion. Overall, the proposed feature of
> > > this FLIP makes sense to me. We have also discussed similar requirements
> > > with our users and developers, and I believe it will help many users.
> > >
> > > In terms of FLIP content, I have some thoughts:
> > >
> > > (1) For the FailureListenerContext interface, the methods
> > > FailureListenerContext#addTag and FailureListenerContext#getTags look very
> > > inconsistent because they imply specific implementation details, and not
> > > all FailureListeners need to handle them; we shouldn't put them in the
> > > interface. Minor: the comment "UDF loading" on the getUserClassLoader()
> > > method looks like a typo; IIUC it should return the classloader of the
> > > current job.
> > >
> > > (2) Regarding the implementation in ExecutionFailureHandler#handleFailure,
> > > some custom listeners may have heavy IO operations, such as reporting to
> > > their monitoring system.
> > > The current logic appears to run in the
> > > JobMaster's main thread, and it is recommended not to do this kind of
> > > processing in the main thread.
> > >
> > > (3) The results of the FailureListener's processing and the
> > > FailureHandlingResult returned by ExecutionFailureHandler are not connected
> > > in the current proposal. I think these two are closely related: the
> > > motivation of this FLIP is to make the current failure handling more
> > > flexible. From this perspective, different listeners should have the
> > > opportunity to affect the job's failure handling flow. For example, a Flink
> > > job is configured with a RestartStrategy with a huge number of retries, but
> > > the Kafka topic of the source has been deleted, so the job will fail over
> > > continuously. In this case, the user should have their listener determine
> > > whether this failure is recoverable or unrecoverable, and then wrap the
> > > result into FailureHandlingResult.unrecoverable(xx) and pass it to the
> > > JobMaster. This approach will be more flexible.
> > >
> > > (4) All FLIPs have an important section named Public Interfaces. The current
> > > FLIP mixes the interface section and the implementation section together.
> > > It is better for us to refer to the FLIP template [1] and separate them;
> > > this will make the entire FLIP clearer.
> > >
> > > In addition, regarding the FLIP process, there is a small suggestion: the
> > > community generally creates a JIRA issue after the FLIP vote has passed,
> > > instead of during the FLIP preparation phase, because the FLIP may be
> > > rejected. Although this FLIP is very reasonable, it's better to follow the
> > > process.
> > >
> > > Best,
> > >
> > > Leonard
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> > >
> > > On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org> wrote:
> > > >
> > > > > however listeners can use previous state (tags/labels) to make
> > > > > decisions
> > > >
> > > > That sounds like a very fragile contract. We should either allow passing
> > > > tags between listeners, and then need to define ordering, or make all of
> > > > them independent. I prefer the latter because it allows us to parallelize
> > > > things if needed (if all listeners trigger an RPC to the external system,
> > > > for example).
> > > >
> > > > Can you expand on why we need more than one classifier to be able to
> > > > output the same tag?
> > > >
> > > > > system ones come first and then the ones loaded from the plugin manager
> > > >
> > > > Since they're returned as a Set, the order is completely non-deterministic,
> > > > no matter in which order they're loaded.
> > > >
> > > > > just communicating with external monitoring/alerting systems
> > > >
> > > > That makes the need for pushing things out of the main thread even
> > > > stronger. This almost sounds like we need to return a CompletableFuture for
> > > > the per-throwable classification because an external system might take a
> > > > significant time to respond. We need to unblock the main thread for other
> > > > RPCs.
> > > >
> > > > Also, in the proposal, this happens in the failure handler. If that's the
> > > > case, this might block the job from being restarted (if the restart
> > > > strategy allows for another restart), which would be great to avoid because
> > > > it can introduce extra downtime.
> > > >
> > > > This raises another question: what should happen if the classification
> > > > fails?
> > > > Crashing the job (which is what's currently proposed) seems very
> > > > dangerous if this might depend on an external system.
> > > >
> > > > > That's a valid point, passing the JobGraph containing all the above
> > > > > information is also something to consider
> > > >
> > > > We should avoid passing the JobGraph (JG) around because it's mutable
> > > > (which we must fix in the long term), and letting users change it might
> > > > have consequences.
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <pga...@apache.org>
> > > > wrote:
> > > > >
> > > > > Hey David, Shammon,
> > > > >
> > > > > Thanks for the valuable comments!
> > > > > I am glad you find this proposal useful. Some thoughts:
> > > > >
> > > > > @Shammon
> > > > >
> > > > > > 1. How about adding more job information in FailureListenerContext? For
> > > > > > example, job vertex, subtask, TaskManager location. Then users can do
> > > > > > more statistics according to different dimensions.
> > > > >
> > > > > That's a valid point; passing the JobGraph containing all the above
> > > > > information is also something to consider. I was mostly trying to be
> > > > > conservative, i.e., passing only the information we need, and extending
> > > > > as we see fit.
> > > > >
> > > > > > 2. Users may want to save results in a listener so that they can get the
> > > > > > historical results even after a JobManager failover. Can we provide a
> > > > > > unified implementation for data storage requirements?
> > > > >
> > > > > The idea is to store only the output of the Listeners (tags) and treat
> > > > > them as stateless.
> > > > > Tags are stored along with HistoryEntries, and will be available through
> > > > > the HistoryServer even after a JM dies.
> > > > >
> > > > > @David
> > > > >
> > > > > > 1) Should we also consider adding labels?
> > > > > > The combination of tags and
> > > > > > labels seems to be what most systems offer; sometimes, they offer labels
> > > > > > only (key=value pairs) because tags can be implemented using those, but
> > > > > > not the other way around.
> > > > >
> > > > > Indeed, changing tags to k:v labels could be more expressive, I like it!
> > > > > Let's see what others think.
> > > > >
> > > > > > 2) Since we cannot predict how heavy user-defined models ("listeners")
> > > > > > are going to be, it would be great to keep the interfaces/data structures
> > > > > > immutable so we can push things over to the I/O threads. Also, it sounds
> > > > > > off to call the main interface a Listener since it's supposed to enhance
> > > > > > the original throwable with additional metadata.
> > > > >
> > > > > The idea was for the name to be generic, as there could be Listener
> > > > > implementations that just communicate with external monitoring/alerting
> > > > > systems and produce no metadata output, but let's rethink that. For
> > > > > immutability, see below:
> > > > >
> > > > > > 3) You're proposing to support a set of listeners. Since you're passing
> > > > > > the mutable context around, which includes tags set by the previous
> > > > > > listener, do you expect users to make any assumptions about the order in
> > > > > > which listeners are executed?
> > > > >
> > > > > In the existing proposal we are not making any assumptions about the
> > > > > order of listeners
> > > > > (system ones come first and then the ones loaded from the plugin manager);
> > > > > however, listeners can use previous state (tags/labels) to make decisions:
> > > > > e.g., we won't assign the *UNKNOWN* failureType when we have already seen
> > > > > *USER*, or the other way around: when we have seen *UNKNOWN*, remove it in
> > > > > favor of *USER*.
> > > > >
> > > > > Cheers,
> > > > > Panagiotis
> > > > >
> > > > > On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org> wrote:
> > > > > >
> > > > > > Hi Panagiotis,
> > > > > >
> > > > > > This is an excellent proposal and something everyone trying to provide
> > > > > > "Flink as a service" needs to solve at some point. I have a couple of
> > > > > > questions:
> > > > > >
> > > > > > If I understand the proposal correctly, this is just about adding tags to
> > > > > > the Throwable by running a tuple of (Throwable, FailureContext) through a
> > > > > > user-defined model.
> > > > > >
> > > > > > 1) Should we also consider adding labels? The combination of tags and
> > > > > > labels seems to be what most systems offer; sometimes, they offer labels
> > > > > > only (key=value pairs) because tags can be implemented using those, but
> > > > > > not the other way around.
> > > > > >
> > > > > > 2) Since we cannot predict how heavy user-defined models ("listeners")
> > > > > > are going to be, it would be great to keep the interfaces/data structures
> > > > > > immutable so we can push things over to the I/O threads. Also, it sounds
> > > > > > off to call the main interface a Listener since it's supposed to enhance
> > > > > > the original throwable with additional metadata.
> > > > > >
> > > > > > I'd propose something along the lines of (we should have better names;
> > > > > > this is just to outline the idea):
> > > > > >
> > > > > > interface FailureEnricher {
> > > > > >
> > > > > >   ThrowableWithTagsAndLabels enrichFailure(Throwable cause,
> > > > > >       ImmutableContextualMetadataAboutTheThrowable context);
> > > > > > }
> > > > > >
> > > > > > The names should change; this is just to outline the idea.
> > > > > >
> > > > > > 3) You're proposing to support a set of listeners. Since you're passing
> > > > > > the mutable context around, which includes tags set by the previous
> > > > > > listener, do you expect users to make any assumptions about the order in
> > > > > > which listeners are executed?
> > > > > >
> > > > > > *@Shammon*
> > > > > >
> > > > > > > Users may want to save results in a listener so that they can get the
> > > > > > > historical results even after a JobManager failover. Can we provide a
> > > > > > > unified implementation for data storage requirements?
> > > > > >
> > > > > > I think we should explicitly state that all "listeners" are treated as
> > > > > > stateless. I don't see any strong reason for snapshotting them.
> > > > > >
> > > > > > Best,
> > > > > > D.
> > > > > >
> > > > > > On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Panagiotis
> > > > > > >
> > > > > > > Thank you for starting this discussion. I think this FLIP is valuable
> > > > > > > and can help users better analyze the causes of job failover!
> > > > > > >
> > > > > > > I have two comments as follows:
> > > > > > >
> > > > > > > 1. How about adding more job information in FailureListenerContext? For
> > > > > > > example, job vertex, subtask, TaskManager location. Then users can do
> > > > > > > more statistics according to different dimensions.
> > > > > > >
> > > > > > > 2. Users may want to save results in a listener so that they can get the
> > > > > > > historical results even after a JobManager failover. Can we provide a
> > > > > > > unified implementation for data storage requirements?
> > > > > > >
> > > > > > > Best,
> > > > > > > Shammon FY
> > > > > > >
> > > > > > > On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > This FLIP [1] proposes a pluggable interface for failure handling,
> > > > > > > > allowing users to implement custom failure logic using the plugin
> > > > > > > > framework. Motivated by existing proposals [2] and tickets [3], this
> > > > > > > > enables use cases like: assigning particular types to failures (e.g.,
> > > > > > > > User or System), emitting custom metrics per type (e.g., application or
> > > > > > > > platform), even exposing errors to downstream consumers (e.g.,
> > > > > > > > notification systems).
> > > > > > > >
> > > > > > > > Thanks to Piotr and Anton for the initial reviews and discussions!
> > > > > > > >
> > > > > > > > For anyone interested, the starting point would be the FLIP [1] that I
> > > > > > > > created, describing the motivation and the proposed changes (part of the
> > > > > > > > core, runtime and web).
> > > > > > > >
> > > > > > > > The intuition behind this FLIP is being able to execute custom logic on
> > > > > > > > failures by exposing a FailureListener interface. Implementations by
> > > > > > > > users can simply be loaded into the system as Jar files.
> > > > > > > > FailureListeners may also
> > > > > > > > decide to assign failure tags to errors (expressed as strings),
> > > > > > > > which will then be exposed as metadata by the UI/REST interfaces.
> > > > > > > >
> > > > > > > > Feedback is always appreciated! Looking forward to your thoughts!
> > > > > > > >
> > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
> > > > > > > > [2] https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
> > > > > > > > [3] https://issues.apache.org/jira/browse/FLINK-20833
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Panagiotis
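
One possible reading of the suggestions raised in this thread (stateless, independent enrichers; classification pushed off the main thread and returned as a CompletableFuture; a failing classifier that does not crash the job) is sketched below in Java. The `FailureEnricher` shape is simplified from the outline quoted in the thread, and `AsyncEnrichment.enrichAll` is a hypothetical helper, not a Flink API. Treating a throwing enricher as "contributes no labels" and letting later enrichers win on duplicate keys are assumptions made only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stateless enricher: maps a failure to key=value labels. */
interface FailureEnricher {
    Map<String, String> enrich(Throwable cause);
}

final class AsyncEnrichment {
    /**
     * Runs every enricher on the given executor, independently of the others,
     * and merges the resulting labels once all of them have completed.
     * An enricher that throws contributes no labels instead of failing the whole future.
     */
    static CompletableFuture<Map<String, String>> enrichAll(
            List<FailureEnricher> enrichers, Throwable cause, Executor ioExecutor) {
        List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>();
        for (FailureEnricher enricher : enrichers) {
            futures.add(
                    CompletableFuture.supplyAsync(() -> enricher.enrich(cause), ioExecutor)
                            // Assumed policy: swallow the enricher's error, keep the job alive.
                            .exceptionally(error -> Map.<String, String>of()));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    Map<String, String> merged = new HashMap<>();
                    for (CompletableFuture<Map<String, String>> future : futures) {
                        // join() does not block here: allOf has already completed.
                        merged.putAll(future.join());
                    }
                    return merged;
                });
    }
}
```

The caller (for example, the failure handler) would get the future back immediately and could attach the merged labels whenever it completes, so a slow external monitoring system would not hold up other RPCs.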