This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push: new 7cac1ab [FLINK-20992][checkpointing] Don't schedule checkpoint triggering during shutdown 7cac1ab is described below commit 7cac1ab5646bb64c9e3bb00d0abdefc1c30ad4bd Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Mon Jan 18 12:43:24 2021 +0100 [FLINK-20992][checkpointing] Don't schedule checkpoint triggering during shutdown This closes #14683. --- .../flink/runtime/checkpoint/CheckpointCoordinator.java | 9 ++++++++- .../runtime/checkpoint/CheckpointCoordinatorTest.java | 17 ++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 8cbdedc..3c3b008 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1287,7 +1287,14 @@ public class CheckpointCoordinator { } void scheduleTriggerRequest() { - timer.execute(this::executeQueuedRequest); + synchronized (lock) { + if (isShutdown()) { + LOG.debug( + "Skip scheduling trigger request because the CheckpointCoordinator is shut down"); + } else { + timer.execute(this::executeQueuedRequest); + } + } } private void sendAcknowledgeMessages(long checkpointId, long timestamp) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 40cdc38..7f978e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -26,6 +26,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; @@ -133,6 +134,16 @@ public class CheckpointCoordinatorTest extends TestLogger { } @Test + public void testScheduleTriggerRequestDuringShutdown() throws Exception { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + CheckpointCoordinator coordinator = + getCheckpointCoordinator(new ScheduledExecutorServiceAdapter(executor)); + coordinator.shutdown(); + executor.shutdownNow(); + coordinator.scheduleTriggerRequest(); // shouldn't fail + } + + @Test public void testMinCheckpointPause() throws Exception { // will use a different thread to allow checkpoint triggering before exiting from // receiveAcknowledgeMessage @@ -3139,6 +3150,10 @@ public class CheckpointCoordinatorTest extends TestLogger { } private CheckpointCoordinator getCheckpointCoordinator() { + return getCheckpointCoordinator(manuallyTriggeredScheduledExecutor); + } + + private CheckpointCoordinator getCheckpointCoordinator(ScheduledExecutor timer) { final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID(); final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID(); ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1); @@ -3163,7 +3178,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .setTasksToTrigger(new ExecutionVertex[] {triggerVertex1, triggerVertex2}) .setTasksToWaitFor(new ExecutionVertex[] {ackVertex1, ackVertex2}) .setTasksToCommitTo(new ExecutionVertex[] {}) - .setTimer(manuallyTriggeredScheduledExecutor) + .setTimer(timer) .build(); }