This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 88b309b7dca [FLINK-27570][runtime] Count checkpoint finalization
failures in CheckpointFailureManager
88b309b7dca is described below
commit 88b309b7dcad269ad084eab5e2944724daf6dee4
Author: 鲍健昕 <[email protected]>
AuthorDate: Wed Jul 20 10:35:40 2022 +0800
[FLINK-27570][runtime] Count checkpoint finalization failures in
CheckpointFailureManager
---
.../runtime/checkpoint/CheckpointCoordinator.java | 12 ++--
.../checkpoint/CheckpointFailureManager.java | 3 +-
.../runtime/checkpoint/DefaultCheckpointPlan.java | 5 +-
.../checkpoint/FinishedTaskStateProvider.java | 23 ++++++-
.../filesystem/FsCheckpointStorageAccess.java | 10 ++-
.../checkpoint/CheckpointFailureManagerTest.java | 5 +-
.../CheckpointFailureManagerITCase.java | 79 ++++++++++++++++++++--
7 files changed, 117 insertions(+), 20 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 1051dbc6bc0..0f5033ac8ed 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
+import
org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -1365,18 +1366,21 @@ public class CheckpointCoordinator {
} catch (Exception e1) {
// abort the current pending checkpoint if we fails to finalize
the pending
// checkpoint.
+ final CheckpointFailureReason failureReason =
+ e1 instanceof PartialFinishingNotSupportedByStateException
+ ?
CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING
+ :
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE;
+
if (!pendingCheckpoint.isDisposed()) {
abortPendingCheckpoint(
- pendingCheckpoint,
- new CheckpointException(
-
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
+ pendingCheckpoint, new
CheckpointException(failureReason, e1));
}
throw new CheckpointException(
"Could not finalize the pending checkpoint "
+ pendingCheckpoint.getCheckpointID()
+ '.',
- CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
+ failureReason,
e1);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 08cf49e41ee..8db1fe307a1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -238,8 +238,8 @@ public class CheckpointFailureManager {
case TASK_FAILURE:
case TASK_CHECKPOINT_FAILURE:
case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
+ // there are some edge cases that shouldn't be counted as a
failure, e.g. shutdown
case TRIGGER_CHECKPOINT_FAILURE:
- case FINALIZE_CHECKPOINT_FAILURE:
// ignore
break;
@@ -247,6 +247,7 @@ public class CheckpointFailureManager {
case CHECKPOINT_ASYNC_EXCEPTION:
case CHECKPOINT_DECLINED:
case CHECKPOINT_EXPIRED:
+ case FINALIZE_CHECKPOINT_FAILURE:
// we should make sure one checkpoint only be counted once
if (checkpointId == UNKNOWN_CHECKPOINT_ID
|| countedCheckpointIds.add(checkpointId)) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
index eaa7a595e7d..9253799a17a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
@@ -25,7 +25,6 @@ import
org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.util.FlinkRuntimeException;
import java.util.Collection;
import java.util.HashMap;
@@ -152,7 +151,7 @@ public class DefaultCheckpointPlan implements
CheckpointPlan {
Map<OperatorID, OperatorState> operatorStates) {
for (ExecutionJobVertex vertex : partlyFinishedVertex.values()) {
if (hasUsedUnionListState(vertex, operatorStates)) {
- throw new FlinkRuntimeException(
+ throw new PartialFinishingNotSupportedByStateException(
String.format(
"The vertex %s (id = %s) has used"
+ " UnionListState, but part of its
tasks are FINISHED.",
@@ -183,7 +182,7 @@ public class DefaultCheckpointPlan implements
CheckpointPlan {
if (entry.getValue() != vertex.getParallelism()
&& hasUsedUnionListState(vertex, operatorStates)) {
- throw new FlinkRuntimeException(
+ throw new PartialFinishingNotSupportedByStateException(
String.format(
"The vertex %s (id = %s) has used"
+ " UnionListState, but part of its
tasks has called operators' finish method.",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java
index 167d3686042..4b17899b2ea 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.FlinkRuntimeException;
import java.util.Map;
@@ -33,5 +34,25 @@ public interface FinishedTaskStateProvider {
void reportTaskHasFinishedOperators(ExecutionVertex task);
/** Fulfills the state for the finished subtasks and operators to indicate
they are finished. */
- void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState>
operatorStates);
+ void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState>
operatorStates)
+ throws PartialFinishingNotSupportedByStateException;
+
+ /**
+ * Thrown when some subtasks of an operator have finished but its state
does not support partial finishing
+ * (e.g. union list state).
+ */
+ class PartialFinishingNotSupportedByStateException extends
FlinkRuntimeException {
+
+ public PartialFinishingNotSupportedByStateException(String message) {
+ super(message);
+ }
+
+ public PartialFinishingNotSupportedByStateException(Throwable cause) {
+ super(cause);
+ }
+
+ public PartialFinishingNotSupportedByStateException(String message,
Throwable cause) {
+ super(message, cause);
+ }
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index af373242c64..11807c3de85 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -113,8 +113,14 @@ public class FsCheckpointStorageAccess extends
AbstractFsCheckpointStorageAccess
@Override
public void initializeBaseLocationsForCheckpoint() throws IOException {
- fileSystem.mkdirs(sharedStateDirectory);
- fileSystem.mkdirs(taskOwnedStateDirectory);
+ if (!fileSystem.mkdirs(sharedStateDirectory)) {
+ throw new IOException(
+ "Failed to create directory for shared state: " +
sharedStateDirectory);
+ }
+ if (!fileSystem.mkdirs(taskOwnedStateDirectory)) {
+ throw new IOException(
+ "Failed to create directory for task owned state: " +
taskOwnedStateDirectory);
+ }
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 8ed9b037ade..a2084c6616a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -119,8 +119,9 @@ public class CheckpointFailureManagerTest extends
TestLogger {
checkpointProperties, new CheckpointException(reason), -2);
}
- // IO_EXCEPTION, CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED and
CHECKPOINT_ASYNC_EXCEPTION
- assertEquals(4, callback.getInvokeCounter());
+ // IO_EXCEPTION, CHECKPOINT_DECLINED, FINALIZE_CHECKPOINT_FAILURE,
CHECKPOINT_EXPIRED and
+ // CHECKPOINT_ASYNC_EXCEPTION
+ assertEquals(5, callback.getInvokeCounter());
}
@Test
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
index c23a4cdf420..27f6d78501f 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.checkpointing;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -28,7 +29,12 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,7 +45,10 @@ import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
+import
org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import
org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
+import
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -60,13 +69,41 @@ import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE;
+import static org.junit.Assert.fail;
+
/** Tests to verify end-to-end logic of checkpoint failure manager. */
public class CheckpointFailureManagerITCase extends TestLogger {
+ /**
+ * Tests that a checkpoint finalization failure is counted by {@link
CheckpointFailureManager} and
+ * eventually fails the job. In this test, finalization is made to fail by
throwing an exception from
+ * {@link
org.apache.flink.runtime.state.CheckpointStorageLocation#createMetadataOutputStream},
+ * which should fail the job.
+ */
+ @Test
+ public void testFinalizationFailureCounted() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10);
+ env.getCheckpointConfig().setCheckpointStorage(new
FailingFinalizationCheckpointStorage());
+ env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new
DiscardingSink<>());
+ JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ try {
+ TestUtils.submitJobAndWaitForResult(
+ cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
+ fail("The job should fail");
+ } catch (JobExecutionException jobException) {
+ if (!isCheckpointFailure(jobException)) {
+ throw jobException;
+ }
+ }
+ }
+
@ClassRule
public static MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
@@ -85,12 +122,9 @@ public class CheckpointFailureManagerITCase extends
TestLogger {
TestUtils.submitJobAndWaitForResult(
cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
} catch (JobExecutionException jobException) {
- Optional<FlinkRuntimeException> throwable =
- ExceptionUtils.findThrowable(jobException,
FlinkRuntimeException.class);
- Assert.assertTrue(throwable.isPresent());
- Assert.assertEquals(
-
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
- throwable.get().getMessage());
+ if (!isCheckpointFailure(jobException)) {
+ throw jobException;
+ }
}
// assert that the job only failed once.
Assert.assertEquals(1,
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
@@ -202,4 +236,35 @@ public class CheckpointFailureManagerITCase extends
TestLogger {
return this;
}
}
+
+ private static class FailingFinalizationCheckpointStorage implements
CheckpointStorage {
+ private static final long serialVersionUID = 8134582566514272546L;
+
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String
externalPointer) {
+ return new TestCompletedCheckpointStorageLocation();
+ }
+
+ @Override
+ public CheckpointStorageAccess createCheckpointStorage(JobID jobId) {
+ return new TestingCheckpointStorageAccessCoordinatorView() {
+ @Override
+ public CheckpointStorageLocation
initializeLocationForCheckpoint(
+ long checkpointId) {
+ return new
NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE) {
+ @Override
+ public CheckpointMetadataOutputStream
createMetadataOutputStream() {
+ throw new RuntimeException("finalization failure");
+ }
+ };
+ }
+ };
+ }
+ }
+
+ private boolean isCheckpointFailure(JobExecutionException jobException) {
+ return ExceptionUtils.findThrowable(jobException,
FlinkRuntimeException.class)
+ .filter(ex ->
ex.getMessage().equals(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE))
+ .isPresent();
+ }
}