This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 206609e822a [FLINK-33086] Protect failure enrichment against unhandled exceptions
206609e822a is described below
commit 206609e822a8029a78245e6eef7ab5d88a0f370b
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Wed Sep 13 19:07:25 2023 -0700
[FLINK-33086] Protect failure enrichment against unhandled exceptions
Co-authored-by: Chesnay Schepler <[email protected]>
---
.../runtime/failure/FailureEnricherUtils.java | 8 ++++
.../runtime/failure/FailureEnricherUtilsTest.java | 46 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index f704a1ddbd7..d9b4c2278df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -200,6 +200,14 @@ public class FailureEnricherUtils {
}
});
return validLabels;
+ })
+ .exceptionally(
+ t -> {
+ LOG.warn(
+ "Enricher {} threw an exception.",
+ enricher.getClass(),
+ t);
+ return Collections.emptyMap();
}));
}
// combine all CompletableFutures into a single CompletableFuture containing a Map of labels
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
index 8eedf2d0be2..d5b6ef334d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
@@ -202,6 +202,38 @@ class FailureEnricherUtilsTest {
assertThatFuture(result).eventuallySucceeds().satisfies(labels -> labels.isEmpty());
}
+ @Test
+ public void testLabelFailureWithValidAndThrowingEnricher() {
+ // A failing enricher shouldn't affect remaining enrichers with valid labels
+ final Throwable cause = new RuntimeException("test exception");
+ final FailureEnricher validEnricher = new TestEnricher("enricherKey");
+ final FailureEnricher throwingEnricher = new ThrowingEnricher("throwingKey");
+
+ final Set<FailureEnricher> enrichers =
+ new HashSet<FailureEnricher>() {
+ {
+ add(validEnricher);
+ add(throwingEnricher);
+ }
+ };
+
+ final CompletableFuture<Map<String, String>> result =
+ FailureEnricherUtils.labelFailure(
+ cause,
+ null,
+ ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ enrichers);
+
+ assertThatFuture(result)
+ .eventuallySucceeds()
+ .satisfies(
+ labels -> {
+ assertThat(labels).hasSize(1);
+ assertThat(labels).containsKey("enricherKey");
+ assertThat(labels).containsValue("enricherKeyValue");
+ });
+ }
+
@Test
public void testLabelFailureMergeException() {
// Throwing exception labelFailure when merging duplicate keys
@@ -253,6 +285,20 @@ class FailureEnricherUtilsTest {
}
}
+ private static class ThrowingEnricher extends TestEnricher {
+ ThrowingEnricher(String... outputKeys) {
+ super(outputKeys);
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> processFailure(
+ Throwable cause, Context context) {
+ final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
+ future.completeExceptionally(new RuntimeException("test failure"));
+ return future;
+ }
+ }
+
private static class AndAnotherTestEnricher extends TestEnricher {
AndAnotherTestEnricher(String... outputKeys) {
super(outputKeys);