This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a9383fd4d51 [FLINK-31890][runtime] Introduce DefaultScheduler failure enrichment/labeling
a9383fd4d51 is described below
commit a9383fd4d51b1161292628145e2f427f574a07d4
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Mon May 1 20:22:14 2023 -0700
[FLINK-31890][runtime] Introduce DefaultScheduler failure enrichment/labeling
* Introduce async task failure labeling as part of
  ExecutionFailureHandler#handleFailure (for both local and global failures)
* Introduce two fields to ExceptionHistoryEntry: a transient
  CompletableFuture<Map<String, String>> failureLabelsFuture and a
  Map<String, String> failureLabels, which is set as soon as
  failureLabelsFuture completes
* Extend ExceptionHistoryEntry, FailureHandlingResult, and
  FailureHandlingResultSnapshot to expose labels as part of ExceptionHistory
* Extend existing tests (e.g., DefaultSchedulerTest,
  FailureHandlingResultTest) to validate the functionality
---
.../flink/core/failure/TestingFailureEnricher.java | 51 ++++++++++
.../failover/flip1/ExecutionFailureHandler.java | 40 +++++++-
.../failover/flip1/FailureHandlingResult.java | 31 +++++-
.../runtime/failure/FailureEnricherUtils.java | 12 ++-
.../DefaultSlotPoolServiceSchedulerFactory.java | 4 +
.../apache/flink/runtime/jobmaster/JobMaster.java | 7 ++
.../jobmaster/SlotPoolServiceSchedulerFactory.java | 3 +
.../factories/DefaultJobMasterServiceFactory.java | 2 +
.../flink/runtime/scheduler/DefaultScheduler.java | 34 ++++++-
.../runtime/scheduler/DefaultSchedulerFactory.java | 4 +
.../flink/runtime/scheduler/SchedulerBase.java | 18 +++-
.../runtime/scheduler/SchedulerNGFactory.java | 3 +
.../adaptive/AdaptiveSchedulerFactory.java | 3 +
.../adaptive/StateWithExecutionGraph.java | 5 +-
.../adaptivebatch/AdaptiveBatchScheduler.java | 7 +-
.../AdaptiveBatchSchedulerFactory.java | 5 +
.../adaptivebatch/SpeculativeScheduler.java | 8 +-
.../exceptionhistory/ExceptionHistoryEntry.java | 49 +++++++++-
.../FailureHandlingResultSnapshot.java | 15 +++
.../RootExceptionHistoryEntry.java | 35 +++++--
.../flip1/ExecutionFailureHandlerTest.java | 28 +++++-
.../failover/flip1/FailureHandlingResultTest.java | 24 +++--
.../runtime/failure/FailureEnricherUtilsTest.java | 19 +++-
.../runtime/jobmaster/JobMasterSchedulerTest.java | 3 +
.../runtime/jobmaster/utils/JobMasterBuilder.java | 6 ++
.../rest/handler/job/JobExceptionsHandlerTest.java | 58 ++++++++---
.../runtime/scheduler/DefaultSchedulerBuilder.java | 13 +++
.../runtime/scheduler/DefaultSchedulerTest.java | 108 ++++++++++++++++++++-
.../scheduler/TestingSchedulerNGFactory.java | 3 +
.../ExceptionHistoryEntryMatcher.java | 37 ++++++-
.../ExceptionHistoryEntryTest.java | 22 ++++-
.../ExceptionHistoryEntryTestingUtils.java | 35 +++++++
.../FailureHandlingResultSnapshotTest.java | 23 ++++-
.../RootExceptionHistoryEntryTest.java | 18 +++-
34 files changed, 671 insertions(+), 62 deletions(-)
diff --git
a/flink-core/src/test/java/org/apache/flink/core/failure/TestingFailureEnricher.java
b/flink-core/src/test/java/org/apache/flink/core/failure/TestingFailureEnricher.java
new file mode 100644
index 00000000000..d6a940871a0
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/core/failure/TestingFailureEnricher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.failure;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link FailureEnricher} that records seen Throwables and returns fixed labels. */
+public class TestingFailureEnricher implements FailureEnricher {
+
+ final Set<Throwable> seenThrowables = new HashSet<>();
+ final Map<String, String> failureLabels =
Collections.singletonMap("failKey", "failValue");
+
+ @Override
+ public Set<String> getOutputKeys() {
+ return Collections.singleton("failKey");
+ }
+
+ @Override
+ public CompletableFuture<Map<String, String>> processFailure(Throwable
cause, Context context) {
+ seenThrowables.add(cause); // exposed via getSeenThrowables()
+ return CompletableFuture.completedFuture(failureLabels); // already completed
+ }
+
+ public Set<Throwable> getSeenThrowables() {
+ return seenThrowables;
+ }
+
+ public Map<String, String> getFailureLabels() {
+ return failureLabels;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 127fe18ee6b..cee213c7488 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -17,8 +17,12 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricher.Context;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
@@ -28,8 +32,11 @@ import org.apache.flink.util.IterableUtils;
import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,6 +58,11 @@ public class ExecutionFailureHandler {
/** Number of all restarts happened since this job is submitted. */
private long numberOfRestarts;
+ private final Context taskFailureCtx;
+ private final Context globalFailureCtx;
+ private final Collection<FailureEnricher> failureEnrichers;
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
/**
* Creates the handler to deal with task failures.
*
@@ -58,15 +70,27 @@ public class ExecutionFailureHandler {
* @param failoverStrategy helps to decide tasks to restart on task
failures
* @param restartBackoffTimeStrategy helps to decide whether to restart
failed tasks and the
* restarting delay
+ * @param mainThreadExecutor the main thread executor of the job master
+ * @param failureEnrichers a collection of {@link FailureEnricher} that
enrich failures
+ * @param taskFailureCtx Task failure Context used by FailureEnrichers
+ * @param globalFailureCtx Global failure Context used by FailureEnrichers
*/
public ExecutionFailureHandler(
final SchedulingTopology schedulingTopology,
final FailoverStrategy failoverStrategy,
- final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final Collection<FailureEnricher> failureEnrichers,
+ final Context taskFailureCtx,
+ final Context globalFailureCtx) {
this.schedulingTopology = checkNotNull(schedulingTopology);
this.failoverStrategy = checkNotNull(failoverStrategy);
this.restartBackoffTimeStrategy =
checkNotNull(restartBackoffTimeStrategy);
+ this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+ this.failureEnrichers = checkNotNull(failureEnrichers);
+ this.taskFailureCtx = taskFailureCtx;
+ this.globalFailureCtx = globalFailureCtx;
}
/**
@@ -109,6 +133,14 @@ public class ExecutionFailureHandler {
true);
}
+ private CompletableFuture<Map<String, String>> labelFailure(Throwable
cause, boolean isGlobal) {
+ if (failureEnrichers.isEmpty()) {
+ return FailureEnricherUtils.EMPTY_FAILURE_LABELS;
+ }
+ final Context ctx = isGlobal ? globalFailureCtx : taskFailureCtx;
+ return FailureEnricherUtils.labelFailure(cause, ctx,
mainThreadExecutor, failureEnrichers);
+ }
+
private FailureHandlingResult handleFailure(
@Nullable final Execution failedExecution,
final Throwable cause,
@@ -116,11 +148,15 @@ public class ExecutionFailureHandler {
final Set<ExecutionVertexID> verticesToRestart,
final boolean globalFailure) {
+ final CompletableFuture<Map<String, String>> failureLabels =
+ labelFailure(cause, globalFailure);
+
if (isUnrecoverableError(cause)) {
return FailureHandlingResult.unrecoverable(
failedExecution,
new JobException("The failure is not recoverable", cause),
timestamp,
+ failureLabels,
globalFailure);
}
@@ -132,6 +168,7 @@ public class ExecutionFailureHandler {
failedExecution,
cause,
timestamp,
+ failureLabels,
verticesToRestart,
restartBackoffTimeStrategy.getBackoffTime(),
globalFailure);
@@ -141,6 +178,7 @@ public class ExecutionFailureHandler {
new JobException(
"Recovery is suppressed by " +
restartBackoffTimeStrategy, cause),
timestamp,
+ failureLabels,
globalFailure);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
index 828fbe243e7..ede19bbde15 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -24,8 +24,10 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -55,6 +57,9 @@ public class FailureHandlingResult {
/** Failure reason. {@code @Nullable} because of FLINK-21376. */
@Nullable private final Throwable error;
+ /** Future Map of string labels characterizing the failure. */
+ private final CompletableFuture<Map<String, String>> failureLabels;
+
/** Failure timestamp. */
private final long timestamp;
@@ -68,6 +73,7 @@ public class FailureHandlingResult {
* {@code null} as a value indicates that the failure was issued by
Flink itself.
* @param cause the exception that caused this failure.
* @param timestamp the time the failure was handled.
+ * @param failureLabels future of the map of string labels characterizing the failure.
* @param verticesToRestart containing task vertices to restart to recover
from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicate a delay before conducting the restart
@@ -76,6 +82,7 @@ public class FailureHandlingResult {
@Nullable Execution failedExecution,
@Nullable Throwable cause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
@@ -85,6 +92,7 @@ public class FailureHandlingResult {
this.restartDelayMS = restartDelayMS;
this.failedExecution = failedExecution;
this.error = cause;
+ this.failureLabels = failureLabels;
this.timestamp = timestamp;
this.globalFailure = globalFailure;
}
@@ -96,16 +104,20 @@ public class FailureHandlingResult {
* {@code null} as a value indicates that the failure was issued by
Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp the time the failure was handled.
+ * @param failureLabels future of the map of labels characterizing the failure as
+ * produced by the FailureEnrichers
*/
private FailureHandlingResult(
@Nullable Execution failedExecution,
@Nonnull Throwable error,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
boolean globalFailure) {
this.verticesToRestart = null;
this.restartDelayMS = -1;
this.failedExecution = failedExecution;
this.error = checkNotNull(error);
+ this.failureLabels = failureLabels;
this.timestamp = timestamp;
this.globalFailure = globalFailure;
}
@@ -159,6 +171,15 @@ public class FailureHandlingResult {
return error;
}
+ /**
+ * Returns the labels future associated with the failure.
+ *
+ * @return the CompletableFuture Map of String labels
+ */
+ public CompletableFuture<Map<String, String>> getFailureLabels() {
+ return failureLabels;
+ }
+
/**
* Returns the time of the failure.
*
@@ -195,6 +216,8 @@ public class FailureHandlingResult {
* {@code null} as a value indicates that the failure was issued by
Flink itself.
* @param cause The reason of the failure.
* @param timestamp The time of the failure.
+ * @param failureLabels Map of labels characterizing the failure produced
by the
+ * FailureEnrichers.
* @param verticesToRestart containing task vertices to restart to recover
from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicate a delay before conducting the restart
@@ -204,6 +227,7 @@ public class FailureHandlingResult {
@Nullable Execution failedExecution,
@Nullable Throwable cause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
@@ -211,6 +235,7 @@ public class FailureHandlingResult {
failedExecution,
cause,
timestamp,
+ failureLabels,
verticesToRestart,
restartDelayMS,
globalFailure);
@@ -226,13 +251,17 @@ public class FailureHandlingResult {
* {@code null} as a value indicates that the failure was issued by
Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp The time of the failure.
+ * @param failureLabels Map of labels characterizing the failure produced
by the
+ * FailureEnrichers.
* @return result indicating the failure is not recoverable
*/
public static FailureHandlingResult unrecoverable(
@Nullable Execution failedExecution,
@Nonnull Throwable error,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
boolean globalFailure) {
- return new FailureHandlingResult(failedExecution, error, timestamp,
globalFailure);
+ return new FailureHandlingResult(
+ failedExecution, error, timestamp, failureLabels,
globalFailure);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index e9a111405a7..cf05e0a40cb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -42,6 +42,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -49,6 +50,10 @@ import java.util.stream.Collectors;
public class FailureEnricherUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FailureEnricherUtils.class);
+
+ public static final CompletableFuture<Map<String, String>>
EMPTY_FAILURE_LABELS =
+ CompletableFuture.completedFuture(Collections.emptyMap());
+
// regex pattern to split the defined failure enrichers
private static final Pattern enricherListPattern =
Pattern.compile("\\s*,\\s*");
static final String MERGE_EXCEPTION_MSG =
@@ -170,12 +175,14 @@ public class FailureEnricherUtils {
*
* @param cause the Throwable to label
* @param context the context of the Throwable
+ * @param mainThreadExecutor the executor to complete the enricher
labeling on
* @param failureEnrichers a collection of FailureEnrichers to enrich the
context with
* @return a CompletableFuture that will complete with a map of labels
*/
public static CompletableFuture<Map<String, String>> labelFailure(
final Throwable cause,
final Context context,
+ final Executor mainThreadExecutor,
final Collection<FailureEnricher> failureEnrichers) {
// list of CompletableFutures to enrich failure with labels from each
enricher
final Collection<CompletableFuture<Map<String, String>>> enrichFutures
= new ArrayList<>();
@@ -204,7 +211,7 @@ public class FailureEnricherUtils {
}
// combine all CompletableFutures into a single CompletableFuture
containing a Map of labels
return FutureUtils.combineAll(enrichFutures)
- .thenApply(
+ .thenApplyAsync(
labelsToMerge -> {
final Map<String, String> mergedLabels = new
HashMap<>();
for (Map<String, String> labels : labelsToMerge) {
@@ -223,6 +230,7 @@ public class FailureEnricherUtils {
}));
}
return mergedLabels;
- });
+ },
+ mainThreadExecutor);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 8373c302c75..ec4146e7679 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -57,6 +58,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -114,6 +116,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
ComponentMainThreadExecutor mainThreadExecutor,
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener,
+ Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations)
throws Exception {
return schedulerNGFactory.createInstance(
@@ -136,6 +139,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
mainThreadExecutor,
fatalErrorHandler,
jobStatusListener,
+ failureEnrichers,
blocklistOperations);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c40f13c685c..755129605d2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -206,6 +207,7 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
private final ExecutionDeploymentTracker executionDeploymentTracker;
private final ExecutionDeploymentReconciler executionDeploymentReconciler;
+ private final Collection<FailureEnricher> failureEnrichers;
// -------- Mutable fields ---------
@@ -243,6 +245,7 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
ExecutionDeploymentTracker executionDeploymentTracker,
ExecutionDeploymentReconciler.Factory
executionDeploymentReconcilerFactory,
BlocklistHandler.Factory blocklistHandlerFactory,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
@@ -345,6 +348,9 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
this.jobStatusListener = new JobManagerJobStatusListener();
+
+ this.failureEnrichers = checkNotNull(failureEnrichers);
+
this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
@@ -389,6 +395,7 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
getMainThreadExecutor(),
fatalErrorHandler,
jobStatusListener,
+ failureEnrichers,
blocklistHandler::addNewBlockedNodes);
return scheduler;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
index df8c40c86c4..02a610988da 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -87,6 +89,7 @@ public interface SlotPoolServiceSchedulerFactory {
ComponentMainThreadExecutor mainThreadExecutor,
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener,
+ Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations)
throws Exception;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index ed5c7903634..c65c11bd2a7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.function.FunctionUtils;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -122,6 +123,7 @@ public class DefaultJobMasterServiceFactory implements
JobMasterServiceFactory {
DefaultExecutionDeploymentReconciler::new,
BlocklistUtils.loadBlocklistHandlerFactory(
jobMasterConfiguration.getConfiguration()),
+ Collections.emptySet(),
initializationTimestamp);
jobMaster.start();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 7127c0f9986..5fdde343a10 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricher.Context;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -36,6 +38,7 @@ import
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHa
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.failure.DefaultFailureEnricherContext;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -126,6 +129,7 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
long initializationTimestamp,
final ComponentMainThreadExecutor mainThreadExecutor,
final JobStatusListener jobStatusListener,
+ final Collection<FailureEnricher> failureEnrichers,
final ExecutionGraphFactory executionGraphFactory,
final ShuffleMaster<?> shuffleMaster,
final Time rpcTimeout,
@@ -167,9 +171,32 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
jobGraph.getName(),
jobGraph.getJobID());
+ final Context taskFailureCtx =
+ DefaultFailureEnricherContext.forTaskFailure(
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ jobManagerJobMetricGroup,
+ ioExecutor,
+ userCodeLoader);
+
+ final Context globalFailureCtx =
+ DefaultFailureEnricherContext.forGlobalFailure(
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ jobManagerJobMetricGroup,
+ ioExecutor,
+ userCodeLoader);
+
this.executionFailureHandler =
new ExecutionFailureHandler(
- getSchedulingTopology(), failoverStrategy,
restartBackoffTimeStrategy);
+ getSchedulingTopology(),
+ failoverStrategy,
+ restartBackoffTimeStrategy,
+ mainThreadExecutor,
+ failureEnrichers,
+ taskFailureCtx,
+ globalFailureCtx);
+
this.schedulingStrategy =
schedulingStrategyFactory.createInstance(this,
getSchedulingTopology());
@@ -306,7 +333,10 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
if (failureHandlingResult.canRestart()) {
restartTasksWithDelay(failureHandlingResult);
} else {
- failJob(failureHandlingResult.getError(),
failureHandlingResult.getTimestamp());
+ failJob(
+ failureHandlingResult.getError(),
+ failureHandlingResult.getTimestamp(),
+ failureHandlingResult.getFailureLabels());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 315bb2911ff..25c623e1846 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -44,6 +45,7 @@ import
org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.slf4j.Logger;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -74,6 +76,7 @@ public class DefaultSchedulerFactory implements
SchedulerNGFactory {
final ComponentMainThreadExecutor mainThreadExecutor,
final FatalErrorHandler fatalErrorHandler,
final JobStatusListener jobStatusListener,
+ final Collection<FailureEnricher> failureEnrichers,
final BlocklistOperations blocklistOperations)
throws Exception {
@@ -146,6 +149,7 @@ public class DefaultSchedulerFactory implements
SchedulerNGFactory {
}
jobStatusListener.jobStatusChanges(jobId, jobStatus,
timestamp);
},
+ failureEnrichers,
executionGraphFactory,
shuffleMaster,
rpcTimeout,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 63cd10f6fa5..86587a0b934 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -527,11 +527,12 @@ public abstract class SchedulerBase implements
SchedulerNG, CheckpointScheduling
return mainThreadExecutor;
}
- protected void failJob(Throwable cause, long timestamp) {
+ protected void failJob(
+ Throwable cause, long timestamp, CompletableFuture<Map<String,
String>> failureLabels) {
incrementVersionsOfAllVertices();
cancelAllPendingSlotRequestsInternal();
executionGraph.failJob(cause, timestamp);
- getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));
+ getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause,
failureLabels));
}
protected final SchedulingTopology getSchedulingTopology() {
@@ -677,19 +678,25 @@ public abstract class SchedulerBase implements
SchedulerNG, CheckpointScheduling
return executionGraph.getTerminationFuture();
}
- protected final void archiveGlobalFailure(Throwable failure) {
+ protected final void archiveGlobalFailure(
+ Throwable failure, CompletableFuture<Map<String, String>>
failureLabels) {
archiveGlobalFailure(
failure,
executionGraph.getStatusTimestamp(JobStatus.FAILED),
+ failureLabels,
StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(),
false)
.map(ExecutionVertex::getCurrentExecutionAttempt)
.collect(Collectors.toSet()));
}
private void archiveGlobalFailure(
- Throwable failure, long timestamp, Iterable<Execution> executions)
{
+ Throwable failure,
+ long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
+ Iterable<Execution> executions) {
exceptionHistory.add(
- RootExceptionHistoryEntry.fromGlobalFailure(failure,
timestamp, executions));
+ RootExceptionHistoryEntry.fromGlobalFailure(
+ failure, timestamp, failureLabels, executions));
log.debug("Archive global failure.", failure);
}
@@ -712,6 +719,7 @@ public abstract class SchedulerBase implements SchedulerNG,
CheckpointScheduling
archiveGlobalFailure(
failureHandlingResult.getRootCause(),
failureHandlingResult.getTimestamp(),
+ failureHandlingResult.getFailureLabels(),
failureHandlingResult.getConcurrentlyFailedExecution());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index d3809b6b353..31b44c5aeca 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,7 @@ public interface SchedulerNGFactory {
ComponentMainThreadExecutor mainThreadExecutor,
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener,
+ Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations)
throws Exception;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 1d6d91d0476..f91605fd5fd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
import java.time.Duration;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -83,6 +85,7 @@ public class AdaptiveSchedulerFactory implements
SchedulerNGFactory {
ComponentMainThreadExecutor mainThreadExecutor,
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener,
+ Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations)
throws Exception {
final DeclarativeSlotPool declarativeSlotPool =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 953f36a2573..7789e167f0e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -359,7 +360,9 @@ abstract class StateWithExecutionGraph implements State {
final String taskName =
maybeTaskName.orElseThrow(NoSuchElementException::new);
final ExecutionState currentState = execution.getState();
if (currentState == desiredState) {
- failureCollection.add(ExceptionHistoryEntry.create(execution,
taskName));
+ failureCollection.add(
+ ExceptionHistoryEntry.create(
+ execution, taskName,
FailureEnricherUtils.EMPTY_FAILURE_LABELS));
onFailure(
ErrorInfo.handleMissingThrowable(
taskExecutionStateTransition.getError(userCodeClassLoader)));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index d6ddce977a4..afe64c27728 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
@@ -42,6 +43,7 @@ import
org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -70,6 +72,7 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -121,6 +124,7 @@ public class AdaptiveBatchScheduler extends
DefaultScheduler {
long initializationTimestamp,
final ComponentMainThreadExecutor mainThreadExecutor,
final JobStatusListener jobStatusListener,
+ final Collection<FailureEnricher> failureEnrichers,
final ExecutionGraphFactory executionGraphFactory,
final ShuffleMaster<?> shuffleMaster,
final Time rpcTimeout,
@@ -150,6 +154,7 @@ public class AdaptiveBatchScheduler extends
DefaultScheduler {
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
+ failureEnrichers,
executionGraphFactory,
shuffleMaster,
rpcTimeout,
@@ -295,7 +300,7 @@ public class AdaptiveBatchScheduler extends
DefaultScheduler {
}
} catch (JobException ex) {
log.error("Unexpected error occurred when initializing
ExecutionJobVertex", ex);
- failJob(ex, System.currentTimeMillis());
+ failJob(ex, System.currentTimeMillis(),
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
}
if (newlyInitializedJobVertices.size() > 0) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 3746b549ed1..8417b7b93ac 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.JobManagerOptions;
import
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -77,6 +78,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -113,6 +115,7 @@ public class AdaptiveBatchSchedulerFactory implements
SchedulerNGFactory {
ComponentMainThreadExecutor mainThreadExecutor,
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener,
+ Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations)
throws Exception {
@@ -206,6 +209,7 @@ public class AdaptiveBatchSchedulerFactory implements
SchedulerNGFactory {
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
+ failureEnrichers,
executionGraphFactory,
shuffleMaster,
rpcTimeout,
@@ -237,6 +241,7 @@ public class AdaptiveBatchSchedulerFactory implements
SchedulerNGFactory {
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
+ failureEnrichers,
executionGraphFactory,
shuffleMaster,
rpcTimeout,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 4a5afad9007..e5c79a31076 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
@@ -120,6 +121,7 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
long initializationTimestamp,
final ComponentMainThreadExecutor mainThreadExecutor,
final JobStatusListener jobStatusListener,
+ final Collection<FailureEnricher> failureEnrichers,
final ExecutionGraphFactory executionGraphFactory,
final ShuffleMaster<?> shuffleMaster,
final Time rpcTimeout,
@@ -150,6 +152,7 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
+ failureEnrichers,
executionGraphFactory,
shuffleMaster,
rpcTimeout,
@@ -285,7 +288,10 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
archiveFromFailureHandlingResult(
createFailureHandlingResultSnapshot(failureHandlingResult));
} else {
- failJob(error, failureHandlingResult.getTimestamp());
+ failJob(
+ error,
+ failureHandlingResult.getTimestamp(),
+ failureHandlingResult.getFailureLabels());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
index 7d9429af0c6..4c393481b4d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
@@ -23,13 +23,18 @@ import
org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
/**
* {@code ExceptionHistoryEntry} collects information about a single failure
that triggered the
@@ -41,18 +46,25 @@ public class ExceptionHistoryEntry extends ErrorInfo {
@Nullable private final String failingTaskName;
@Nullable private final ArchivedTaskManagerLocation taskManagerLocation;
+ private final transient CompletableFuture<Map<String, String>>
failureLabelsFuture;
+ /** Labels associated with the failure, set as soon as failureLabelsFuture
is completed. */
+ private Map<String, String> failureLabels;
/**
* Creates an {@code ExceptionHistoryEntry} based on the provided {@code
Execution}.
*
* @param failedExecution the failed {@code Execution}.
* @param taskName the name of the task.
+ * @param failureLabels the labels associated with the failure.
* @return The {@code ExceptionHistoryEntry}.
* @throws NullPointerException if {@code null} is passed as one of the
parameters.
* @throws IllegalArgumentException if the passed {@code Execution} does
not provide a {@link
* Execution#getFailureInfo() failureInfo}.
*/
- public static ExceptionHistoryEntry create(AccessExecution
failedExecution, String taskName) {
+ public static ExceptionHistoryEntry create(
+ AccessExecution failedExecution,
+ String taskName,
+ CompletableFuture<Map<String, String>> failureLabels) {
Preconditions.checkNotNull(failedExecution, "No Execution is
specified.");
Preconditions.checkNotNull(taskName, "No task name is specified.");
Preconditions.checkArgument(
@@ -63,6 +75,7 @@ public class ExceptionHistoryEntry extends ErrorInfo {
return new ExceptionHistoryEntry(
failure.getException(),
failure.getTimestamp(),
+ failureLabels,
taskName,
failedExecution.getAssignedResourceLocation());
}
@@ -70,7 +83,11 @@ public class ExceptionHistoryEntry extends ErrorInfo {
/** Creates an {@code ExceptionHistoryEntry} that is not based on an
{@code Execution}. */
public static ExceptionHistoryEntry createGlobal(Throwable cause) {
return new ExceptionHistoryEntry(
- cause, System.currentTimeMillis(), null,
(ArchivedTaskManagerLocation) null);
+ cause,
+ System.currentTimeMillis(),
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+ null,
+ (ArchivedTaskManagerLocation) null);
}
/**
@@ -78,6 +95,7 @@ public class ExceptionHistoryEntry extends ErrorInfo {
*
* @param cause The reason for the failure.
* @param timestamp The time the failure was caught.
+ * @param failureLabels The labels associated with the failure.
* @param failingTaskName The name of the task that failed.
* @param taskManagerLocation The host the task was running on.
* @throws NullPointerException if {@code cause} is {@code null}.
@@ -87,11 +105,13 @@ public class ExceptionHistoryEntry extends ErrorInfo {
protected ExceptionHistoryEntry(
Throwable cause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable TaskManagerLocation taskManagerLocation) {
this(
cause,
timestamp,
+ failureLabels,
failingTaskName,
ArchivedTaskManagerLocation.fromTaskManagerLocation(taskManagerLocation));
}
@@ -99,11 +119,17 @@ public class ExceptionHistoryEntry extends ErrorInfo {
private ExceptionHistoryEntry(
Throwable cause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable ArchivedTaskManagerLocation taskManagerLocation) {
super(cause, timestamp);
this.failingTaskName = failingTaskName;
this.taskManagerLocation = taskManagerLocation;
+ this.failureLabelsFuture =
+ Preconditions.checkNotNull(failureLabels)
+ .thenApply(
+ labelMap ->
+ this.failureLabels =
Collections.unmodifiableMap(labelMap));
}
public boolean isGlobal() {
@@ -120,6 +146,25 @@ public class ExceptionHistoryEntry extends ErrorInfo {
return taskManagerLocation;
}
+ /**
+ * Returns the labels associated with the failure that is set as soon as
failureLabelsFuture is
+ * completed. When failureLabelsFuture is not completed, it returns an
empty map.
+ *
+ * @return Map of failure labels
+ */
+ public Map<String, String> getFailureLabels() {
+ return
Optional.ofNullable(failureLabels).orElse(Collections.emptyMap());
+ }
+
+ /**
+ * Returns the labels future associated with the failure.
+ *
+ * @return CompletableFuture of Map failure labels
+ */
+ public CompletableFuture<Map<String, String>> getFailureLabelsFuture() {
+ return failureLabelsFuture;
+ }
+
/**
* {@code ArchivedTaskManagerLocation} represents a archived (static)
version of a {@link
* TaskManagerLocation}. It overcomes the issue with {@link
TaskManagerLocation#inetAddress}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
index 337509e26e8..9fdd21d8982 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
@@ -30,8 +30,10 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -43,6 +45,7 @@ public class FailureHandlingResultSnapshot {
@Nullable private final Execution rootCauseExecution;
private final Throwable rootCause;
+ private final CompletableFuture<Map<String, String>> failureLabels;
private final long timestamp;
private final Set<Execution> concurrentlyFailedExecutions;
@@ -80,6 +83,7 @@ public class FailureHandlingResultSnapshot {
rootCauseExecution,
ErrorInfo.handleMissingThrowable(failureHandlingResult.getError()),
failureHandlingResult.getTimestamp(),
+ failureHandlingResult.getFailureLabels(),
concurrentlyFailedExecutions);
}
@@ -88,6 +92,7 @@ public class FailureHandlingResultSnapshot {
@Nullable Execution rootCauseExecution,
Throwable rootCause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
Set<Execution> concurrentlyFailedExecutions) {
Preconditions.checkArgument(
rootCauseExecution == null
@@ -95,6 +100,7 @@ public class FailureHandlingResultSnapshot {
"The rootCauseExecution should not be part of the
concurrentlyFailedExecutions map.");
this.rootCauseExecution = rootCauseExecution;
+ this.failureLabels = failureLabels;
this.rootCause = Preconditions.checkNotNull(rootCause);
this.timestamp = timestamp;
this.concurrentlyFailedExecutions =
@@ -120,6 +126,15 @@ public class FailureHandlingResultSnapshot {
return rootCause;
}
+ /**
+ * Returns the labels future associated with the failure.
+ *
+ * @return the CompletableFuture map of String labels
+ */
+ public CompletableFuture<Map<String, String>> getFailureLabels() {
+ return failureLabels;
+ }
+
/**
* The time the failure occurred.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
index 0ff9bd1a148..cfbad29716a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.scheduler.exceptionhistory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -63,6 +66,7 @@ public class RootExceptionHistoryEntry extends
ExceptionHistoryEntry {
return createRootExceptionHistoryEntry(
snapshot.getRootCause(),
snapshot.getTimestamp(),
+ snapshot.getFailureLabels(),
failingTaskName,
taskManagerLocation,
snapshot.getConcurrentlyFailedExecution());
@@ -75,6 +79,7 @@ public class RootExceptionHistoryEntry extends
ExceptionHistoryEntry {
*
* @param cause The reason for the failure.
* @param timestamp The time the failure was caught.
+ * @param failureLabels Map of string labels associated with the failure.
* @param executions The {@link Execution} instances that shall be
analyzed for failures.
* @return The {@code RootExceptionHistoryEntry} instance.
* @throws NullPointerException if {@code failure} is {@code null}.
@@ -82,14 +87,23 @@ public class RootExceptionHistoryEntry extends
ExceptionHistoryEntry {
* 0}.
*/
public static RootExceptionHistoryEntry fromGlobalFailure(
- Throwable cause, long timestamp, Iterable<Execution> executions) {
- return createRootExceptionHistoryEntry(cause, timestamp, null, null,
executions);
+ Throwable cause,
+ long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
+ Iterable<Execution> executions) {
+ return createRootExceptionHistoryEntry(
+ cause, timestamp, failureLabels, null, null, executions);
}
public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry>
entries) {
return new RootExceptionHistoryEntry(
- entry.getException(), entry.getTimestamp(), null, null,
entries);
+ entry.getException(),
+ entry.getTimestamp(),
+ entry.getFailureLabelsFuture(),
+ null,
+ null,
+ entries);
}
/**
@@ -107,18 +121,23 @@ public class RootExceptionHistoryEntry extends
ExceptionHistoryEntry {
public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo
errorInfo) {
Preconditions.checkNotNull(errorInfo, "errorInfo");
return fromGlobalFailure(
- errorInfo.getException(), errorInfo.getTimestamp(),
Collections.emptyList());
+ errorInfo.getException(),
+ errorInfo.getTimestamp(),
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+ Collections.emptyList());
}
private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
Throwable cause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable TaskManagerLocation taskManagerLocation,
Iterable<Execution> executions) {
return new RootExceptionHistoryEntry(
cause,
timestamp,
+ failureLabels,
failingTaskName,
taskManagerLocation,
StreamSupport.stream(executions.spliterator(), false)
@@ -126,7 +145,9 @@ public class RootExceptionHistoryEntry extends
ExceptionHistoryEntry {
.map(
execution ->
ExceptionHistoryEntry.create(
- execution,
execution.getVertexWithAttempt()))
+ execution,
+
execution.getVertexWithAttempt(),
+
FailureEnricherUtils.EMPTY_FAILURE_LABELS))
.collect(Collectors.toList()));
}
@@ -135,6 +156,7 @@ public class RootExceptionHistoryEntry extends
ExceptionHistoryEntry {
*
* @param cause The reason for the failure.
* @param timestamp The time the failure was caught.
+ * @param failureLabels labels associated with the failure.
* @param failingTaskName The name of the task that failed.
* @param taskManagerLocation The host the task was running on.
* @throws NullPointerException if {@code cause} is {@code null}.
@@ -145,10 +167,11 @@ public class RootExceptionHistoryEntry extends
ExceptionHistoryEntry {
public RootExceptionHistoryEntry(
Throwable cause,
long timestamp,
+ CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable TaskManagerLocation taskManagerLocation,
Iterable<ExceptionHistoryEntry> concurrentExceptions) {
- super(cause, timestamp, failingTaskName, taskManagerLocation);
+ super(cause, timestamp, failureLabels, failingTaskName,
taskManagerLocation);
this.concurrentExceptions = concurrentExceptions;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
index 2c4406fa263..c3df9922c5b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
+import org.apache.flink.core.failure.TestingFailureEnricher;
+import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -35,6 +37,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
@@ -59,6 +62,8 @@ class ExecutionFailureHandlerTest {
private ExecutionFailureHandler executionFailureHandler;
+ private TestingFailureEnricher testingFailureEnricher;
+
@BeforeEach
void setUp() {
TestingSchedulingTopology topology = new TestingSchedulingTopology();
@@ -66,10 +71,17 @@ class ExecutionFailureHandlerTest {
schedulingTopology = topology;
failoverStrategy = new TestFailoverStrategy();
+ testingFailureEnricher = new TestingFailureEnricher();
backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true,
RESTART_DELAY_MS);
executionFailureHandler =
new ExecutionFailureHandler(
- schedulingTopology, failoverStrategy,
backoffTimeStrategy);
+ schedulingTopology,
+ failoverStrategy,
+ backoffTimeStrategy,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ Collections.singleton(testingFailureEnricher),
+ null,
+ null);
}
/** Tests the case that task restarting is accepted. */
@@ -94,6 +106,9 @@ class ExecutionFailureHandlerTest {
assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
assertThat(result.getError()).isSameAs(cause);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
+
assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(cause);
+ assertThat(result.getFailureLabels().get())
+ .isEqualTo(testingFailureEnricher.getFailureLabels());
assertThat(executionFailureHandler.getNumberOfRestarts()).isEqualTo(1);
}
@@ -116,6 +131,9 @@ class ExecutionFailureHandlerTest {
assertThat(result.getFailedExecution().get()).isSameAs(execution);
assertThat(result.getError()).hasCause(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
+
assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
+ assertThat(result.getFailureLabels().get())
+ .isEqualTo(testingFailureEnricher.getFailureLabels());
assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isFalse();
assertThatThrownBy(result::getVerticesToRestart)
@@ -147,6 +165,9 @@ class ExecutionFailureHandlerTest {
assertThat(result.getFailedExecution().get()).isSameAs(execution);
assertThat(result.getError()).isNotNull();
assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isTrue();
+
assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
+ assertThat(result.getFailureLabels().get())
+ .isEqualTo(testingFailureEnricher.getFailureLabels());
assertThat(result.getTimestamp()).isEqualTo(timestamp);
assertThatThrownBy(result::getVerticesToRestart)
@@ -180,7 +201,7 @@ class ExecutionFailureHandlerTest {
}
@Test
- void testGlobalFailureHandling() {
+ void testGlobalFailureHandling() throws ExecutionException,
InterruptedException {
final Throwable error = new Exception("Expected test failure");
final long timestamp = System.currentTimeMillis();
final FailureHandlingResult result =
@@ -193,6 +214,9 @@ class ExecutionFailureHandlerTest {
.collect(Collectors.toSet()));
assertThat(result.getError()).isSameAs(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
+
assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
+ assertThat(result.getFailureLabels().get())
+ .isEqualTo(testingFailureEnricher.getFailureLabels());
}
// ------------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
index a5df025989b..064aac9147e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -27,8 +27,11 @@ import
org.apache.flink.testutils.executor.TestExecutorExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionGraph;
@@ -53,15 +56,19 @@ class FailureHandlingResultTest {
Set<ExecutionVertexID> tasks = new HashSet<>();
tasks.add(execution.getVertex().getID());
- long delay = 1234;
- Throwable error = new RuntimeException();
- long timestamp = System.currentTimeMillis();
+ final long delay = 1234;
+ final Throwable error = new RuntimeException();
+ final long timestamp = System.currentTimeMillis();
+ final CompletableFuture<Map<String, String>> failureLabels =
+
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
FailureHandlingResult result =
- FailureHandlingResult.restartable(execution, error, timestamp,
tasks, delay, false);
+ FailureHandlingResult.restartable(
+ execution, error, timestamp, failureLabels, tasks,
delay, false);
assertThat(result.canRestart()).isTrue();
assertThat(delay).isEqualTo(result.getRestartDelayMS());
assertThat(tasks).isEqualTo(result.getVerticesToRestart());
+ assertThat(result.getFailureLabels()).isEqualTo(failureLabels);
assertThat(result.getError()).isSameAs(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
assertThat(result.getFailedExecution()).isPresent();
@@ -72,14 +79,17 @@ class FailureHandlingResultTest {
@Test
void
testRestartingSuppressedFailureHandlingResultWithNoCausingExecutionVertexId() {
// create a FailureHandlingResult with error
- Throwable error = new Exception("test error");
- long timestamp = System.currentTimeMillis();
+ final Throwable error = new Exception("test error");
+ final long timestamp = System.currentTimeMillis();
+ final CompletableFuture<Map<String, String>> failureLabels =
+
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
FailureHandlingResult result =
- FailureHandlingResult.unrecoverable(null, error, timestamp,
false);
+ FailureHandlingResult.unrecoverable(null, error, timestamp,
failureLabels, false);
assertThat(result.canRestart()).isFalse();
assertThat(result.getError()).isSameAs(error);
assertThat(result.getTimestamp()).isEqualTo(timestamp);
+ assertThat(result.getFailureLabels()).isEqualTo(failureLabels);
assertThat(result.getFailedExecution()).isNotPresent();
assertThatThrownBy(result::getVerticesToRestart)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
index d86b0e06e37..8eedf2d0be2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.FailureEnricherFactory;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.TestingPluginManager;
+import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.commons.collections.IteratorUtils;
@@ -163,7 +164,11 @@ class FailureEnricherUtilsTest {
failureEnrichers.add(validEnricher);
final CompletableFuture<Map<String, String>> result =
- FailureEnricherUtils.labelFailure(cause, null,
failureEnrichers);
+ FailureEnricherUtils.labelFailure(
+ cause,
+ null,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ failureEnrichers);
assertThatFuture(result)
.eventuallySucceeds()
@@ -188,7 +193,11 @@ class FailureEnricherUtilsTest {
failureEnrichers.add(invalidEnricher);
final CompletableFuture<Map<String, String>> result =
- FailureEnricherUtils.labelFailure(cause, null,
failureEnrichers);
+ FailureEnricherUtils.labelFailure(
+ cause,
+ null,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ failureEnrichers);
// Ignoring labels
assertThatFuture(result).eventuallySucceeds().satisfies(labels ->
labels.isEmpty());
}
@@ -208,7 +217,11 @@ class FailureEnricherUtilsTest {
};
final CompletableFuture<Map<String, String>> result =
- FailureEnricherUtils.labelFailure(cause, null, enrichers);
+ FailureEnricherUtils.labelFailure(
+ cause,
+ null,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ enrichers);
try {
result.get();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index 14bc3760792..f5dd1e649da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -46,6 +47,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -121,6 +123,7 @@ public class JobMasterSchedulerTest extends TestLogger {
ComponentMainThreadExecutor mainThreadExecutor,
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener,
+ Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations) {
return TestingSchedulerNG.newBuilder()
.setStartSchedulingRunnable(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 5c558449143..a6d30b41ff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster.utils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -49,6 +50,8 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
/** A builder for the {@link JobMaster}. */
@@ -84,6 +87,8 @@ public class JobMasterBuilder {
private FatalErrorHandler fatalErrorHandler = error -> {};
+ private Collection<FailureEnricher> failureEnrichers =
Collections.emptySet();
+
private ExecutionDeploymentTracker executionDeploymentTracker =
new DefaultExecutionDeploymentTracker();
private ExecutionDeploymentReconciler.Factory
executionDeploymentReconcilerFactory =
@@ -211,6 +216,7 @@ public class JobMasterBuilder {
executionDeploymentTracker,
executionDeploymentReconcilerFactory,
blocklistHandlerFactory,
+ failureEnrichers,
System.currentTimeMillis());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index 7fe27b6c2f4..021d13314b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -28,6 +28,7 @@ import
org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionHistory;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
@@ -67,6 +68,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import static
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -110,7 +112,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
}
@Test
- public void testOnlyRootCause() throws HandlerRequestException {
+ public void testOnlyRootCause()
+ throws HandlerRequestException, ExecutionException,
InterruptedException {
final Throwable rootCause = new RuntimeException("root cause");
final long rootCauseTimestamp = System.currentTimeMillis();
@@ -132,7 +135,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
}
@Test
- public void testOnlyExceptionHistory() throws HandlerRequestException {
+ public void testOnlyExceptionHistory()
+ throws HandlerRequestException, ExecutionException,
InterruptedException {
final RuntimeException rootThrowable = new RuntimeException("exception #0");
final long rootTimestamp = System.currentTimeMillis();
final RootExceptionHistoryEntry rootEntry =
fromGlobalFailure(rootThrowable, rootTimestamp);
@@ -152,13 +156,15 @@ public class JobExceptionsHandlerTest extends TestLogger {
}
@Test
- public void testWithExceptionHistory() throws HandlerRequestException {
+ public void testWithExceptionHistory()
+ throws HandlerRequestException, ExecutionException,
InterruptedException {
final RootExceptionHistoryEntry rootCause =
fromGlobalFailure(new RuntimeException("exception #0"),
System.currentTimeMillis());
final RootExceptionHistoryEntry otherFailure =
new RootExceptionHistoryEntry(
new RuntimeException("exception #1"),
System.currentTimeMillis(),
+
CompletableFuture.completedFuture(Collections.singletonMap("key", "value")),
"task name",
new LocalTaskManagerLocation(),
Collections.emptySet());
@@ -178,6 +184,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
historyContainsJobExceptionInfo(
otherFailure.getException(),
otherFailure.getTimestamp(),
+ otherFailure.getFailureLabelsFuture(),
otherFailure.getFailingTaskName(),
JobExceptionsHandler.toString(
otherFailure.getTaskManagerLocation()),
@@ -188,11 +195,12 @@ public class JobExceptionsHandlerTest extends TestLogger {
@Test
public void
testWithLocalExceptionHistoryEntryNotHavingATaskManagerInformationAvailable()
- throws HandlerRequestException {
+ throws HandlerRequestException, ExecutionException,
InterruptedException {
final RootExceptionHistoryEntry failure =
new RootExceptionHistoryEntry(
new RuntimeException("exception #1"),
System.currentTimeMillis(),
+
CompletableFuture.completedFuture(Collections.singletonMap("key", "value")),
"task name",
null,
Collections.emptySet());
@@ -209,6 +217,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
historyContainsJobExceptionInfo(
failure.getException(),
failure.getTimestamp(),
+ failure.getFailureLabelsFuture(),
failure.getFailingTaskName(),
JobExceptionsHandler.toString(failure.getTaskManagerLocation()),
JobExceptionsHandler.toTaskManagerId(
@@ -217,13 +226,14 @@ public class JobExceptionsHandlerTest extends TestLogger {
@Test
public void testWithExceptionHistoryWithTruncationThroughParameter()
- throws HandlerRequestException {
+ throws HandlerRequestException, ExecutionException,
InterruptedException {
final RootExceptionHistoryEntry rootCause =
fromGlobalFailure(new RuntimeException("exception #0"),
System.currentTimeMillis());
final RootExceptionHistoryEntry otherFailure =
new RootExceptionHistoryEntry(
new RuntimeException("exception #1"),
System.currentTimeMillis(),
+
CompletableFuture.completedFuture(Collections.singletonMap("key", "value")),
"task name",
new LocalTaskManagerLocation(),
Collections.emptySet());
@@ -318,6 +328,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
new RootExceptionHistoryEntry(
failureCause,
failureTimestamp,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
"test task #1",
new LocalTaskManagerLocation(),
Collections.emptySet()));
@@ -421,7 +432,13 @@ public class JobExceptionsHandlerTest extends TestLogger {
}
private static RootExceptionHistoryEntry fromGlobalFailure(Throwable
cause, long timestamp) {
- return new RootExceptionHistoryEntry(cause, timestamp, null, null,
Collections.emptySet());
+ return new RootExceptionHistoryEntry(
+ cause,
+ timestamp,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+ null,
+ null,
+ Collections.emptySet());
}
// -------- factory methods for instantiating new Matchers --------
@@ -430,14 +447,17 @@ public class JobExceptionsHandlerTest extends TestLogger {
private static Matcher<RootExceptionInfo> historyContainsJobExceptionInfo(
Throwable expectedFailureCause,
long expectedFailureTimestamp,
+ CompletableFuture<Map<String, String>> expectedFailureLabels,
String expectedTaskNameWithSubtaskId,
String expectedTaskManagerLocation,
String expectedTaskManagerId,
- Matcher<ExceptionInfo>... concurrentExceptionMatchers) {
+ Matcher<ExceptionInfo>... concurrentExceptionMatchers)
+ throws ExecutionException, InterruptedException {
return new RootExceptionInfoMatcher(
matchesFailure(
expectedFailureCause,
expectedFailureTimestamp,
+ expectedFailureLabels,
expectedTaskNameWithSubtaskId,
expectedTaskManagerLocation,
expectedTaskManagerId),
@@ -448,21 +468,31 @@ public class JobExceptionsHandlerTest extends TestLogger {
private static Matcher<RootExceptionInfo> historyContainsGlobalFailure(
Throwable expectedFailureCause,
long expectedFailureTimestamp,
- Matcher<ExceptionInfo>... concurrentExceptionMatchers) {
+ Matcher<ExceptionInfo>... concurrentExceptionMatchers)
+ throws ExecutionException, InterruptedException {
return new RootExceptionInfoMatcher(
- matchesFailure(expectedFailureCause, expectedFailureTimestamp,
null, null, null),
+ matchesFailure(
+ expectedFailureCause,
+ expectedFailureTimestamp,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+ null,
+ null,
+ null),
concurrentExceptionMatchers);
}
private static Matcher<ExceptionInfo> matchesFailure(
Throwable expectedFailureCause,
long expectedFailureTimestamp,
+ CompletableFuture<Map<String, String>> expectedFailureLabels,
String expectedTaskNameWithSubtaskId,
String expectedTaskManagerLocation,
- String expectedTaskManagerId) {
+ String expectedTaskManagerId)
+ throws ExecutionException, InterruptedException {
return new ExceptionInfoMatcher(
expectedFailureCause,
expectedFailureTimestamp,
+ expectedFailureLabels,
expectedTaskNameWithSubtaskId,
expectedTaskManagerLocation,
expectedTaskManagerId);
@@ -519,6 +549,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
private final Throwable expectedException;
private final long expectedTimestamp;
+ private final Map<String, String> expectedFailureLabels;
private final String expectedTaskName;
private final String expectedLocation;
private final String expectedTaskManagerId;
@@ -526,11 +557,14 @@ public class JobExceptionsHandlerTest extends TestLogger {
private ExceptionInfoMatcher(
Throwable expectedException,
long expectedTimestamp,
+ CompletableFuture<Map<String, String>> expectedFailureLabels,
String expectedTaskName,
String expectedLocation,
- String expectedTaskManagerId) {
+ String expectedTaskManagerId)
+ throws ExecutionException, InterruptedException {
this.expectedException =
deserializeSerializedThrowable(expectedException);
this.expectedTimestamp = expectedTimestamp;
+ this.expectedFailureLabels = expectedFailureLabels.get();
this.expectedTaskName = expectedTaskName;
this.expectedLocation = expectedLocation;
this.expectedTaskManagerId = expectedTaskManagerId;
@@ -545,6 +579,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
.appendText(getExpectedStacktrace())
.appendText(", timestamp=")
.appendText(String.valueOf(expectedTimestamp))
+ .appendText(", failureLabels=")
+ .appendText(String.valueOf(expectedFailureLabels))
.appendText(", taskName=")
.appendText(expectedTaskName)
.appendText(", location=")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
index 56c7b29ebd3..38df9476854 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
@@ -62,7 +63,9 @@ import
org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
@@ -102,6 +105,7 @@ public class DefaultSchedulerBuilder {
private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
new TestExecutionSlotAllocatorFactory();
private JobStatusListener jobStatusListener = (ignoredA, ignoredB,
ignoredC) -> {};
+ private Collection<FailureEnricher> failureEnrichers = new HashSet<>();
private ExecutionDeployer.Factory executionDeployerFactory =
new DefaultExecutionDeployer.Factory();
private VertexParallelismAndInputInfosDecider
vertexParallelismAndInputInfosDecider =
@@ -246,6 +250,12 @@ public class DefaultSchedulerBuilder {
return this;
}
+ public DefaultSchedulerBuilder setFailureEnrichers(
+ Collection<FailureEnricher> failureEnrichers) {
+ this.failureEnrichers = failureEnrichers;
+ return this;
+ }
+
public DefaultSchedulerBuilder setExecutionDeployerFactory(
ExecutionDeployer.Factory executionDeployerFactory) {
this.executionDeployerFactory = executionDeployerFactory;
@@ -301,6 +311,7 @@ public class DefaultSchedulerBuilder {
System.currentTimeMillis(),
mainThreadExecutor,
jobStatusListener,
+ failureEnrichers,
createExecutionGraphFactory(false),
shuffleMaster,
rpcTimeout,
@@ -329,6 +340,7 @@ public class DefaultSchedulerBuilder {
System.currentTimeMillis(),
mainThreadExecutor,
jobStatusListener,
+ failureEnrichers,
createExecutionGraphFactory(true),
shuffleMaster,
rpcTimeout,
@@ -360,6 +372,7 @@ public class DefaultSchedulerBuilder {
System.currentTimeMillis(),
mainThreadExecutor,
jobStatusListener,
+ failureEnrichers,
createExecutionGraphFactory(true, new
SpeculativeExecutionJobVertex.Factory()),
shuffleMaster,
rpcTimeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index bc6df6ee0a8..5e60f34cdd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.metrics.Gauge;
@@ -57,6 +59,7 @@ import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRe
import
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import
org.apache.flink.runtime.executiongraph.utils.TestFailoverStrategyFactory;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import
org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
@@ -111,6 +114,7 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
@@ -969,7 +973,10 @@ public class DefaultSchedulerTest extends TestLogger {
final ExecutionVertexVersion executionVertexVersion =
executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
- scheduler.failJob(new FlinkException("Test failure."),
System.currentTimeMillis());
+ scheduler.failJob(
+ new FlinkException("Test failure."),
+ System.currentTimeMillis(),
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS);
assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue();
}
@@ -1297,6 +1304,73 @@ public class DefaultSchedulerTest extends TestLogger {
.isTrue();
}
+ /** Verify DefaultScheduler propagates Task failure labels as generated by Failure Enrichers. */
+ @Test
+ void testTaskFailureWithFailureEnricherLabels() {
+ final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+ final TestingFailureEnricher testingFailureEnricher = new
TestingFailureEnricher();
+ final DefaultScheduler scheduler =
+ createSchedulerAndStartScheduling(
+ jobGraph,
Collections.singleton(testingFailureEnricher));
+
+ final ExecutionAttemptID firstAttempt =
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final RuntimeException firstException = new RuntimeException("First exception");
+ final long firstFailTimestamp = initiateFailure(scheduler,
firstAttempt, firstException);
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ final ArchivedExecutionVertex executionVertex =
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
+
+ // Make sure FailureEnricher is triggered
+ assertThat(testingFailureEnricher.getSeenThrowables().stream().map(t -> t.getMessage()))
+ .contains(firstException.getMessage());
+ // And failure labels are part of ExceptionHistory
+ assertThat(scheduler.getExceptionHistory())
+ .map(entry -> entry.getFailureLabelsFuture().get())
+ .contains(testingFailureEnricher.getFailureLabels());
+
+ assertThat(scheduler.getExceptionHistory())
+ .anySatisfy(
+ e ->
+
ExceptionHistoryEntryTestingUtils.matchesFailure(
+ e,
+ firstException,
+ firstFailTimestamp,
+
testingFailureEnricher.getFailureLabels()));
+
+ final RuntimeException anotherException = new
RuntimeException("Another exception");
+ final long anotherFailTimestamp =
+ initiateFailure(
+ scheduler,
+
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+ anotherException);
+
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ assertThat(testingFailureEnricher.getSeenThrowables().stream().map(t -> t.getMessage()))
+ .contains(anotherException.getMessage());
+
+ assertThat(scheduler.getExceptionHistory())
+ .anySatisfy(
+ e ->
+
ExceptionHistoryEntryTestingUtils.matchesFailure(
+ e,
+ anotherException,
+ anotherFailTimestamp,
+
testingFailureEnricher.getFailureLabels()));
+ }
+
@Test
void testExceptionHistoryWithPreDeployFailure() {
// disable auto-completing slot requests to simulate timeout
@@ -1934,6 +2008,14 @@ public class DefaultSchedulerTest extends TestLogger {
return sortedVertices.get(0);
}
+ private DefaultScheduler createSchedulerAndStartScheduling(
+ final JobGraph jobGraph, final Collection<FailureEnricher>
failureEnrichers) {
+ return createSchedulerAndStartScheduling(
+ jobGraph,
+ ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ failureEnrichers);
+ }
+
private DefaultScheduler createSchedulerAndStartScheduling(final JobGraph
jobGraph) {
return createSchedulerAndStartScheduling(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread());
@@ -1941,10 +2023,18 @@ public class DefaultSchedulerTest extends TestLogger {
private DefaultScheduler createSchedulerAndStartScheduling(
final JobGraph jobGraph, final ComponentMainThreadExecutor
mainThreadExecutor) {
+ return createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor, Collections.emptySet());
+ }
+
+ private DefaultScheduler createSchedulerAndStartScheduling(
+ final JobGraph jobGraph,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final Collection<FailureEnricher> failureEnrichers) {
try {
final DefaultScheduler scheduler =
- createSchedulerBuilder(jobGraph,
mainThreadExecutor).build();
+ createSchedulerBuilder(jobGraph, mainThreadExecutor,
failureEnrichers).build();
mainThreadExecutor.execute(scheduler::startScheduling);
return scheduler;
} catch (Exception e) {
@@ -1957,7 +2047,7 @@ public class DefaultSchedulerTest extends TestLogger {
final ComponentMainThreadExecutor mainThreadExecutor,
final SchedulingStrategyFactory schedulingStrategyFactory)
throws Exception {
- return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+ return createSchedulerBuilder(jobGraph, mainThreadExecutor,
Collections.emptySet())
.setSchedulingStrategyFactory(schedulingStrategyFactory)
.build();
}
@@ -1968,7 +2058,7 @@ public class DefaultSchedulerTest extends TestLogger {
final SchedulingStrategyFactory schedulingStrategyFactory,
final FailoverStrategy.Factory failoverStrategyFactory)
throws Exception {
- return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+ return createSchedulerBuilder(jobGraph, mainThreadExecutor,
Collections.emptySet())
.setSchedulingStrategyFactory(schedulingStrategyFactory)
.setFailoverStrategyFactory(failoverStrategyFactory)
.build();
@@ -1981,7 +2071,7 @@ public class DefaultSchedulerTest extends TestLogger {
final FailoverStrategy.Factory failoverStrategyFactory,
final ScheduledExecutor delayExecutor)
throws Exception {
- return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+ return createSchedulerBuilder(jobGraph, mainThreadExecutor,
Collections.emptySet())
.setDelayExecutor(delayExecutor)
.setSchedulingStrategyFactory(schedulingStrategyFactory)
.setFailoverStrategyFactory(failoverStrategyFactory)
@@ -1990,6 +2080,13 @@ public class DefaultSchedulerTest extends TestLogger {
private DefaultSchedulerBuilder createSchedulerBuilder(
final JobGraph jobGraph, final ComponentMainThreadExecutor
mainThreadExecutor) {
+ return createSchedulerBuilder(jobGraph, mainThreadExecutor,
Collections.emptySet());
+ }
+
+ private DefaultSchedulerBuilder createSchedulerBuilder(
+ final JobGraph jobGraph,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final Collection<FailureEnricher> failureEnrichers) {
return new DefaultSchedulerBuilder(
jobGraph,
mainThreadExecutor,
@@ -2004,6 +2101,7 @@ public class DefaultSchedulerTest extends TestLogger {
.setExecutionOperations(testExecutionOperations)
.setExecutionVertexVersioner(executionVertexVersioner)
.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
+ .setFailureEnrichers(failureEnrichers)
.setShuffleMaster(shuffleMaster)
.setPartitionTracker(partitionTracker)
.setRpcTimeout(timeout);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index 53b31232a97..7108ac3b4aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
+import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -71,6 +73,7 @@ public class TestingSchedulerNGFactory implements
SchedulerNGFactory {
ComponentMainThreadExecutor mainThreadExecutor,
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener,
+ Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations)
throws Exception {
return schedulerNG;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java
index d7665057d05..66fd9e3afd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java
@@ -25,12 +25,18 @@ import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
+import java.util.Collections;
+import java.util.Map;
+
/** Matches {@link ExceptionHistoryEntry} instances. */
public class ExceptionHistoryEntryMatcher extends
TypeSafeDiagnosingMatcher<ExceptionHistoryEntry> {
public static Matcher<ExceptionHistoryEntry> matchesGlobalFailure(
- Throwable expectedException, long expectedTimestamp) {
- return matchesFailure(expectedException, expectedTimestamp, null,
null);
+ Throwable expectedException,
+ long expectedTimestamp,
+ Map<String, String> expectedFailureLabels) {
+ return matchesFailure(
+ expectedException, expectedTimestamp, expectedFailureLabels,
null, null);
}
public static Matcher<ExceptionHistoryEntry> matchesFailure(
@@ -41,22 +47,40 @@ public class ExceptionHistoryEntryMatcher extends
TypeSafeDiagnosingMatcher<Exce
return new ExceptionHistoryEntryMatcher(
expectedException,
expectedTimestamp,
+ Collections.emptyMap(),
+ expectedTaskName,
+ expectedTaskManagerLocation);
+ }
+
+ public static Matcher<ExceptionHistoryEntry> matchesFailure(
+ Throwable expectedException,
+ long expectedTimestamp,
+ Map<String, String> expectedFailureLabels,
+ String expectedTaskName,
+ TaskManagerLocation expectedTaskManagerLocation) {
+ return new ExceptionHistoryEntryMatcher(
+ expectedException,
+ expectedTimestamp,
+ expectedFailureLabels,
expectedTaskName,
expectedTaskManagerLocation);
}
private final Throwable expectedException;
private final long expectedTimestamp;
+ private final Map<String, String> expectedFailureLabels;
private final String expectedTaskName;
private final ArchivedTaskManagerLocationMatcher
taskManagerLocationMatcher;
public ExceptionHistoryEntryMatcher(
Throwable expectedException,
long expectedTimestamp,
+ Map<String, String> expectedFailureLabels,
String expectedTaskName,
TaskManagerLocation expectedTaskManagerLocation) {
this.expectedException = expectedException;
this.expectedTimestamp = expectedTimestamp;
+ this.expectedFailureLabels = expectedFailureLabels;
this.expectedTaskName = expectedTaskName;
this.taskManagerLocationMatcher =
new
ArchivedTaskManagerLocationMatcher(expectedTaskManagerLocation);
@@ -87,6 +111,13 @@ public class ExceptionHistoryEntryMatcher extends
TypeSafeDiagnosingMatcher<Exce
match = false;
}
+ if (!exceptionHistoryEntry.getFailureLabelsFuture().equals(expectedFailureLabels)) {
+ description
+ .appendText(" actualFailureLabels=")
+
.appendText(String.valueOf(exceptionHistoryEntry.getFailureLabelsFuture()));
+ match = false;
+ }
+
if (exceptionHistoryEntry.getFailingTaskName() == null) {
if (expectedTaskName != null) {
description.appendText(" actualTaskName=null");
@@ -113,6 +144,8 @@ public class ExceptionHistoryEntryMatcher extends
TypeSafeDiagnosingMatcher<Exce
.appendText(ExceptionUtils.stringifyException(expectedException))
.appendText(" expectedTimestamp=")
.appendText(String.valueOf(expectedTimestamp))
+ .appendText(" expectedFailureLabels=")
+ .appendText(String.valueOf(expectedFailureLabels))
.appendText(" expectedTaskName=")
.appendText(expectedTaskName)
.appendText(" expectedTaskManagerLocation=");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
index ff49166a5a2..ac36d14f1c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
@@ -20,12 +20,17 @@ package org.apache.flink.runtime.scheduler.exceptionhistory;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
import static
org.apache.flink.runtime.scheduler.exceptionhistory.ArchivedTaskManagerLocationMatcher.isArchivedTaskManagerLocation;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -45,8 +50,11 @@ public class ExceptionHistoryEntryTest extends TestLogger {
.withTaskManagerLocation(taskManagerLocation)
.build();
final String taskName = "task name";
+ final Map<String, String> failureLabels =
Collections.singletonMap("key", "value");
- final ExceptionHistoryEntry entry =
ExceptionHistoryEntry.create(execution, taskName);
+ final ExceptionHistoryEntry entry =
+ ExceptionHistoryEntry.create(
+ execution, taskName,
CompletableFuture.completedFuture(failureLabels));
assertThat(
entry.getException().deserializeError(ClassLoader.getSystemClassLoader()),
@@ -56,6 +64,7 @@ public class ExceptionHistoryEntryTest extends TestLogger {
assertThat(
entry.getTaskManagerLocation(),
isArchivedTaskManagerLocation(taskManagerLocation));
assertThat(entry.isGlobal(), is(false));
+ assertThat(entry.getFailureLabels(), is(failureLabels));
}
@Test(expected = IllegalArgumentException.class)
@@ -64,12 +73,13 @@ public class ExceptionHistoryEntryTest extends TestLogger {
TestingAccessExecution.newBuilder()
.withTaskManagerLocation(new
LocalTaskManagerLocation())
.build(),
- "task name");
+ "task name",
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS);
}
@Test(expected = NullPointerException.class)
public void testNullExecution() {
- ExceptionHistoryEntry.create(null, "task name");
+ ExceptionHistoryEntry.create(null, "task name",
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
}
@Test(expected = NullPointerException.class)
@@ -82,7 +92,8 @@ public class ExceptionHistoryEntryTest extends TestLogger {
System.currentTimeMillis()))
.withTaskManagerLocation(new
LocalTaskManagerLocation())
.build(),
- null);
+ null,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS);
}
@Test
@@ -97,7 +108,8 @@ public class ExceptionHistoryEntryTest extends TestLogger {
.withTaskManagerLocation(null)
.withErrorInfo(new ErrorInfo(failure,
timestamp))
.build(),
- taskName);
+ taskName,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS);
assertThat(
entry.getException().deserializeError(ClassLoader.getSystemClassLoader()),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
index 08fad7e96b9..7e1cb14d0a6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler.exceptionhistory;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import java.util.Collections;
+import java.util.Map;
import java.util.Objects;
import static
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation;
@@ -36,6 +38,34 @@ public class ExceptionHistoryEntryTestingUtils {
}
public static boolean matchesFailure(
+ ExceptionHistoryEntry exceptionHistoryEntry,
+ Throwable expectedException,
+ long expectedTimestamp,
+ Map<String, String> expectedFailureLabels) {
+ return matchesInternal(
+ exceptionHistoryEntry,
+ expectedException,
+ expectedTimestamp,
+ null,
+ expectedFailureLabels,
+ null);
+ }
+
+ public static boolean matchesFailure(
+ ExceptionHistoryEntry exceptionHistoryEntry,
+ Throwable expectedException,
+ long expectedTimestamp,
+ String expectedTaskName,
+ TaskManagerLocation expectedTaskManagerLocation) {
+ return matchesInternal(
+ exceptionHistoryEntry,
+ expectedException,
+ expectedTimestamp,
+ expectedTaskName,
+ expectedTaskManagerLocation);
+ }
+
+ private static boolean matchesInternal(
ExceptionHistoryEntry exceptionHistoryEntry,
Throwable expectedException,
long expectedTimestamp,
@@ -46,6 +76,7 @@ public class ExceptionHistoryEntryTestingUtils {
expectedException,
expectedTimestamp,
expectedTaskName,
+ Collections.emptyMap(),
expectedTaskManagerLocation);
}
@@ -54,6 +85,7 @@ public class ExceptionHistoryEntryTestingUtils {
Throwable expectedException,
long expectedTimestamp,
String expectedTaskName,
+ Map<String, String> expectedFailureLabels,
TaskManagerLocation expectedTaskManagerLocation) {
boolean match =
exceptionHistoryEntry
@@ -61,6 +93,9 @@ public class ExceptionHistoryEntryTestingUtils {
.deserializeError(ClassLoader.getSystemClassLoader())
.equals(expectedException)
&& exceptionHistoryEntry.getTimestamp() ==
expectedTimestamp
+ && Objects.equals(
+ exceptionHistoryEntry.getFailureLabels(),
+ expectedFailureLabels)
&& Objects.equals(
exceptionHistoryEntry.getFailingTaskName(),
expectedTaskName);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
index 55dfaa392c0..43733e048c8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
@@ -29,6 +29,7 @@ import
org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
@@ -47,6 +48,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -86,6 +89,7 @@ class FailureHandlingResultSnapshotTest {
rootCauseExecutionVertex.getCurrentExecutionAttempt(),
new RuntimeException("Expected exception: root cause"),
System.currentTimeMillis(),
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
StreamSupport.stream(
executionGraph.getAllExecutionVertices().spliterator(),
false)
@@ -102,7 +106,7 @@ class FailureHandlingResultSnapshotTest {
}
@Test // see FLINK-22060/FLINK-21376
- void testMissingThrowableHandling() {
+ void testMissingThrowableHandling() throws ExecutionException,
InterruptedException {
final ExecutionVertex rootCauseExecutionVertex =
extractExecutionVertex(0);
final long rootCauseTimestamp =
triggerFailure(rootCauseExecutionVertex, null);
@@ -112,6 +116,8 @@ class FailureHandlingResultSnapshotTest {
rootCauseExecutionVertex.getCurrentExecutionAttempt(),
null,
rootCauseTimestamp,
+ CompletableFuture.completedFuture(
+ Collections.singletonMap("key2", "value2")),
StreamSupport.stream(
executionGraph.getAllExecutionVertices().spliterator(),
false)
@@ -120,6 +126,10 @@ class FailureHandlingResultSnapshotTest {
0L,
false);
+ // FailedExecution with failure labels
+ assertThat(failureHandlingResult.getFailureLabels().get())
+ .isEqualTo(Collections.singletonMap("key2", "value2"));
+
final FailureHandlingResultSnapshot testInstance =
FailureHandlingResultSnapshot.create(
failureHandlingResult, this::getCurrentExecutions);
@@ -153,6 +163,7 @@ class FailureHandlingResultSnapshotTest {
rootCauseExecutionVertex.getCurrentExecutionAttempt(),
rootCause,
rootCauseTimestamp,
+ FailureEnricherUtils.EMPTY_FAILURE_LABELS,
StreamSupport.stream(
executionGraph.getAllExecutionVertices().spliterator(),
false)
@@ -184,12 +195,14 @@ class FailureHandlingResultSnapshotTest {
rootCauseExecution,
new RuntimeException("Expected
exception"),
System.currentTimeMillis(),
+
FailureEnricherUtils.EMPTY_FAILURE_LABELS,
Collections.singleton(rootCauseExecution)))
.isInstanceOf(IllegalArgumentException.class);
}
@Test
- void testGlobalFailureHandlingResultSnapshotCreation() {
+ void testGlobalFailureHandlingResultSnapshotCreation()
+ throws ExecutionException, InterruptedException {
final Throwable rootCause = new FlinkException("Expected exception:
root cause");
final long timestamp = System.currentTimeMillis();
@@ -206,6 +219,8 @@ class FailureHandlingResultSnapshotTest {
null,
rootCause,
timestamp,
+ CompletableFuture.completedFuture(
+ Collections.singletonMap("key2", "value2")),
StreamSupport.stream(
executionGraph.getAllExecutionVertices().spliterator(),
false)
@@ -214,6 +229,10 @@ class FailureHandlingResultSnapshotTest {
0L,
true);
+ // FailedExecution with failure labels
+ assertThat(failureHandlingResult.getFailureLabels().get())
+ .isEqualTo(Collections.singletonMap("key2", "value2"));
+
final FailureHandlingResultSnapshot testInstance =
FailureHandlingResultSnapshot.create(
failureHandlingResult, this::getCurrentExecutions);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
index 21954fdee09..0c6ed9f9578 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
@@ -43,6 +43,9 @@ import org.junit.ClassRule;
import org.junit.Test;
import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -72,10 +75,13 @@ public class RootExceptionHistoryEntryTest extends
TestLogger {
}
@Test
- public void testFromFailureHandlingResultSnapshot() {
+ public void testFromFailureHandlingResultSnapshot()
+ throws ExecutionException, InterruptedException {
final Throwable rootException = new RuntimeException("Expected root
failure");
final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0);
final long rootTimestamp = triggerFailure(rootExecutionVertex,
rootException);
+ final CompletableFuture<Map<String, String>> rootFailureLabels =
+
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
final Throwable concurrentException = new
IllegalStateException("Expected other failure");
final ExecutionVertex concurrentlyFailedExecutionVertex =
extractExecutionVertex(1);
@@ -87,6 +93,7 @@ public class RootExceptionHistoryEntryTest extends TestLogger
{
rootExecutionVertex.getCurrentExecutionAttempt(),
rootException,
rootTimestamp,
+ rootFailureLabels,
Collections.singleton(
concurrentlyFailedExecutionVertex.getCurrentExecutionAttempt()));
final RootExceptionHistoryEntry actualEntry =
@@ -97,6 +104,7 @@ public class RootExceptionHistoryEntryTest extends
TestLogger {
ExceptionHistoryEntryMatcher.matchesFailure(
rootException,
rootTimestamp,
+ rootFailureLabels.get(),
rootExecutionVertex.getTaskNameWithSubtaskIndex(),
rootExecutionVertex.getCurrentAssignedResourceLocation()));
assertThat(
@@ -111,7 +119,7 @@ public class RootExceptionHistoryEntryTest extends
TestLogger {
}
@Test
- public void testFromGlobalFailure() {
+ public void testFromGlobalFailure() throws ExecutionException,
InterruptedException {
final Throwable concurrentException0 =
new RuntimeException("Expected concurrent failure #0");
final ExecutionVertex concurrentlyFailedExecutionVertex0 =
extractExecutionVertex(0);
@@ -126,10 +134,13 @@ public class RootExceptionHistoryEntryTest extends
TestLogger {
final Throwable rootCause = new Exception("Expected root failure");
final long rootTimestamp = System.currentTimeMillis();
+ final CompletableFuture<Map<String, String>> rootFailureLabels =
+
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
final RootExceptionHistoryEntry actualEntry =
RootExceptionHistoryEntry.fromGlobalFailure(
rootCause,
rootTimestamp,
+ rootFailureLabels,
StreamSupport.stream(
executionGraph.getAllExecutionVertices().spliterator(),
false)
@@ -138,7 +149,8 @@ public class RootExceptionHistoryEntryTest extends
TestLogger {
assertThat(
actualEntry,
- ExceptionHistoryEntryMatcher.matchesGlobalFailure(rootCause,
rootTimestamp));
+ ExceptionHistoryEntryMatcher.matchesGlobalFailure(
+ rootCause, rootTimestamp, rootFailureLabels.get()));
assertThat(
actualEntry.getConcurrentExceptions(),
IsIterableContainingInAnyOrder.containsInAnyOrder(