This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 10bff3dbad1 [FLINK-34546] Emit span with failure labels on failure in
AdaptiveScheduler. (#24498)
10bff3dbad1 is described below
commit 10bff3dbad103b60915be817a3408820ed09b6cf
Author: Stefan Richter <[email protected]>
AuthorDate: Fri Mar 15 09:36:43 2024 +0100
[FLINK-34546] Emit span with failure labels on failure in
AdaptiveScheduler. (#24498)
---
.../failover/ExecutionFailureHandler.java | 32 ++-------
.../scheduler/adaptive/AdaptiveScheduler.java | 22 +++++-
.../runtime/scheduler/adaptive/Canceling.java | 4 +-
.../runtime/scheduler/adaptive/Executing.java | 10 ++-
.../flink/runtime/scheduler/adaptive/Failing.java | 4 +-
.../adaptive/JobFailureMetricReporter.java | 84 ++++++++++++++++++++++
.../runtime/scheduler/adaptive/Restarting.java | 4 +-
.../adaptive/StateWithExecutionGraph.java | 7 +-
.../scheduler/adaptive/StopWithSavepoint.java | 9 ++-
.../failover/ExecutionFailureHandlerTest.java | 4 +-
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 78 +++++++++++++++++++-
.../runtime/scheduler/adaptive/ExecutingTest.java | 3 +-
.../adaptive/StateWithExecutionGraphTest.java | 2 +-
.../scheduler/adaptive/StopWithSavepointTest.java | 13 +++-
14 files changed, 228 insertions(+), 48 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
index 3d36a9e6bff..94130bc2f5f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
@@ -26,13 +26,12 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.throwable.ThrowableClassifier;
import org.apache.flink.runtime.throwable.ThrowableType;
-import org.apache.flink.traces.Span;
-import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;
import javax.annotation.Nullable;
@@ -70,8 +69,8 @@ public class ExecutionFailureHandler {
private final Collection<FailureEnricher> failureEnrichers;
private final ComponentMainThreadExecutor mainThreadExecutor;
private final MetricGroup metricGroup;
-
private final boolean reportEventsAsSpans;
+ private final JobFailureMetricReporter jobFailureMetricReporter;
/**
* Creates the handler to deal with task failures.
@@ -105,6 +104,7 @@ public class ExecutionFailureHandler {
this.globalFailureCtx = globalFailureCtx;
this.metricGroup = metricGroup;
this.reportEventsAsSpans =
jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
+ this.jobFailureMetricReporter = new
JobFailureMetricReporter(metricGroup);
}
/**
@@ -171,35 +171,15 @@ public class ExecutionFailureHandler {
failureHandlingResult
.getFailureLabels()
.thenAcceptAsync(
- labels ->
reportFailureHandling(failureHandlingResult, labels),
+ labels ->
+ jobFailureMetricReporter.reportJobFailure(
+ failureHandlingResult, labels),
mainThreadExecutor);
}
return failureHandlingResult;
}
- private void reportFailureHandling(
- FailureHandlingResult failureHandlingResult, Map<String, String>
failureLabels) {
-
- // Add base attributes
- SpanBuilder spanBuilder =
- Span.builder(ExecutionFailureHandler.class, "JobFailure")
- .setStartTsMillis(failureHandlingResult.getTimestamp())
- .setEndTsMillis(failureHandlingResult.getTimestamp())
- .setAttribute(
- "canRestart",
String.valueOf(failureHandlingResult.canRestart()))
- .setAttribute(
- "isGlobalFailure",
-
String.valueOf(failureHandlingResult.isGlobalFailure()));
-
- // Add all failure labels
- for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
- spanBuilder.setAttribute(
- FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(),
entry.getValue());
- }
- metricGroup.addSpan(spanBuilder);
- }
-
private FailureHandlingResult handleFailure(
@Nullable final Execution failedExecution,
final Throwable cause,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 9fb9e07de0f..56ff9bed45b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
@@ -332,6 +333,9 @@ public class AdaptiveScheduler
private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+ private final JobFailureMetricReporter jobFailureMetricReporter;
+ private final boolean reportEventsAsSpans;
+
public AdaptiveScheduler(
Settings settings,
JobGraph jobGraph,
@@ -422,6 +426,9 @@ public class AdaptiveScheduler
this.exceptionHistory =
new
BoundedFIFOQueue<>(configuration.get(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+
+ this.jobFailureMetricReporter = new
JobFailureMetricReporter(jobManagerJobMetricGroup);
+ this.reportEventsAsSpans =
configuration.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
}
private static void assertPreconditions(JobGraph jobGraph) throws
RuntimeException {
@@ -1316,7 +1323,20 @@ public class AdaptiveScheduler
}
@Override
- public FailureResult howToHandleFailure(Throwable failure) {
+ public FailureResult howToHandleFailure(
+ Throwable failure, CompletableFuture<Map<String, String>>
failureLabels) {
+ FailureResult failureResult = howToHandleFailure(failure);
+ if (reportEventsAsSpans) {
+ // TODO: replace with reporting as event once events are supported.
+ // Add reporting as callback for when the failure labeling is
completed.
+ failureLabels.thenAcceptAsync(
+ (labels) ->
jobFailureMetricReporter.reportJobFailure(failureResult, labels),
+ componentMainThreadExecutor);
+ }
+ return failureResult;
+ }
+
+ private FailureResult howToHandleFailure(Throwable failure) {
if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
return FailureResult.canNotRestart(
new JobException("The failure is not recoverable",
failure));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
index de8824b5335..95adb0cf5d9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
@@ -28,6 +28,8 @@ import
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry
import org.slf4j.Logger;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/** State which describes a job which is currently being canceled. */
class Canceling extends StateWithExecutionGraph {
@@ -66,7 +68,7 @@ class Canceling extends StateWithExecutionGraph {
}
@Override
- void onFailure(Throwable failure) {
+ void onFailure(Throwable failure, CompletableFuture<Map<String, String>>
failureLabels) {
// Execution graph is already cancelling, so there is nothing more we
can do.
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 42f38695362..96d835cdaa2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
@@ -106,8 +107,9 @@ class Executing extends StateWithExecutionGraph implements
ResourceListener {
}
@Override
- void onFailure(Throwable cause) {
- FailureResultUtil.restartOrFail(context.howToHandleFailure(cause),
context, this);
+ void onFailure(Throwable cause, CompletableFuture<Map<String, String>>
failureLabels) {
+ FailureResultUtil.restartOrFail(
+ context.howToHandleFailure(cause, failureLabels), context,
this);
}
@Override
@@ -255,9 +257,11 @@ class Executing extends StateWithExecutionGraph implements
ResourceListener {
* Asks how to handle the failure.
*
* @param failure failure describing the failure cause
+ * @param failureLabels future of labels from error classification.
* @return {@link FailureResult} which describes how to handle the
failure
*/
- FailureResult howToHandleFailure(Throwable failure);
+ FailureResult howToHandleFailure(
+ Throwable failure, CompletableFuture<Map<String, String>>
failureLabels);
/**
* Asks if we should rescale the currently executing job.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
index ccd6bae9367..4744fb8feb7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/** State which describes a failing job which is currently being canceled. */
class Failing extends StateWithExecutionGraph {
@@ -71,7 +73,7 @@ class Failing extends StateWithExecutionGraph {
}
@Override
- void onFailure(Throwable failure) {
+ void onFailure(Throwable failure, CompletableFuture<Map<String, String>>
failureLabels) {
// We've already failed the execution graph, so there is noting else
we can do.
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java
new file mode 100644
index 00000000000..fdbd497ecab
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/** Helper class to simplify job failure reporting through a metric group. */
+public class JobFailureMetricReporter {
+
+ public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX =
"failureLabel.";
+
+ private final MetricGroup metricGroup;
+
+ public JobFailureMetricReporter(MetricGroup metricGroup) {
+ this.metricGroup = Preconditions.checkNotNull(metricGroup);
+ }
+
+ public void reportJobFailure(
+ FailureHandlingResult failureHandlingResult, Map<String, String>
failureLabels) {
+ reportJobFailure(
+ failureHandlingResult.getTimestamp(),
+ failureHandlingResult.canRestart(),
+ failureHandlingResult.isGlobalFailure(),
+ failureLabels);
+ }
+
+ public void reportJobFailure(
+ FailureResult failureHandlingResult, Map<String, String>
failureLabels) {
+ reportJobFailure(
+ System.currentTimeMillis(),
+ failureHandlingResult.canRestart(),
+ null,
+ failureLabels);
+ }
+
+ private void reportJobFailure(
+ long timestamp,
+ Boolean canRestart,
+ Boolean isGlobal,
+ Map<String, String> failureLabels) {
+ // Add base attributes
+ SpanBuilder spanBuilder =
+ Span.builder(JobFailureMetricReporter.class, "JobFailure")
+ .setStartTsMillis(timestamp)
+ .setEndTsMillis(timestamp);
+
+ if (canRestart != null) {
+ spanBuilder.setAttribute("canRestart", String.valueOf(canRestart));
+ }
+
+ if (isGlobal != null) {
+ spanBuilder.setAttribute("isGlobalFailure",
String.valueOf(isGlobal));
+ }
+
+ // Add all failure labels
+ for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+ spanBuilder.setAttribute(
+ FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(),
entry.getValue());
+ }
+ metricGroup.addSpan(spanBuilder);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index 86e28fe8741..f647967edb4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -31,6 +31,8 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
/** State which describes a job which is currently being restarted. */
@@ -94,7 +96,7 @@ class Restarting extends StateWithExecutionGraph {
}
@Override
- void onFailure(Throwable failure) {
+ void onFailure(Throwable failure, CompletableFuture<Map<String, String>>
failureLabels) {
// We've already cancelled the execution graph, so there is noting
else we can do.
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 4a831669a36..6208de5000a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -377,7 +377,7 @@ abstract class StateWithExecutionGraph implements State {
}
/** Transition to different state when failure occurs. Stays in the same
state by default. */
- abstract void onFailure(Throwable cause);
+ abstract void onFailure(Throwable cause, CompletableFuture<Map<String,
String>> failureLabels);
/**
* Transition to different state when the execution graph reaches a
globally terminal state.
@@ -390,7 +390,7 @@ abstract class StateWithExecutionGraph implements State {
public void handleGlobalFailure(
Throwable cause, CompletableFuture<Map<String, String>>
failureLabels) {
failureCollection.add(ExceptionHistoryEntry.createGlobal(cause,
failureLabels));
- onFailure(cause);
+ onFailure(cause, failureLabels);
}
/**
@@ -422,7 +422,8 @@ abstract class StateWithExecutionGraph implements State {
ExceptionHistoryEntry.create(execution, taskName,
failureLabels));
onFailure(
ErrorInfo.handleMissingThrowable(
-
taskExecutionStateTransition.getError(userCodeClassLoader)));
+
taskExecutionStateTransition.getError(userCodeClassLoader)),
+ failureLabels);
}
}
return successfulUpdate;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
index b81b485960a..35d3f76787f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
@@ -209,7 +210,7 @@ class StopWithSavepoint extends StateWithExecutionGraph {
}
@Override
- void onFailure(Throwable cause) {
+ void onFailure(Throwable cause, CompletableFuture<Map<String, String>>
failureLabels) {
if (hasPendingStateTransition) {
// the error handling remains the same independent of how many
tasks have failed
// we don't want to initiate the same state transition multiple
times, so we exit early
@@ -231,7 +232,7 @@ class StopWithSavepoint extends StateWithExecutionGraph {
savepoint, getJobId(),
cause);
operationFailureCause = ex;
FailureResultUtil.restartOrFail(
- context.howToHandleFailure(ex), context,
this);
+ context.howToHandleFailure(ex,
failureLabels), context, this);
return null;
}));
}
@@ -279,9 +280,11 @@ class StopWithSavepoint extends StateWithExecutionGraph {
* Asks how to handle the failure.
*
* @param failure failure describing the failure cause
+ * @param failureLabels future of labels from error classification.
* @return {@link FailureResult} which describes how to handle the
failure
*/
- FailureResult howToHandleFailure(Throwable failure);
+ FailureResult howToHandleFailure(
+ Throwable failure, CompletableFuture<Map<String, String>>
failureLabels);
/**
* Runs the given action after the specified delay if the state is the
expected state at
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
index 3726742ae46..7602944b5f7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
@@ -339,7 +340,8 @@ class ExecutionFailureHandlerTest {
private void checkMetrics(List<Span> results, boolean global, boolean
canRestart) {
assertThat(results).isNotEmpty();
for (Span span : results) {
-
assertThat(span.getScope()).isEqualTo(ExecutionFailureHandler.class.getCanonicalName());
+ assertThat(span.getScope())
+
.isEqualTo(JobFailureMetricReporter.class.getCanonicalName());
assertThat(span.getName()).isEqualTo("JobFailure");
Map<String, Object> attributes = span.getAttributes();
assertThat(attributes).containsEntry("failureLabel.failKey",
"failValue");
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index e2260d1c19c..273495b3975 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.TestingFailureEnricher;
@@ -83,6 +84,7 @@ import
org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
@@ -106,6 +108,8 @@ import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
@@ -1372,19 +1376,41 @@ public class AdaptiveSchedulerTest {
@Test
void testHowToHandleFailureRejectedByStrategy() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
+ final List<Span> spanCollector = new ArrayList<>(1);
+ final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup
testMetricGroup =
+ createTestMetricGroup(spanCollector);
+
final AdaptiveScheduler scheduler =
new AdaptiveSchedulerBuilder(
createJobGraph(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
.setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
+ .setJobMasterConfiguration(configuration)
+ .setJobManagerJobMetricGroup(testMetricGroup)
.build();
- assertThat(scheduler.howToHandleFailure(new
Exception("test")).canRestart()).isFalse();
+ assertThat(
+ scheduler
+ .howToHandleFailure(
+ new Exception("test"),
createFailureLabelsFuture())
+ .canRestart())
+ .isFalse();
+
+ assertThat(spanCollector).isEmpty();
+ mainThreadExecutor.trigger();
+ checkMetrics(spanCollector, false);
}
@Test
void testHowToHandleFailureAllowedByStrategy() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
+ final List<Span> spanCollector = new ArrayList<>(1);
+ final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup
testMetricGroup =
+ createTestMetricGroup(spanCollector);
final TestRestartBackoffTimeStrategy restartBackoffTimeStrategy =
new TestRestartBackoffTimeStrategy(true, 1234);
@@ -1394,30 +1420,50 @@ public class AdaptiveSchedulerTest {
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
.setRestartBackoffTimeStrategy(restartBackoffTimeStrategy)
+ .setJobMasterConfiguration(configuration)
+ .setJobManagerJobMetricGroup(testMetricGroup)
.build();
- final FailureResult failureResult = scheduler.howToHandleFailure(new
Exception("test"));
+ final FailureResult failureResult =
+ scheduler.howToHandleFailure(new Exception("test"),
createFailureLabelsFuture());
assertThat(failureResult.canRestart()).isTrue();
assertThat(failureResult.getBackoffTime().toMillis())
.isEqualTo(restartBackoffTimeStrategy.getBackoffTime());
+
+ assertThat(spanCollector).isEmpty();
+ mainThreadExecutor.trigger();
+ checkMetrics(spanCollector, true);
}
@Test
void testHowToHandleFailureUnrecoverableFailure() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
+ final List<Span> spanCollector = new ArrayList<>(1);
+ final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup
testMetricGroup =
+ createTestMetricGroup(spanCollector);
+
final AdaptiveScheduler scheduler =
new AdaptiveSchedulerBuilder(
createJobGraph(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
+ .setJobMasterConfiguration(configuration)
+ .setJobManagerJobMetricGroup(testMetricGroup)
.build();
assertThat(
scheduler
.howToHandleFailure(
- new SuppressRestartsException(new
Exception("test")))
+ new SuppressRestartsException(new
Exception("test")),
+ createFailureLabelsFuture())
.canRestart())
.isFalse();
+
+ assertThat(spanCollector).isEmpty();
+ mainThreadExecutor.trigger();
+ checkMetrics(spanCollector, false);
}
@Test
@@ -2495,4 +2541,30 @@ public class AdaptiveSchedulerTest {
return scheduler.requestJob().getExceptionHistory();
}
}
+
+ private static CompletableFuture<Map<String, String>>
createFailureLabelsFuture() {
+ return
CompletableFuture.completedFuture(Collections.singletonMap("failKey",
"failValue"));
+ }
+
+ private static
UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup
+ createTestMetricGroup(List<Span> output) {
+ return new
UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
+ @Override
+ public void addSpan(SpanBuilder spanBuilder) {
+ output.add(spanBuilder.build());
+ }
+ };
+ }
+
+ private static void checkMetrics(List<Span> results, boolean canRestart) {
+ assertThat(results).isNotEmpty();
+ for (Span span : results) {
+ assertThat(span.getScope())
+
.isEqualTo(JobFailureMetricReporter.class.getCanonicalName());
+ assertThat(span.getName()).isEqualTo("JobFailure");
+ Map<String, Object> attributes = span.getAttributes();
+ assertThat(attributes).containsEntry("failureLabel.failKey",
"failValue");
+ assertThat(attributes).containsEntry("canRestart",
String.valueOf(canRestart));
+ }
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index b892569ef1a..a9ee04f12ed 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -703,7 +703,8 @@ public class ExecutingTest extends TestLogger {
}
@Override
- public FailureResult howToHandleFailure(Throwable failure) {
+ public FailureResult howToHandleFailure(
+ Throwable failure, CompletableFuture<Map<String, String>>
failureLabels) {
return howToHandleFailure.apply(failure);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java
index cd1fd57036e..0a3d598ca36 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java
@@ -218,7 +218,7 @@ public class StateWithExecutionGraphTest extends TestLogger
{
}
@Override
- void onFailure(Throwable cause) {}
+ void onFailure(Throwable cause, CompletableFuture<Map<String, String>>
failureLabels) {}
@Override
void onGloballyTerminalState(JobStatus globallyTerminalState) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index f20cdd7e656..3bec0538fd5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -41,7 +41,9 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -381,7 +383,9 @@ class StopWithSavepointTest {
ctx.setHowToHandleFailure(failure ->
FailureResult.canRestart(failure, Duration.ZERO));
- sws.onFailure(new Exception("task failure"));
+ sws.onFailure(
+ new Exception("task failure"),
+ CompletableFuture.completedFuture(Collections.emptyMap()));
// this is a sanity check that we haven't scheduled a state
transition
ctx.triggerExecutors();
@@ -404,7 +408,9 @@ class StopWithSavepointTest {
ctx.setHowToHandleFailure(failure ->
FailureResult.canRestart(failure, Duration.ZERO));
- sws.onFailure(new Exception("task failure"));
+ sws.onFailure(
+ new Exception("task failure"),
+ CompletableFuture.completedFuture(Collections.emptyMap()));
// this is a sanity check that we haven't scheduled a state
transition
ctx.triggerExecutors();
@@ -538,7 +544,8 @@ class StopWithSavepointTest {
}
@Override
- public FailureResult howToHandleFailure(Throwable failure) {
+ public FailureResult howToHandleFailure(
+ Throwable failure, CompletableFuture<Map<String, String>>
failureLabels) {
return howToHandleFailure.apply(failure);
}