This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7c8e3f5a0c3 [FLINK-34546] Emit span with failure labels on failure.
7c8e3f5a0c3 is described below
commit 7c8e3f5a0c39f9a82c5549925035344c5d27cb98
Author: Stefan Richter <[email protected]>
AuthorDate: Thu Feb 29 10:26:02 2024 +0100
[FLINK-34546] Emit span with failure labels on failure.
---
.../apache/flink/configuration/TraceOptions.java | 14 +++++
.../failover/ExecutionFailureHandler.java | 65 +++++++++++++++++++++-
.../flink/runtime/scheduler/DefaultScheduler.java | 4 +-
.../failover/ExecutionFailureHandlerTest.java | 39 ++++++++++++-
4 files changed, 117 insertions(+), 5 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
index 1aee746e210..a7e84192dea 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
@@ -56,6 +56,20 @@ public class TraceOptions {
+ " any of the names in the list will be
started. Otherwise, all reporters that could be found in"
+ " the configuration will be started.");
+ /**
+ * Temporary option to report events as span. This option will be removed once we support
+ * reporting events.
+ */
+ @Deprecated
+ public static final ConfigOption<Boolean> REPORT_EVENTS_AS_SPANS =
+ key("traces.report-events-as-spans")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to report events as spans. This is a
temporary parameter that "
+ + "is in place until we have support for
reporting events. "
+ + "In the meantime, this can be activated
to report them as spans instead.");
+
/**
* Returns a view over the given configuration via which options can be set/retrieved for the
* given reporter.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
index aed330de522..3d36a9e6bff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
@@ -17,8 +17,11 @@
package org.apache.flink.runtime.executiongraph.failover;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.FailureEnricher.Context;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -28,6 +31,8 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.throwable.ThrowableClassifier;
import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;
import javax.annotation.Nullable;
@@ -47,6 +52,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class ExecutionFailureHandler {
+ public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX =
"failureLabel.";
+
private final SchedulingTopology schedulingTopology;
/** Strategy to judge which tasks should be restarted. */
@@ -62,6 +69,9 @@ public class ExecutionFailureHandler {
private final Context globalFailureCtx;
private final Collection<FailureEnricher> failureEnrichers;
private final ComponentMainThreadExecutor mainThreadExecutor;
+ private final MetricGroup metricGroup;
+
+ private final boolean reportEventsAsSpans;
/**
* Creates the handler to deal with task failures.
@@ -76,13 +86,15 @@ public class ExecutionFailureHandler {
* @param globalFailureCtx Global failure Context used by FailureEnrichers
*/
public ExecutionFailureHandler(
+ final Configuration jobMasterConfig,
final SchedulingTopology schedulingTopology,
final FailoverStrategy failoverStrategy,
final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
final ComponentMainThreadExecutor mainThreadExecutor,
final Collection<FailureEnricher> failureEnrichers,
final Context taskFailureCtx,
- final Context globalFailureCtx) {
+ final Context globalFailureCtx,
+ final MetricGroup metricGroup) {
this.schedulingTopology = checkNotNull(schedulingTopology);
this.failoverStrategy = checkNotNull(failoverStrategy);
@@ -91,6 +103,8 @@ public class ExecutionFailureHandler {
this.failureEnrichers = checkNotNull(failureEnrichers);
this.taskFailureCtx = taskFailureCtx;
this.globalFailureCtx = globalFailureCtx;
+ this.metricGroup = metricGroup;
+ this.reportEventsAsSpans =
jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
}
/**
@@ -104,7 +118,7 @@ public class ExecutionFailureHandler {
*/
public FailureHandlingResult getFailureHandlingResult(
Execution failedExecution, Throwable cause, long timestamp) {
- return handleFailure(
+ return handleFailureAndReport(
failedExecution,
cause,
timestamp,
@@ -123,7 +137,7 @@ public class ExecutionFailureHandler {
*/
public FailureHandlingResult getGlobalFailureHandlingResult(
final Throwable cause, long timestamp) {
- return handleFailure(
+ return handleFailureAndReport(
null,
cause,
timestamp,
@@ -141,6 +155,51 @@ public class ExecutionFailureHandler {
return FailureEnricherUtils.labelFailure(cause, ctx,
mainThreadExecutor, failureEnrichers);
}
+ private FailureHandlingResult handleFailureAndReport(
+ @Nullable final Execution failedExecution,
+ final Throwable cause,
+ long timestamp,
+ final Set<ExecutionVertexID> verticesToRestart,
+ final boolean globalFailure) {
+
+ FailureHandlingResult failureHandlingResult =
+ handleFailure(failedExecution, cause, timestamp,
verticesToRestart, globalFailure);
+
+ if (reportEventsAsSpans) {
+ // TODO: replace with reporting as event once events are supported.
+ // Add reporting as callback for when the failure labeling is completed.
+ failureHandlingResult
+ .getFailureLabels()
+ .thenAcceptAsync(
+ labels ->
reportFailureHandling(failureHandlingResult, labels),
+ mainThreadExecutor);
+ }
+
+ return failureHandlingResult;
+ }
+
+ private void reportFailureHandling(
+ FailureHandlingResult failureHandlingResult, Map<String, String>
failureLabels) {
+
+ // Add base attributes
+ SpanBuilder spanBuilder =
+ Span.builder(ExecutionFailureHandler.class, "JobFailure")
+ .setStartTsMillis(failureHandlingResult.getTimestamp())
+ .setEndTsMillis(failureHandlingResult.getTimestamp())
+ .setAttribute(
+ "canRestart",
String.valueOf(failureHandlingResult.canRestart()))
+ .setAttribute(
+ "isGlobalFailure",
+
String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+ // Add all failure labels
+ for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+ spanBuilder.setAttribute(
+ FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(),
entry.getValue());
+ }
+ metricGroup.addSpan(spanBuilder);
+ }
+
private FailureHandlingResult handleFailure(
@Nullable final Execution failedExecution,
final Throwable cause,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index e3b046301b4..2fa43818ce2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -181,13 +181,15 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
this.executionFailureHandler =
new ExecutionFailureHandler(
+ jobMasterConfiguration,
getSchedulingTopology(),
failoverStrategy,
restartBackoffTimeStrategy,
mainThreadExecutor,
failureEnrichers,
taskFailureCtx,
- globalFailureCtx);
+ globalFailureCtx,
+ jobManagerJobMetricGroup);
this.schedulingStrategy =
schedulingStrategyFactory.createInstance(this,
getSchedulingTopology());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
index a9d3beada8a..3726742ae46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
@@ -18,7 +18,10 @@
package org.apache.flink.runtime.executiongraph.failover;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.TestingFailureEnricher;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -29,6 +32,8 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -36,7 +41,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,6 +74,8 @@ class ExecutionFailureHandlerTest {
private TestingFailureEnricher testingFailureEnricher;
+ private List<Span> spanCollector;
+
@BeforeEach
void setUp() {
TestingSchedulingTopology topology = new TestingSchedulingTopology();
@@ -77,15 +87,25 @@ class ExecutionFailureHandlerTest {
isNewAttempt = new AtomicBoolean(true);
backoffTimeStrategy =
new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS,
isNewAttempt::get);
+ spanCollector = new CopyOnWriteArrayList<>();
+ Configuration configuration = new Configuration();
+ configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
executionFailureHandler =
new ExecutionFailureHandler(
+ configuration,
schedulingTopology,
failoverStrategy,
backoffTimeStrategy,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
Collections.singleton(testingFailureEnricher),
null,
- null);
+ null,
+ new UnregisteredMetricsGroup() {
+ @Override
+ public void addSpan(SpanBuilder spanBuilder) {
+ spanCollector.add(spanBuilder.build());
+ }
+ });
}
/** Tests the case that task restarting is accepted. */
@@ -115,6 +135,7 @@ class ExecutionFailureHandlerTest {
assertThat(result.getFailureLabels().get())
.isEqualTo(testingFailureEnricher.getFailureLabels());
assertThat(executionFailureHandler.getNumberOfRestarts()).isOne();
+ checkMetrics(spanCollector, false, true);
}
/** Tests the case that task restarting is suppressed. */
@@ -151,6 +172,7 @@ class ExecutionFailureHandlerTest {
.isInstanceOf(IllegalStateException.class);
assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
+ checkMetrics(spanCollector, false, false);
}
/** Tests the case that the failure is non-recoverable type. */
@@ -192,6 +214,7 @@ class ExecutionFailureHandlerTest {
.isInstanceOf(IllegalStateException.class);
assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
+ checkMetrics(spanCollector, false, false);
}
@Test
@@ -217,6 +240,7 @@ class ExecutionFailureHandlerTest {
isNewAttempt.set(false);
testHandlingConcurrentException(execution, error);
testHandlingConcurrentException(execution, error);
+ checkMetrics(spanCollector, false, true);
}
private void testHandlingRootException(Execution execution, Throwable
error) {
@@ -283,6 +307,7 @@ class ExecutionFailureHandlerTest {
assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
assertThat(result.getFailureLabels().get())
.isEqualTo(testingFailureEnricher.getFailureLabels());
+ checkMetrics(spanCollector, true, true);
}
// ------------------------------------------------------------------------
@@ -310,4 +335,16 @@ class ExecutionFailureHandlerTest {
return tasksToRestart;
}
}
+
+ private void checkMetrics(List<Span> results, boolean global, boolean
canRestart) {
+ assertThat(results).isNotEmpty();
+ for (Span span : results) {
+
assertThat(span.getScope()).isEqualTo(ExecutionFailureHandler.class.getCanonicalName());
+ assertThat(span.getName()).isEqualTo("JobFailure");
+ Map<String, Object> attributes = span.getAttributes();
+ assertThat(attributes).containsEntry("failureLabel.failKey",
"failValue");
+ assertThat(attributes).containsEntry("canRestart",
String.valueOf(canRestart));
+ assertThat(attributes).containsEntry("isGlobalFailure",
String.valueOf(global));
+ }
+ }
}