This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e68d679e8aca9b8b17e1b667188545c915a137d0
Author: fanrui <[email protected]>
AuthorDate: Tue May 24 18:06:58 2022 +0800

    [FLINK-27251][checkpoint] Refactor the close() and cancel() of SubtaskCheckpointCoordinator
---
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java |  3 ++-
 .../runtime/tasks/SubtaskCheckpointCoordinator.java          |  3 +++
 .../runtime/tasks/SubtaskCheckpointCoordinatorImpl.java      | 12 +++++-------
 .../tasks/MockSubtaskCheckpointCoordinatorBuilder.java       |  3 ---
 .../runtime/tasks/SubtaskCheckpointCoordinatorTest.java      |  2 --
 .../runtime/tasks/TestSubtaskCheckpointCoordinator.java      |  3 +++
 6 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index f431fe90399..add4174135d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -455,7 +455,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                             checkpointStorageAccess,
                             getName(),
                             actionExecutor,
-                            getCancelables(),
                             getAsyncOperationsThreadPool(),
                             environment,
                             this,
@@ -468,6 +467,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                             this::prepareInputSnapshot,
                             BarrierAlignmentUtil.createRegisterTimerCallback(
                                     mainMailboxExecutor, systemTimerService));
+            resourceCloser.registerCloseable(subtaskCheckpointCoordinator::close);
 
             // Register to stop all timers and threads. Should be closed first.
             resourceCloser.registerCloseable(this::tryShutdownTimerService);
@@ -971,6 +971,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                 // can be invoked from a different thread
                                 mailboxProcessor.allActionsCompleted();
                                 try {
+                                    subtaskCheckpointCoordinator.cancel();
                                     cancelables.close();
                                 } catch (IOException e) {
                                     throw new CompletionException(e);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
index 0e6f5f2f3ec..1202fa73ad6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
@@ -100,4 +100,7 @@ public interface SubtaskCheckpointCoordinator extends Closeable {
 
     /** Waits for all the pending checkpoints to finish their asynchronous step. */
     void waitForPendingCheckpoints() throws Exception;
+
+    /** Cancel all resources. */
+    void cancel() throws IOException;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 76f413c4b49..48698a2c13e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
@@ -126,7 +125,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             CheckpointStorageWorkerView checkpointStorage,
             String taskName,
             StreamTaskActionExecutor actionExecutor,
-            CloseableRegistry closeableRegistry,
             ExecutorService asyncOperationsThreadPool,
             Environment env,
             AsyncExceptionHandler asyncExceptionHandler,
@@ -141,7 +139,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
                 checkpointStorage,
                 taskName,
                 actionExecutor,
-                closeableRegistry,
                 asyncOperationsThreadPool,
                 env,
                 asyncExceptionHandler,
@@ -156,7 +153,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             CheckpointStorageWorkerView checkpointStorage,
             String taskName,
             StreamTaskActionExecutor actionExecutor,
-            CloseableRegistry closeableRegistry,
             ExecutorService asyncOperationsThreadPool,
             Environment env,
             AsyncExceptionHandler asyncExceptionHandler,
@@ -172,7 +168,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
                 checkpointStorage,
                 taskName,
                 actionExecutor,
-                closeableRegistry,
                 asyncOperationsThreadPool,
                 env,
                 asyncExceptionHandler,
@@ -190,7 +185,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             CheckpointStorageWorkerView checkpointStorage,
             String taskName,
             StreamTaskActionExecutor actionExecutor,
-            CloseableRegistry closeableRegistry,
             ExecutorService asyncOperationsThreadPool,
             Environment env,
             AsyncExceptionHandler asyncExceptionHandler,
@@ -216,7 +210,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         this.abortedCheckpointIds =
                 createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
         this.lastCheckpointId = -1L;
-        closeableRegistry.registerCloseable(this);
         this.closed = false;
         this.enableCheckpointAfterTasksFinished = enableCheckpointAfterTasksFinished;
         this.registerTimer = registerTimer;
@@ -550,6 +543,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 
     @Override
     public void close() throws IOException {
+        cancelAlignmentTimer();
+        cancel();
+    }
+
+    public void cancel() throws IOException {
         List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
         synchronized (lock) {
             if (!closed) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
index 1855df80cbe..2c1af6fb1ed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.execution.Environment;
@@ -43,7 +42,6 @@ public class MockSubtaskCheckpointCoordinatorBuilder {
     private Environment environment;
     private AsyncExceptionHandler asyncExceptionHandler;
     private StreamTaskActionExecutor actionExecutor = IMMEDIATE;
-    private CloseableRegistry closeableRegistry = new CloseableRegistry();
     private ExecutorService executorService = Executors.newDirectExecutorService();
     private BiFunctionWithException<
                     ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException>
@@ -105,7 +103,6 @@ public class MockSubtaskCheckpointCoordinatorBuilder {
                 checkpointStorage,
                 taskName,
                 actionExecutor,
-                closeableRegistry,
                 executorService,
                 environment,
                 asyncExceptionHandler,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 135799b8e44..3afd9ac2e81 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
@@ -737,7 +736,6 @@ public class SubtaskCheckpointCoordinatorTest {
                 new TestCheckpointStorageWorkerView(100),
                 "test",
                 StreamTaskActionExecutor.IMMEDIATE,
-                new CloseableRegistry(),
                 newDirectExecutorService(),
                 new DummyEnvironment(),
                 (message, unused) -> fail(message),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
index 8718de812c6..a43836515de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
@@ -99,4 +99,7 @@ public class TestSubtaskCheckpointCoordinator implements SubtaskCheckpointCoordi
 
     @Override
     public void close() {}
+
+    @Override
+    public void cancel() {}
 }

Reply via email to