This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e68d679e8aca9b8b17e1b667188545c915a137d0 Author: fanrui <[email protected]> AuthorDate: Tue May 24 18:06:58 2022 +0800 [FLINK-27251][checkpoint] Refactor the close() and cancel() of SubtaskCheckpointCoordinator --- .../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 3 ++- .../runtime/tasks/SubtaskCheckpointCoordinator.java | 3 +++ .../runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 12 +++++------- .../tasks/MockSubtaskCheckpointCoordinatorBuilder.java | 3 --- .../runtime/tasks/SubtaskCheckpointCoordinatorTest.java | 2 -- .../runtime/tasks/TestSubtaskCheckpointCoordinator.java | 3 +++ 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index f431fe90399..add4174135d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -455,7 +455,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> checkpointStorageAccess, getName(), actionExecutor, - getCancelables(), getAsyncOperationsThreadPool(), environment, this, @@ -468,6 +467,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> this::prepareInputSnapshot, BarrierAlignmentUtil.createRegisterTimerCallback( mainMailboxExecutor, systemTimerService)); + resourceCloser.registerCloseable(subtaskCheckpointCoordinator::close); // Register to stop all timers and threads. Should be closed first. resourceCloser.registerCloseable(this::tryShutdownTimerService); @@ -971,6 +971,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // can be invoked from a different thread mailboxProcessor.allActionsCompleted(); try { + subtaskCheckpointCoordinator.cancel(); cancelables.close(); } catch (IOException e) { throw new CompletionException(e); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java index 0e6f5f2f3ec..1202fa73ad6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java @@ -100,4 +100,7 @@ public interface SubtaskCheckpointCoordinator extends Closeable { /** Waits for all the pending checkpoints to finish their asynchronous step. */ void waitForPendingCheckpoints() throws Exception; + + /** Cancel all resources. */ + void cancel() throws IOException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index 76f413c4b49..48698a2c13e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder; @@ -126,7 +125,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, - CloseableRegistry closeableRegistry, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, @@ -141,7 +139,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { checkpointStorage, taskName, actionExecutor, - closeableRegistry, asyncOperationsThreadPool, env, asyncExceptionHandler, @@ -156,7 +153,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, - CloseableRegistry closeableRegistry, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, @@ -172,7 +168,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { checkpointStorage, taskName, actionExecutor, - closeableRegistry, asyncOperationsThreadPool, env, asyncExceptionHandler, @@ -190,7 +185,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, - CloseableRegistry closeableRegistry, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, @@ -216,7 +210,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { this.abortedCheckpointIds = createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints); this.lastCheckpointId = -1L; - closeableRegistry.registerCloseable(this); this.closed = false; this.enableCheckpointAfterTasksFinished = enableCheckpointAfterTasksFinished; this.registerTimer = registerTimer; @@ -550,6 +543,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { @Override public void close() throws IOException { + cancelAlignmentTimer(); + cancel(); + } + + public void cancel() throws IOException { List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null; synchronized (lock) { if (!closed) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java index 1855df80cbe..2c1af6fb1ed 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.execution.Environment; @@ -43,7 +42,6 @@ public class MockSubtaskCheckpointCoordinatorBuilder { private Environment environment; private AsyncExceptionHandler asyncExceptionHandler; private StreamTaskActionExecutor actionExecutor = IMMEDIATE; - private CloseableRegistry closeableRegistry = new CloseableRegistry(); private ExecutorService executorService = Executors.newDirectExecutorService(); private BiFunctionWithException< ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> @@ -105,7 +103,6 @@ public class MockSubtaskCheckpointCoordinatorBuilder { checkpointStorage, taskName, actionExecutor, - closeableRegistry, executorService, environment, asyncExceptionHandler, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index 135799b8e44..3afd9ac2e81 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointException; @@ -737,7 +736,6 @@ public class SubtaskCheckpointCoordinatorTest { new TestCheckpointStorageWorkerView(100), "test", StreamTaskActionExecutor.IMMEDIATE, - new CloseableRegistry(), newDirectExecutorService(), new DummyEnvironment(), (message, unused) -> fail(message), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java index 8718de812c6..a43836515de 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java @@ -99,4 +99,7 @@ public class TestSubtaskCheckpointCoordinator implements SubtaskCheckpointCoordi @Override public void close() {} + + @Override + public void cancel() {} }
