This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e58a718d04e [FLINK-33086] Protect failure enrichment against unhandled 
exceptions
e58a718d04e is described below

commit e58a718d04e3ec6e2a43da8a868e5515916c0eea
Author: Panagiotis Garefalakis <pga...@apache.org>
AuthorDate: Wed Sep 13 19:07:25 2023 -0700

    [FLINK-33086] Protect failure enrichment against unhandled exceptions
    
    Co-authored-by: Chesnay Schepler <ches...@apache.org>
---
 .../runtime/failure/FailureEnricherUtils.java      |  8 ++++
 .../runtime/failure/FailureEnricherUtilsTest.java  | 46 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index f704a1ddbd7..d9b4c2278df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -200,6 +200,14 @@ public class FailureEnricherUtils {
                                                     }
                                                 });
                                         return validLabels;
+                                    })
+                            .exceptionally(
+                                    t -> {
+                                        LOG.warn(
+                                                "Enricher {} threw an 
exception.",
+                                                enricher.getClass(),
+                                                t);
+                                        return Collections.emptyMap();
                                     }));
         }
         // combine all CompletableFutures into a single CompletableFuture 
containing a Map of labels
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
index 8eedf2d0be2..d5b6ef334d3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
@@ -202,6 +202,38 @@ class FailureEnricherUtilsTest {
         assertThatFuture(result).eventuallySucceeds().satisfies(labels -> 
labels.isEmpty());
     }
 
+    @Test
+    public void testLabelFailureWithValidAndThrowingEnricher() {
+        // A failing enricher shouldn't affect remaining enrichers with valid 
labels
+        final Throwable cause = new RuntimeException("test exception");
+        final FailureEnricher validEnricher = new TestEnricher("enricherKey");
+        final FailureEnricher throwingEnricher = new 
ThrowingEnricher("throwingKey");
+
+        final Set<FailureEnricher> enrichers =
+                new HashSet<FailureEnricher>() {
+                    {
+                        add(validEnricher);
+                        add(throwingEnricher);
+                    }
+                };
+
+        final CompletableFuture<Map<String, String>> result =
+                FailureEnricherUtils.labelFailure(
+                        cause,
+                        null,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        enrichers);
+
+        assertThatFuture(result)
+                .eventuallySucceeds()
+                .satisfies(
+                        labels -> {
+                            assertThat(labels).hasSize(1);
+                            assertThat(labels).containsKey("enricherKey");
+                            
assertThat(labels).containsValue("enricherKeyValue");
+                        });
+    }
+
     @Test
     public void testLabelFailureMergeException() {
         // Throwing exception labelFailure when merging duplicate keys
@@ -253,6 +285,20 @@ class FailureEnricherUtilsTest {
         }
     }
 
+    private static class ThrowingEnricher extends TestEnricher {
+        ThrowingEnricher(String... outputKeys) {
+            super(outputKeys);
+        }
+
+        @Override
+        public CompletableFuture<Map<String, String>> processFailure(
+                Throwable cause, Context context) {
+            final CompletableFuture<Map<String, String>> future = new 
CompletableFuture<>();
+            future.completeExceptionally(new RuntimeException("test failure"));
+            return future;
+        }
+    }
+
     private static class AndAnotherTestEnricher extends TestEnricher {
         AndAnotherTestEnricher(String... outputKeys) {
             super(outputKeys);

Reply via email to