This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fa731288518c8ebf66f40b4e0e9b1929546b6257 Author: Yun Tang <[email protected]> AuthorDate: Mon May 11 13:45:45 2020 +0800 [FLINK-8871][checkpoint] Support to cancel checkpointing via notification on task side --- .../connectors/fs/bucketing/BucketingSink.java | 4 + .../connectors/gcp/pubsub/PubSubSource.java | 4 + .../gcp/pubsub/common/AcknowledgeOnCheckpoint.java | 4 + .../connectors/kafka/FlinkKafkaConsumerBase.java | 4 + .../connectors/kafka/KafkaConsumerTestBase.java | 4 + .../connectors/kafka/KafkaProducerTestBase.java | 4 + .../kafka/testutils/FailingIdentityMapper.java | 4 + .../connectors/kafka/testutils/IntegerSource.java | 4 + .../flink/streaming/tests/FailureMapper.java | 4 + .../HeavyDeploymentStressTestProgram.java | 4 + .../StickyAllocationAndLocalRecoveryTestJob.java | 4 + .../api/runtime/SavepointTaskStateManager.java | 5 + .../flink/state/api/output/SnapshotUtilsTest.java | 4 + .../itcases/AbstractQueryableStateTestBase.java | 4 + .../runtime/jobgraph/tasks/AbstractInvokable.java | 12 + .../flink/runtime/state/CheckpointListener.java | 8 + .../runtime/state/NoOpTaskLocalStateStoreImpl.java | 4 + .../flink/runtime/state/TaskLocalStateStore.java | 6 + .../runtime/state/TaskLocalStateStoreImpl.java | 9 + .../flink/runtime/state/TaskStateManagerImpl.java | 10 +- .../runtime/state/heap/HeapKeyedStateBackend.java | 5 + .../org/apache/flink/runtime/taskmanager/Task.java | 28 ++ .../runtime/state/TaskLocalStateStoreImplTest.java | 23 +- .../runtime/state/TestTaskLocalStateStore.java | 19 ++ .../flink/runtime/state/TestTaskStateManager.java | 11 + .../state/ttl/mock/MockKeyedStateBackend.java | 5 + .../streaming/state/RocksDBKeyedStateBackend.java | 7 + .../state/snapshot/RocksFullSnapshotStrategy.java | 5 + .../snapshot/RocksIncrementalSnapshotStrategy.java | 7 + .../functions/sink/TwoPhaseCommitSinkFunction.java | 4 + .../sink/filesystem/StreamingFileSink.java | 4 + .../source/MessageAcknowledgingSourceBase.java | 4 + 
.../api/operators/AbstractStreamOperator.java | 5 + .../api/operators/AbstractStreamOperatorV2.java | 5 + .../api/operators/StreamOperatorStateHandler.java | 6 + .../api/operators/collect/CollectSinkFunction.java | 4 + .../runtime/tasks/AsyncCheckpointRunnable.java | 22 +- .../flink/streaming/runtime/tasks/StreamTask.java | 7 + .../tasks/SubtaskCheckpointCoordinator.java | 12 + .../tasks/SubtaskCheckpointCoordinatorImpl.java | 198 +++++++++++++- .../AbstractUdfStreamOperatorLifecycleTest.java | 1 + .../runtime/tasks/LocalStateForwardingTest.java | 3 +- .../MockSubtaskCheckpointCoordinatorBuilder.java | 14 +- .../tasks/SubtaskCheckpointCoordinatorTest.java | 297 ++++++++++++++++++++- .../runtime/tasks/SynchronousCheckpointITCase.java | 6 + .../runtime/utils/FailingCollectionSource.java | 4 + .../runtime/stream/FsStreamingSinkITCaseBase.scala | 3 + .../flink/streaming/util/FiniteTestSource.java | 4 + .../jobmaster/JobMasterStopWithSavepointIT.java | 5 + .../jobmaster/JobMasterTriggerSavepointITCase.java | 5 + .../checkpointing/CoStreamCheckpointingITCase.java | 4 + .../ContinuousFileProcessingCheckpointITCase.java | 4 + .../KeyedStateCheckpointingITCase.java | 4 + .../checkpointing/StateCheckpointedITCase.java | 4 + .../StreamCheckpointNotifierITCase.java | 20 ++ .../checkpointing/UnalignedCheckpointITCase.java | 8 + .../ZooKeeperHighAvailabilityITCase.java | 4 + .../utils/AccumulatingIntegerSink.java | 4 + .../utils/CancellingIntegerSource.java | 4 + .../test/checkpointing/utils/FailingSource.java | 4 + .../jar/CheckpointedStreamingProgram.java | 4 + .../jar/CheckpointingCustomKvStateProgram.java | 4 + .../ReinterpretDataStreamAsKeyedStreamITCase.java | 4 + 63 files changed, 877 insertions(+), 24 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index ec14cce..78cefaf 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -733,6 +733,10 @@ public class BucketingSink<T> } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized."); diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java index 1472bb2..8007fc1 100644 --- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java +++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java @@ -194,6 +194,10 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT> } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public List<AcknowledgeIdsForCheckpoint<String>> snapshotState(long checkpointId, long timestamp) throws Exception { return acknowledgeOnCheckpoint.snapshotState(checkpointId, timestamp); } diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java index f538b69..6b194b6 100644 
--- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java +++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java @@ -83,6 +83,10 @@ public class AcknowledgeOnCheckpoint<ACKID extends Serializable> implements Chec } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public List<AcknowledgeIdsForCheckpoint<ACKID>> snapshotState(long checkpointId, long timestamp) throws Exception { acknowledgeIdsPerCheckpoint.add(new AcknowledgeIdsForCheckpoint<>(checkpointId, acknowledgeIdsForPendingCheckpoint)); acknowledgeIdsForPendingCheckpoint = new ArrayList<>(); diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 84057b0..733011f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -1035,6 +1035,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } } + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + // ------------------------------------------------------------------------ // Kafka Consumer specific methods // ------------------------------------------------------------------------ diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java 
index 95688b2..d99f8d9 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -2250,6 +2250,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { return Collections.singletonList(this.numElementsTotal); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index c0c9844..2b586fe 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -531,6 +531,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink { } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { if (!triggeredShutdown) { lastSnapshotedElementBeforeShutdown = numElementsTotal; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java index bd412c9..9919f1e 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java @@ -95,6 +95,10 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { return Collections.singletonList(numElementsTotal); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java index 25a3cea..f471df4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java @@ -127,4 +127,8 @@ public class IntegerSource blocker.notifyAll(); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java index a3a1c25..458f9b2 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java @@ -71,6 +71,10 @@ public class FailureMapper<T> extends RichMapFunction<T, T> implements Checkpoin } } + 
@Override + public void notifyCheckpointAborted(long checkpointId) { + } + private boolean isReachedFailureThreshold() { return numProcessedRecords >= numProcessedRecordsFailureThreshold && numCompleteCheckpoints >= numCompleteCheckpointsFailureThreshold diff --git a/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java index d65583f..68b6d24 100644 --- a/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java +++ b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java @@ -146,5 +146,9 @@ public class HeavyDeploymentStressTestProgram { public void notifyCheckpointComplete(long checkpointId) { readyToFail = true; } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } } diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java index b03791e..990baa9 100644 --- a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java +++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java @@ -378,6 +378,10 @@ public class StickyAllocationAndLocalRecoveryTestJob { failTask = currentSchedulingAndFailureInfo.failingTask; } + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + private 
boolean shouldTaskFailForThisAttempt() { RuntimeContext runtimeContext = getRuntimeContext(); int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java index 9563e40..4fb15a2 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java @@ -78,6 +78,11 @@ final class SavepointTaskStateManager implements TaskStateManager { } @Override + public void notifyCheckpointAborted(long checkpointId) { + throw new UnsupportedOperationException(MSG); + } + + @Override public void close() { } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java index 5668284..36b4d63 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java @@ -131,6 +131,10 @@ public class SnapshotUtilsTest { } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void setCurrentKey(Object key) { ACTUAL_ORDER_TRACKING.add("setCurrentKey"); } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java 
index 83444e8..d9e3c91 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -1124,6 +1124,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { LATEST_CHECKPOINT_ID.set(checkpointId); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 72586ab..af57f97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -278,6 +278,18 @@ public abstract class AbstractInvokable { throw new UnsupportedOperationException(String.format("notifyCheckpointCompleteAsync not supported by %s", this.getClass().getName())); } + /** + * Invoked when a checkpoint has been aborted, i.e., when the checkpoint coordinator has received a decline message + * from one task and try to abort the targeted checkpoint by notification. + * + * @param checkpointId The ID of the checkpoint that is aborted. + * + * @return future that completes when the notification has been processed by the task. 
+ */ + public Future<Void> notifyCheckpointAbortAsync(long checkpointId) { + throw new UnsupportedOperationException(String.format("notifyCheckpointAbortAsync not supported by %s", this.getClass().getName())); + } + public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException { throw new UnsupportedOperationException("dispatchOperatorEvent not supported by " + getClass().getName()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java index 0c99316..13c8e39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java @@ -38,4 +38,12 @@ public interface CheckpointListener { * @throws Exception */ void notifyCheckpointComplete(long checkpointId) throws Exception; + + /** + * This method is called as a notification once a distributed checkpoint has been aborted. + * + * @param checkpointId The ID of the checkpoint that has been aborted. 
+ * @throws Exception + */ + void notifyCheckpointAborted(long checkpointId) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java index 11841a1..aece4aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java @@ -66,6 +66,10 @@ public final class NoOpTaskLocalStateStoreImpl implements OwnedTaskLocalStateSto } @Override + public void abortCheckpoint(long abortedCheckpointId) { + } + + @Override public void pruneMatchingCheckpoints(LongPredicate matcher) { } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java index b0d8a82..78f5068 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java @@ -68,6 +68,12 @@ public interface TaskLocalStateStore { void confirmCheckpoint(long confirmedCheckpointId); /** + * Notifies that the checkpoint with the given id was confirmed as aborted. This prunes the checkpoint history + * and removes states with a checkpoint id that is equal to the newly aborted checkpoint id. + */ + void abortCheckpoint(long abortedCheckpointId); + + /** * Remove all checkpoints from the store that match the given predicate. * @param matcher the predicate that selects the checkpoints for pruning. 
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index a57a7ef..52d7811 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -225,6 +225,15 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { } @Override + public void abortCheckpoint(long abortedCheckpointId) { + + LOG.debug("Received abort information for checkpoint {} in subtask ({} - {} - {}). Starting to prune history.", + abortedCheckpointId, jobID, jobVertexID, subtaskIndex); + + pruneCheckpoints(snapshotCheckpointId -> snapshotCheckpointId == abortedCheckpointId, false); + } + + @Override public void pruneMatchingCheckpoints(@Nonnull LongPredicate matcher) { pruneCheckpoints( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java index a42e808..f2ecde0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java @@ -180,13 +180,21 @@ public class TaskStateManagerImpl implements TaskStateManager { } /** - * Tracking when local state can be disposed. + * Tracking when local state can be confirmed and disposed. */ @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { localStateStore.confirmCheckpoint(checkpointId); } + /** + * Tracking when some local state can be disposed. 
+ */ + @Override + public void notifyCheckpointAborted(long checkpointId) { + localStateStore.abortCheckpoint(checkpointId); + } + @Override public void close() throws Exception { channelStateReader.close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 7e184ab..68828ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -312,6 +312,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override + public void notifyCheckpointAborted(long checkpointId) { + // nothing to do + } + + @Override public <N, S extends State, T> void applyToAllKeys( final N namespace, final TypeSerializer<N> namespaceSerializer, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 2e3c9b6..bd4671f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -1218,6 +1218,34 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr } } + @Override + public void notifyCheckpointAborted(final long checkpointID) { + final AbstractInvokable invokable = this.invokable; + + if (executionState == ExecutionState.RUNNING && invokable != null) { + try { + invokable.notifyCheckpointAbortAsync(checkpointID); + } + catch (RejectedExecutionException ex) { + // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it. 
+ LOG.debug( + "Notify checkpoint abort {} for {} ({}) was rejected by the mailbox", + checkpointID, taskNameWithSubtask, executionId); + } + catch (Throwable t) { + if (getExecutionState() == ExecutionState.RUNNING) { + // fail task if checkpoint aborted notification failed. + failExternally(new RuntimeException( + "Error while aborting checkpoint", + t)); + } + } + } + else { + LOG.info("Ignoring checkpoint aborted notification for non-running task {}.", taskNameWithSubtask); + } + } + /** * Dispatches an operator event to the invokable task. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java index 7531783..784015b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java @@ -154,6 +154,21 @@ public class TaskLocalStateStoreImplTest { } /** + * Tests pruning of target previous checkpoints if that checkpoint is aborted. + */ + @Test + public void abortCheckpoint() throws Exception { + + final int chkCount = 4; + final int aborted = chkCount - 2; + List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); + taskLocalStateStore.abortCheckpoint(aborted); + checkPrunedAndDiscarded(taskStateSnapshots, aborted, aborted + 1); + checkStoredAsExpected(taskStateSnapshots, 0, aborted); + checkStoredAsExpected(taskStateSnapshots, aborted + 1, chkCount); + } + + /** * Tests that disposal of a {@link TaskLocalStateStoreImpl} works and discards all local states. 
*/ @Test @@ -167,16 +182,16 @@ public class TaskLocalStateStoreImplTest { checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount); } - private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception { - for (int i = off; i < len; ++i) { + private void checkStoredAsExpected(List<TaskStateSnapshot> history, int start, int end) throws Exception { + for (int i = start; i < end; ++i) { TaskStateSnapshot expected = history.get(i); Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i)); Mockito.verify(expected, Mockito.never()).discardState(); } } - private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int off, int len) throws Exception { - for (int i = off; i < len; ++i) { + private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int start, int end) throws Exception { + for (int i = start; i < end; ++i) { Assert.assertNull(taskLocalStateStore.retrieveLocalState(i)); Mockito.verify(history.get(i)).discardState(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java index 2ade3e6..e92a34a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java @@ -105,6 +105,25 @@ public class TestTaskLocalStateStore implements TaskLocalStateStore { } @Override + public void abortCheckpoint(long abortedCheckpointId) { + Preconditions.checkState(!disposed); + Iterator<Map.Entry<Long, TaskStateSnapshot>> iterator = taskStateSnapshotsByCheckpointID.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, TaskStateSnapshot> entry = iterator.next(); + if (entry.getKey() == abortedCheckpointId) { + iterator.remove(); + try { + entry.getValue().discardState(); + } catch (Exception e) { + throw new RuntimeException(e); + } 
+ } else if (entry.getKey() > abortedCheckpointId){ + break; + } + } + } + + @Override public void pruneMatchingCheckpoints(LongPredicate matcher) { taskStateSnapshotsByCheckpointID.keySet().removeIf(matcher::test); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java index 1ce41b3..ae6c022 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java @@ -47,6 +47,7 @@ public class TestTaskStateManager implements TaskStateManager { private long reportedCheckpointId; private long notifiedCompletedCheckpointId; + private long notifiedAbortedCheckpointId; private JobID jobId; private ExecutionAttemptID executionAttemptID; @@ -88,6 +89,7 @@ public class TestTaskStateManager implements TaskStateManager { this.taskManagerTaskStateSnapshotsByCheckpointId = new HashMap<>(); this.reportedCheckpointId = -1L; this.notifiedCompletedCheckpointId = -1L; + this.notifiedAbortedCheckpointId = -1L; } @Override @@ -175,6 +177,11 @@ public class TestTaskStateManager implements TaskStateManager { this.notifiedCompletedCheckpointId = checkpointId; } + @Override + public void notifyCheckpointAborted(long checkpointId) { + this.notifiedAbortedCheckpointId = checkpointId; + } + public JobID getJobId() { return jobId; } @@ -227,6 +234,10 @@ public class TestTaskStateManager implements TaskStateManager { return notifiedCompletedCheckpointId; } + public long getNotifiedAbortedCheckpointId() { + return notifiedAbortedCheckpointId; + } + public void setReportedCheckpointId(long reportedCheckpointId) { this.reportedCheckpointId = reportedCheckpointId; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java index 5c82b80..5cb6866 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java @@ -167,6 +167,11 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override + public void notifyCheckpointAborted(long checkpointId) { + // noop + } + + @Override public <N> Stream<K> getKeys(String state, N namespace) { return stateValues.get(state).entrySet().stream() .filter(e -> e.getValue().containsKey(namespace)) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 2ddb79b..61d8688 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -464,6 +464,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + checkpointSnapshotStrategy.notifyCheckpointAborted(checkpointId); + + savepointSnapshotStrategy.notifyCheckpointAborted(checkpointId); + } + /** * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers. 
* diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java index 1085283..751eed8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java @@ -150,6 +150,11 @@ public class RocksFullSnapshotStrategy<K> extends RocksDBSnapshotStrategyBase<K> // nothing to do. } + @Override + public void notifyCheckpointAborted(long checkpointId) { + // nothing to do. + } + private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier( long checkpointId, CheckpointStreamFactory primaryStreamFactory, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 23f4574..77b1342 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -179,6 +179,13 @@ public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategy } } + @Override + public void notifyCheckpointAborted(long abortedCheckpointId) { + synchronized (materializedSstFiles) { + materializedSstFiles.keySet().remove(abortedCheckpointId); + } + } + @Nonnull private 
SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 6a42fb9..361bf3f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -305,6 +305,10 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // this is like the pre-commit of a 2-phase-commit transaction // we are ready to commit and remember the transaction diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index 54abac4..0962799 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -424,6 +424,10 @@ public class StreamingFileSink<IN> } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { Preconditions.checkState(helper != null, "sink has not been initialized"); this.helper.snapshotState(context.getCheckpointId()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index ffb2015..3a2a5ca 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -241,4 +241,8 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId> } } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a698254..a249d5e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -344,6 +344,11 @@ public abstract class AbstractStreamOperator<OUT> stateHandler.notifyCheckpointComplete(checkpointId); } + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + stateHandler.notifyCheckpointAborted(checkpointId); + } + // ------------------------------------------------------------------------ // Properties and Services // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 56533db..819b254 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -290,6 +290,11 @@ 
public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OU stateHandler.notifyCheckpointComplete(checkpointId); } + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + stateHandler.notifyCheckpointAborted(checkpointId); + } + // ------------------------------------------------------------------------ // Properties and Services // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index 99123b1..ed03907 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -222,6 +222,12 @@ public class StreamOperatorStateHandler { } } + public void notifyCheckpointAborted(long checkpointId) throws Exception { + if (keyedStateBackend != null) { + keyedStateBackend.notifyCheckpointAborted(checkpointId); + } + } + @SuppressWarnings("unchecked") public <K> KeyedStateBackend<K> getKeyedStateBackend() { return (KeyedStateBackend<K>) keyedStateBackend; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java index 64a3ffe..a687283a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java @@ -287,6 +287,10 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che } } + @Override + public void notifyCheckpointAborted(long 
checkpointId) { + } + public void setOperatorEventGateway(OperatorEventGateway eventGateway) { this.eventGateway = eventGateway; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java index b09c5a9..e89e962 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; @@ -36,6 +35,7 @@ import java.io.Closeable; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -46,10 +46,11 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { public static final Logger LOG = LoggerFactory.getLogger(AsyncCheckpointRunnable.class); private final String taskName; - private final CloseableRegistry closeableRegistry; + private final Consumer<AsyncCheckpointRunnable> registerConsumer; + private final Consumer<AsyncCheckpointRunnable> unregisterConsumer; private final Environment taskEnvironment; - private enum AsyncCheckpointState { + enum AsyncCheckpointState { RUNNING, DISCARDED, COMPLETED @@ -70,7 +71,8 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { Future<?> channelWrittenFuture, long asyncStartNanos, String taskName, - CloseableRegistry closeableRegistry, + Consumer<AsyncCheckpointRunnable> register, + Consumer<AsyncCheckpointRunnable> unregister, 
Environment taskEnvironment, AsyncExceptionHandler asyncExceptionHandler) { @@ -80,7 +82,8 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { this.channelWrittenFuture = checkNotNull(channelWrittenFuture); this.asyncStartNanos = asyncStartNanos; this.taskName = checkNotNull(taskName); - this.closeableRegistry = checkNotNull(closeableRegistry); + this.registerConsumer = register; + this.unregisterConsumer = unregister; this.taskEnvironment = checkNotNull(taskEnvironment); this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); } @@ -89,7 +92,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { public void run() { FileSystemSafetyNet.initializeSafetyNetForThread(); try { - closeableRegistry.registerCloseable(this); + registerConsumer.accept(this); TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size()); TaskStateSnapshot localTaskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size()); @@ -140,7 +143,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { } handleExecutionException(e); } finally { - closeableRegistry.unregisterCloseable(this); + unregisterConsumer.accept(this); FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } @@ -229,6 +232,10 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { } } + long getCheckpointId() { + return checkpointMetaData.getCheckpointId(); + } + private void cleanup() throws Exception { LOG.debug( "Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", @@ -259,4 +266,5 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { taskName, checkpointMetaData.getCheckpointId()); } + } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 0bcbc30..1c346b1 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -933,6 +933,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } + @Override + public Future<Void> notifyCheckpointAbortAsync(long checkpointId) { + return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit( + () -> subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning), + "checkpoint %d aborted", checkpointId); + } + private void tryShutdownTimerService() { if (!timerService.isTerminated()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java index d7352c6..2922735 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java @@ -65,4 +65,16 @@ interface SubtaskCheckpointCoordinator extends Closeable { long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception; + + /** + * Notified on the task side once a distributed checkpoint has been aborted. + * + * @param checkpointId The ID of the checkpoint that has been aborted. + * @param operatorChain The chain of operators executed by the task. + * @param isRunning Whether the task is running. 
+ */ + void notifyCheckpointAborted( + long checkpointId, + OperatorChain<?, ?> operatorChain, + Supplier<Boolean> isRunning) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index 8a55aa6..7508a16 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; @@ -41,19 +42,31 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.BiFunctionWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.GuardedBy; + import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Future; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; @@ -62,10 +75,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { private static final Logger LOG = LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class); + private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128; private final CachingCheckpointStorageWorkerView checkpointStorage; private final String taskName; - private final CloseableRegistry closeableRegistry; private final ExecutorService executorService; private final Environment env; private final AsyncExceptionHandler asyncExceptionHandler; @@ -73,6 +86,19 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { private final StreamTaskActionExecutor actionExecutor; private final boolean unalignedCheckpointEnabled; private final BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException> prepareInputSnapshot; + /** The IDs of the checkpoints for which an abort notification has been received. */ + private final Set<Long> abortedCheckpointIds; + private long lastCheckpointId; + + /** Lock that guards state of AsyncCheckpointRunnable registry. **/ + private final Object lock; + + @GuardedBy("lock") + private final Map<Long, AsyncCheckpointRunnable> checkpoints; + + /** Indicates if this registry is closed. 
*/ + @GuardedBy("lock") + private boolean closed; SubtaskCheckpointCoordinatorImpl( CheckpointStorageWorkerView checkpointStorage, @@ -84,9 +110,34 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { AsyncExceptionHandler asyncExceptionHandler, boolean unalignedCheckpointEnabled, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException> prepareInputSnapshot) throws IOException { + this(checkpointStorage, + taskName, + actionExecutor, + closeableRegistry, + executorService, + env, + asyncExceptionHandler, + unalignedCheckpointEnabled, + prepareInputSnapshot, + DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS); + } + + @VisibleForTesting + SubtaskCheckpointCoordinatorImpl( + CheckpointStorageWorkerView checkpointStorage, + String taskName, + StreamTaskActionExecutor actionExecutor, + CloseableRegistry closeableRegistry, + ExecutorService executorService, + Environment env, + AsyncExceptionHandler asyncExceptionHandler, + boolean unalignedCheckpointEnabled, + BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException> prepareInputSnapshot, + int maxRecordAbortedCheckpoints) throws IOException { this.checkpointStorage = new CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage)); this.taskName = checkNotNull(taskName); - this.closeableRegistry = checkNotNull(closeableRegistry); + this.checkpoints = new HashMap<>(); + this.lock = new Object(); this.executorService = checkNotNull(executorService); this.env = checkNotNull(env); this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); @@ -94,7 +145,10 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { this.channelStateWriter = unalignedCheckpointEnabled ? 
openChannelStateWriter() : ChannelStateWriter.NO_OP; this.unalignedCheckpointEnabled = unalignedCheckpointEnabled; this.prepareInputSnapshot = prepareInputSnapshot; - this.closeableRegistry.registerCloseable(this); + this.abortedCheckpointIds = createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints); + this.lastCheckpointId = -1L; + closeableRegistry.registerCloseable(this); + this.closed = false; } private ChannelStateWriter openChannelStateWriter() { @@ -144,6 +198,15 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream // checkpoint alignments + // Step (0): Record the last triggered checkpointId. + Preconditions.checkArgument(lastCheckpointId < metadata.getCheckpointId(), String.format( + "Unexpected current checkpoint-id: %s vs last checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId)); + lastCheckpointId = metadata.getCheckpointId(); + if (checkAndClearAbortedStatus(metadata.getCheckpointId())) { + LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId()); + return; + } + // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work. // The pre-barrier work should be nothing or minimal in the common case. 
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId()); @@ -188,6 +251,106 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { env.getTaskStateManager().notifyCheckpointComplete(checkpointId); } + @Override + public void notifyCheckpointAborted(long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception { + + Exception previousException = null; + if (isRunning.get()) { + LOG.debug("Notification of aborted checkpoint for task {}", taskName); + + boolean canceled = cancelAsyncCheckpointRunnable(checkpointId); + + if (!canceled) { + if (checkpointId > lastCheckpointId) { + // only record checkpoints that have not triggered on task side. + abortedCheckpointIds.add(checkpointId); + } + } + + for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) { + try { + operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId); + } catch (Exception e) { + previousException = e; + } + } + + } else { + LOG.debug("Ignoring notification of aborted checkpoint for not-running task {}", taskName); + } + + env.getTaskStateManager().notifyCheckpointAborted(checkpointId); + ExceptionUtils.tryRethrowException(previousException); + } + + @Override + public void close() throws IOException { + List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null; + synchronized (lock) { + if (!closed) { + closed = true; + asyncCheckpointRunnables = new ArrayList<>(checkpoints.values()); + checkpoints.clear(); + } + } + IOUtils.closeAllQuietly(asyncCheckpointRunnables); + channelStateWriter.close(); + } + + @VisibleForTesting + int getAsyncCheckpointRunnableSize() { + synchronized (lock) { + return checkpoints.size(); + } + } + + @VisibleForTesting + int getAbortedCheckpointSize() { + return abortedCheckpointIds.size(); + } + + private boolean checkAndClearAbortedStatus(long checkpointId) { + return abortedCheckpointIds.remove(checkpointId); + } + + private void 
registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException { + StringBuilder exceptionMessage = new StringBuilder("Cannot register Closeable, "); + synchronized (lock) { + if (!closed) { + if (!checkpoints.containsKey(checkpointId)) { + checkpoints.put(checkpointId, asyncCheckpointRunnable); + return; + } else { + exceptionMessage.append("async checkpoint ").append(checkpointId).append(" runnable has been register. "); + } + } else { + exceptionMessage.append("this subtaskCheckpointCoordinator is already closed. "); + } + } + + IOUtils.closeQuietly(asyncCheckpointRunnable); + throw new IOException(exceptionMessage.append("Closing argument.").toString()); + } + + private boolean unregisterAsyncCheckpointRunnable(long checkpointId) { + synchronized (lock) { + return checkpoints.remove(checkpointId) != null; + } + } + + /** + * Cancel the async checkpoint runnable with given checkpoint id. + * If given checkpoint id is not registered, return false, otherwise return true. 
+ */ + private boolean cancelAsyncCheckpointRunnable(long checkpointId) { + AsyncCheckpointRunnable asyncCheckpointRunnable; + synchronized (lock) { + asyncCheckpointRunnable = checkpoints.remove(checkpointId); + } + IOUtils.closeQuietly(asyncCheckpointRunnable); + return asyncCheckpointRunnable != null; + } + private void cleanup( Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData metadata, @@ -251,11 +414,26 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { channelWrittenFuture, System.nanoTime(), taskName, - closeableRegistry, + registerConsumer(), + unregisterConsumer(), env, asyncExceptionHandler)); } + private Consumer<AsyncCheckpointRunnable> registerConsumer() { + return asyncCheckpointRunnable -> { + try { + registerAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId(), asyncCheckpointRunnable); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + } + + private Consumer<AsyncCheckpointRunnable> unregisterConsumer() { + return asyncCheckpointRunnable -> unregisterAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId()); + } + private boolean takeSnapshotSync( Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, @@ -342,9 +520,15 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { return snapshotInProgress; } - @Override - public void close() throws IOException { - channelStateWriter.close(); + private Set<Long> createAbortedCheckpointSetWithLimitSize(int maxRecordAbortedCheckpoints) { + return Collections.newSetFromMap(new LinkedHashMap<Long, Boolean>() { + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) { + return size() > maxRecordAbortedCheckpoints; + } + }); } // Caches checkpoint output stream factories to prevent multiple output stream per checkpoint. 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index 5fb8830..46dfbde9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -93,6 +93,7 @@ public class AbstractUdfStreamOperatorLifecycleTest { "getMetricGroup[], " + "getOperatorID[], " + "initializeState[interface org.apache.flink.streaming.api.operators.StreamTaskStateInitializer], " + + "notifyCheckpointAborted[long], " + "notifyCheckpointComplete[long], " + "open[], " + "prepareSnapshotPreBarrier[long], " + diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index 8d4fae2..3e0703f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -118,7 +118,8 @@ public class LocalStateForwardingTest extends TestLogger { CompletableFuture.completedFuture(null), 0L, testStreamTask.getName(), - testStreamTask.getCancelables(), + asyncCheckpointRunnable -> {}, + asyncCheckpointRunnable -> {}, testStreamTask.getEnvironment(), testStreamTask); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java index d3898db..e0db33f 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java @@ -47,6 +47,7 @@ public class MockSubtaskCheckpointCoordinatorBuilder { private ExecutorService executorService = Executors.newDirectExecutorService(); private BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException> prepareInputSnapshot = (channelStateWriter, aLong) -> FutureUtils.completedVoidFuture(); private boolean unalignedCheckpointEnabled; + private int maxRecordAbortedCheckpoints = 10; public MockSubtaskCheckpointCoordinatorBuilder setEnvironment(Environment environment) { this.environment = environment; @@ -58,6 +59,16 @@ public class MockSubtaskCheckpointCoordinatorBuilder { return this; } + public MockSubtaskCheckpointCoordinatorBuilder setExecutor(ExecutorService executor) { + this.executorService = executor; + return this; + } + + public MockSubtaskCheckpointCoordinatorBuilder setMaxRecordAbortedCheckpoints(int maxRecordAbortedCheckpoints) { + this.maxRecordAbortedCheckpoints = maxRecordAbortedCheckpoints; + return this; + } + public MockSubtaskCheckpointCoordinatorBuilder setUnalignedCheckpointEnabled(boolean unalignedCheckpointEnabled) { this.unalignedCheckpointEnabled = unalignedCheckpointEnabled; return this; @@ -83,7 +94,8 @@ public class MockSubtaskCheckpointCoordinatorBuilder { environment, asyncExceptionHandler, unalignedCheckpointEnabled, - prepareInputSnapshot); + prepareInputSnapshot, + maxRecordAbortedCheckpoints); } private static class NonHandleAsyncException implements AsyncExceptionHandler { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index efe9185..94bb92f 100644 
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -18,21 +18,45 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; import org.apache.flink.streaming.util.MockStreamTaskBuilder; +import org.apache.flink.util.ExceptionUtils; import org.junit.Test; +import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -48,9 +72,7 @@ public class SubtaskCheckpointCoordinatorTest { .setEnvironment(mockEnvironment) .build(); - final OperatorChain<?, ?> operatorChain = new OperatorChain<>( - new MockStreamTaskBuilder(new DummyEnvironment()).build(), - new NonRecordWriter<>()); + final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment); long checkpointId = 42L; { @@ -82,4 +104,273 @@ public class SubtaskCheckpointCoordinatorTest { new OperatorChain<>(new NoOpStreamTask<>(new DummyEnvironment()), new NonRecordWriter<>()), () -> false); } + + @Test + public void testNotifyCheckpointAbortedManyTimes() throws Exception { + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + int maxRecordAbortedCheckpoints = 256; + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .setMaxRecordAbortedCheckpoints(maxRecordAbortedCheckpoints) + .build(); + + final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment); + + long notifyAbortedTimes = maxRecordAbortedCheckpoints + 42; + for (int i = 1; i < notifyAbortedTimes; i++) { + subtaskCheckpointCoordinator.notifyCheckpointAborted(i, operatorChain, () -> true); + assertEquals(Math.min(maxRecordAbortedCheckpoints, i), subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + } + } + + @Test + public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception { + TestTaskStateManager stateManager = new 
TestTaskStateManager(); + MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager(stateManager).build(); + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .setUnalignedCheckpointEnabled(true) + .build(); + + CheckpointOperator checkpointOperator = new CheckpointOperator(new OperatorSnapshotFutures()); + + final OperatorChain<String, AbstractStreamOperator<String>> operatorChain = operatorChain(checkpointOperator); + + long checkpointId = 42L; + // notify checkpoint aborted before execution. + subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true); + assertEquals(1, subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + + subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation()); + subtaskCheckpointCoordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointMetrics(), + operatorChain, + () -> true); + assertFalse(checkpointOperator.isCheckpointed()); + assertEquals(-1, stateManager.getReportedCheckpointId()); + assertEquals(0, subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + assertEquals(0, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize()); + } + + @Test + public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception { + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .setExecutor(Executors.newSingleThreadExecutor()) + .setUnalignedCheckpointEnabled(true) + .build(); + + final BlockingRunnableFuture rawKeyedStateHandleFuture = new BlockingRunnableFuture(); 
+ OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( + DoneFuture.of(SnapshotResult.empty()), + rawKeyedStateHandleFuture, + DoneFuture.of(SnapshotResult.empty()), + DoneFuture.of(SnapshotResult.empty()), + DoneFuture.of(SnapshotResult.empty()), + DoneFuture.of(SnapshotResult.empty())); + + final OperatorChain<String, AbstractStreamOperator<String>> operatorChain = operatorChain(new CheckpointOperator(operatorSnapshotResult)); + + long checkpointId = 42L; + subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation()); + subtaskCheckpointCoordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointMetrics(), + operatorChain, + () -> true); + rawKeyedStateHandleFuture.awaitRun(); + assertEquals(1, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize()); + assertFalse(rawKeyedStateHandleFuture.isCancelled()); + + subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true); + assertTrue(rawKeyedStateHandleFuture.isCancelled()); + assertEquals(0, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize()); + } + + @Test + public void testNotifyCheckpointAbortedAfterAsyncPhase() throws Exception { + TestTaskStateManager stateManager = new TestTaskStateManager(); + MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager(stateManager).build(); + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .build(); + + final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment); + + long checkpointId = 42L; + subtaskCheckpointCoordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + 
CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointMetrics(), + operatorChain, + () -> true); + subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true); + assertEquals(0, subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + assertEquals(checkpointId, stateManager.getNotifiedAbortedCheckpointId()); + } + + private OperatorChain<?, ?> getOperatorChain(MockEnvironment mockEnvironment) throws Exception { + return new OperatorChain<>( + new MockStreamTaskBuilder(mockEnvironment).build(), + new NonRecordWriter<>()); + } + + private <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T>... streamOperators) throws Exception { + return OperatorChainTest.setupOperatorChain(streamOperators); + } + + private static final class BlockingRunnableFuture implements RunnableFuture<SnapshotResult<KeyedStateHandle>> { + + private final CompletableFuture<SnapshotResult<KeyedStateHandle>> future = new CompletableFuture<>(); + + private final OneShotLatch signalRunLatch = new OneShotLatch(); + + private final CountDownLatch countDownLatch; + + private final SnapshotResult<KeyedStateHandle> value; + + private BlockingRunnableFuture() { + // count down twice to wait for notify checkpoint aborted to cancel. 
+ this.countDownLatch = new CountDownLatch(2); + this.value = SnapshotResult.empty(); + } + + @Override + public void run() { + signalRunLatch.trigger(); + countDownLatch.countDown(); + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + ExceptionUtils.rethrow(e); + } + + future.complete(value); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + future.cancel(mayInterruptIfRunning); + return true; + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public SnapshotResult<KeyedStateHandle> get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public SnapshotResult<KeyedStateHandle> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { + return future.get(); + } + + void awaitRun() throws InterruptedException { + signalRunLatch.await(); + } + } + + private static class CheckpointOperator implements OneInputStreamOperator<String, String> { + + private static final long serialVersionUID = 1L; + + private final OperatorSnapshotFutures operatorSnapshotFutures; + + private boolean checkpointed = false; + + CheckpointOperator(OperatorSnapshotFutures operatorSnapshotFutures) { + this.operatorSnapshotFutures = operatorSnapshotFutures; + } + + boolean isCheckpointed() { + return checkpointed; + } + + @Override + public void open() throws Exception { + } + + @Override + public void close() throws Exception { + } + + @Override + public void dispose() { + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) { + } + + @Override + public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception { + this.checkpointed = true; + return operatorSnapshotFutures; + } + + @Override + public void 
initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { + } + + @Override + public void setKeyContextElement1(StreamRecord<?> record) { + } + + @Override + public void setKeyContextElement2(StreamRecord<?> record) { + } + + @Override + public MetricGroup getMetricGroup() { + return null; + } + + @Override + public OperatorID getOperatorID() { + return new OperatorID(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override + public void setCurrentKey(Object key) { + } + + @Override + public Object getCurrentKey() { + return null; + } + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + } + + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) { + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index ca5ee61..eff8c64 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -68,6 +68,7 @@ import org.junit.Test; import org.junit.rules.Timeout; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -175,6 +176,11 @@ public class SynchronousCheckpointITCase { } @Override + public Future<Void> notifyCheckpointAbortAsync(long checkpointId) { + return CompletableFuture.completedFuture(null); + } + + @Override protected void init() { } diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java index 28d92b9..df3a568 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java @@ -245,6 +245,10 @@ public class FailingCollectionSource<T> lastCheckpointedEmittedNum = checkpointedEmittedNums.get(checkpointId); } + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + public static void reset() { failedBefore = false; } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala index 56b7d21..02b70b4 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala @@ -176,4 +176,7 @@ class FiniteTestSource(elements: Iterable[Row]) extends SourceFunction[Row] with override def notifyCheckpointComplete(checkpointId: Long): Unit = { numCheckpointsComplete += 1 } + + @throws[Exception] + override def notifyCheckpointAborted(checkpointId: Long): Unit = {} } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java index b3a9546..c85aaaa 100644 --- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java @@ -91,4 +91,8 @@ public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListene public void notifyCheckpointComplete(long checkpointId) throws Exception { numCheckpointsComplete++; } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java index 12d186f..42c889c 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java @@ -352,6 +352,11 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { return super.notifyCheckpointCompleteAsync(checkpointId); } + + @Override + public Future<Void> notifyCheckpointAbortAsync(long checkpointId) { + return CompletableFuture.completedFuture(null); + } } /** diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java index 4193694..8605623 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java @@ -251,6 +251,11 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase { public Future<Void> notifyCheckpointCompleteAsync(final long checkpointId) { return CompletableFuture.completedFuture(null); } + + @Override + public Future<Void> notifyCheckpointAbortAsync(long checkpointId) { + return 
CompletableFuture.completedFuture(null); + } } private String cancelWithSavepoint() throws Exception { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java index b207de8..cf49d5a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java @@ -227,6 +227,10 @@ public class CoStreamCheckpointingITCase extends AbstractTestBase { } } + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + private static String randomString(StringBuilder bld, Random rnd) { final int len = rnd.nextInt(10) + 5; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java index 612ed43..fa6a0d1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -247,6 +247,10 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran this.successfulCheckpoints++; } + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + private int getFileIdx(String line) { String[] tkns = line.split(":"); return Integer.parseInt(tkns[0]); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java index e7f9921..b2895e6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java @@ -265,6 +265,10 @@ public class KeyedStateCheckpointingITCase extends TestLogger { this.notifyAll(); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } private static class OnceFailingPartitionedSum diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java index 0fcfb8f..a243843 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java @@ -364,6 +364,10 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { public void notifyCheckpointComplete(long checkpointId) { this.wasCheckpointed = true; } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } private static class ValidatingSink extends RichSinkFunction<PrefixCount> diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index 7b058a0..ece4f62 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -250,6 +250,10 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBase { GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } /** @@ -281,6 +285,10 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBase { GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } + + @Override + public void 
notifyCheckpointAborted(long checkpointId) { + } } /** @@ -312,6 +320,10 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBase { GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } /** @@ -349,6 +361,10 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBase { GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } /** @@ -414,5 +430,9 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBase { GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index b8f3f16..47e7007 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -248,6 +248,10 @@ public class UnalignedCheckpointITCase extends TestLogger { } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void run(SourceContext<Long> ctx) throws Exception { int increment = getRuntimeContext().getNumberOfParallelSubtasks(); info("First emitted input {}", state.nextNumber); @@ -446,6 +450,10 @@ public class UnalignedCheckpointITCase extends TestLogger { } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { checkFail(failDuringSnapshot, "snapshotState"); listState.clear(); diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java index 5758f76..81df28f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java @@ -422,6 +422,10 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger { checkpointCompletedIncludingData.compareAndSet(false, true); } } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } /** diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java index a4b66d6..4fd967c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java @@ -75,6 +75,10 @@ public class AccumulatingIntegerSink extends RichSinkFunction<Integer> implement pendingForAccumulator.remove(checkpointId).forEach(accumulator::add); } + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + @SuppressWarnings("unchecked") public static List<Integer> getOutput(Map<String, Object> accumulators) { return (List<Integer>) accumulators.get(ACCUMULATOR_NAME); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java index 1f49c2e..93ad11b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java @@ -112,6 +112,10 
@@ public class CancellingIntegerSource extends RichSourceFunction<Integer> impleme } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void cancel() { isCanceled = true; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java index ff49959..82ca91e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java @@ -135,6 +135,10 @@ public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { // We accept a checkpoint as basis if it should have a "decent amount" of state if (emitCallCount > failureAfterNumElements / 2) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java index 234c473..f1fae2c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java @@ -123,6 +123,10 @@ public class CheckpointedStreamingProgram { public void notifyCheckpointComplete(long checkpointId) throws Exception { atLeastOneSnapshotComplete = true; } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java index 0d5f2d7..ca178c7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java @@ -166,6 +166,10 @@ public class CheckpointingCustomKvStateProgram { atLeastOneSnapshotComplete = true; } + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + private static class ReduceSum implements ReduceFunction<Integer> { private static final long serialVersionUID = 1L; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java index db03220..29c0346 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java @@ -257,6 +257,10 @@ public class ReinterpretDataStreamAsKeyedStreamITCase { } @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { positionState.clear(); positionState.add(position);
