This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 206609e822a [FLINK-33086] Protect failure enrichment against unhandled exceptions
206609e822a is described below

commit 206609e822a8029a78245e6eef7ab5d88a0f370b
Author: Panagiotis Garefalakis <pga...@apache.org>
AuthorDate: Wed Sep 13 19:07:25 2023 -0700

    [FLINK-33086] Protect failure enrichment against unhandled exceptions

    Co-authored-by: Chesnay Schepler <ches...@apache.org>
---
 .../runtime/failure/FailureEnricherUtils.java      |  8 ++++
 .../runtime/failure/FailureEnricherUtilsTest.java  | 46 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index f704a1ddbd7..d9b4c2278df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -200,6 +200,14 @@ public class FailureEnricherUtils {
                                                     }
                                                 });
                                         return validLabels;
+                                    })
+                            .exceptionally(
+                                    t -> {
+                                        LOG.warn(
+                                                "Enricher {} threw an exception.",
+                                                enricher.getClass(),
+                                                t);
+                                        return Collections.emptyMap();
                                     }));
         }
         // combine all CompletableFutures into a single CompletableFuture containing a Map of labels
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
index 8eedf2d0be2..d5b6ef334d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
@@ -202,6 +202,38 @@ class FailureEnricherUtilsTest {
         assertThatFuture(result).eventuallySucceeds().satisfies(labels -> labels.isEmpty());
     }
 
+    @Test
+    public void testLabelFailureWithValidAndThrowingEnricher() {
+        // A failing enricher shouldn't affect remaining enrichers with valid labels
+        final Throwable cause = new RuntimeException("test exception");
+        final FailureEnricher validEnricher = new TestEnricher("enricherKey");
+        final FailureEnricher throwingEnricher = new ThrowingEnricher("throwingKey");
+
+        final Set<FailureEnricher> enrichers =
+                new HashSet<FailureEnricher>() {
+                    {
+                        add(validEnricher);
+                        add(throwingEnricher);
+                    }
+                };
+
+        final CompletableFuture<Map<String, String>> result =
+                FailureEnricherUtils.labelFailure(
+                        cause,
+                        null,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        enrichers);
+
+        assertThatFuture(result)
+                .eventuallySucceeds()
+                .satisfies(
+                        labels -> {
+                            assertThat(labels).hasSize(1);
+                            assertThat(labels).containsKey("enricherKey");
+                            assertThat(labels).containsValue("enricherKeyValue");
+                        });
+    }
+
     @Test
     public void testLabelFailureMergeException() {
         // Throwing exception labelFailure when merging duplicate keys
@@ -253,6 +285,20 @@ class FailureEnricherUtilsTest {
         }
     }
 
+    private static class ThrowingEnricher extends TestEnricher {
+        ThrowingEnricher(String... outputKeys) {
+            super(outputKeys);
+        }
+
+        @Override
+        public CompletableFuture<Map<String, String>> processFailure(
+                Throwable cause, Context context) {
+            final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
+            future.completeExceptionally(new RuntimeException("test failure"));
+            return future;
+        }
+    }
+
     private static class AndAnotherTestEnricher extends TestEnricher {
         AndAnotherTestEnricher(String... outputKeys) {
             super(outputKeys);