This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 7cac1ab  [FLINK-20992][checkpointing] Don't schedule checkpoint triggering during shutdown
7cac1ab is described below

commit 7cac1ab5646bb64c9e3bb00d0abdefc1c30ad4bd
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Mon Jan 18 12:43:24 2021 +0100

    [FLINK-20992][checkpointing] Don't schedule checkpoint triggering during shutdown
    
    This closes #14683.
---
 .../flink/runtime/checkpoint/CheckpointCoordinator.java |  9 ++++++++-
 .../runtime/checkpoint/CheckpointCoordinatorTest.java   | 17 ++++++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8cbdedc..3c3b008 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1287,7 +1287,14 @@ public class CheckpointCoordinator {
     }
 
     void scheduleTriggerRequest() {
-        timer.execute(this::executeQueuedRequest);
+        synchronized (lock) {
+            if (isShutdown()) {
+                LOG.debug(
+                        "Skip scheduling trigger request because the CheckpointCoordinator is shut down");
+            } else {
+                timer.execute(this::executeQueuedRequest);
+            }
+        }
     }
 
     private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 40cdc38..7f978e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -133,6 +134,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
+    public void testScheduleTriggerRequestDuringShutdown() throws Exception {
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        CheckpointCoordinator coordinator =
+                getCheckpointCoordinator(new ScheduledExecutorServiceAdapter(executor));
+        coordinator.shutdown();
+        executor.shutdownNow();
+        coordinator.scheduleTriggerRequest(); // shouldn't fail
+    }
+
+    @Test
     public void testMinCheckpointPause() throws Exception {
         // will use a different thread to allow checkpoint triggering before exiting from
         // receiveAcknowledgeMessage
@@ -3139,6 +3150,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     private CheckpointCoordinator getCheckpointCoordinator() {
+        return getCheckpointCoordinator(manuallyTriggeredScheduledExecutor);
+    }
+
+    private CheckpointCoordinator getCheckpointCoordinator(ScheduledExecutor timer) {
         final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
         final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
         ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
@@ -3163,7 +3178,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .setTasksToTrigger(new ExecutionVertex[] {triggerVertex1, triggerVertex2})
                 .setTasksToWaitFor(new ExecutionVertex[] {ackVertex1, ackVertex2})
                 .setTasksToCommitTo(new ExecutionVertex[] {})
-                .setTimer(manuallyTriggeredScheduledExecutor)
+                .setTimer(timer)
                 .build();
     }
 

Reply via email to