dawidwys commented on a change in pull request #17701:
URL: https://github.com/apache/flink/pull/17701#discussion_r756102896
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -366,70 +368,100 @@ protected StreamTask(
StreamTaskActionExecutor actionExecutor,
TaskMailbox mailbox)
throws Exception {
- this.environment = environment;
- this.configuration = new
StreamConfig(environment.getTaskConfiguration());
- this.recordWriter = createRecordWriterDelegate(configuration,
environment);
- this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
- this.mailboxProcessor = new MailboxProcessor(this::processInput,
mailbox, actionExecutor);
- this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
- this.asyncExceptionHandler = new
StreamTaskAsyncExceptionHandler(environment);
- this.asyncOperationsThreadPool =
- Executors.newCachedThreadPool(
- new ExecutorThreadFactory("AsyncOperations",
uncaughtExceptionHandler));
-
- environment.setMainMailboxExecutor(mainMailboxExecutor);
- environment.setAsyncOperationsThreadPool(asyncOperationsThreadPool);
-
- this.stateBackend = createStateBackend();
- this.checkpointStorage = createCheckpointStorage(stateBackend);
- this.changelogWriterAvailabilityProvider =
- environment.getTaskStateManager().getStateChangelogStorage()
== null
- ? null
- : environment
- .getTaskStateManager()
- .getStateChangelogStorage()
- .getAvailabilityProvider();
-
- CheckpointStorageAccess checkpointStorageAccess =
-
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
-
- environment.setCheckpointStorageAccess(checkpointStorageAccess);
-
- this.subtaskCheckpointCoordinator =
- new SubtaskCheckpointCoordinatorImpl(
- checkpointStorageAccess,
- getName(),
- actionExecutor,
- getCancelables(),
- getAsyncOperationsThreadPool(),
- environment,
- this,
- configuration.isUnalignedCheckpointsEnabled(),
- configuration
- .getConfiguration()
- .get(
- ExecutionCheckpointingOptions
-
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
- this::prepareInputSnapshot);
-
- // if the clock is not already set, then assign a default
TimeServiceProvider
- if (timerService == null) {
- this.timerService = createTimerService("Time Trigger for " +
getName());
- } else {
- this.timerService = timerService;
- }
+ // The registration of all closeable resources. The order of
registration is important.
+ resourceCloser = new AutoCloseableRegistry();
+ try {
+ this.environment = environment;
+ this.configuration = new
StreamConfig(environment.getTaskConfiguration());
+ this.mailboxProcessor =
+ new MailboxProcessor(this::processInput, mailbox,
actionExecutor);
+ // Should be closed last.
+ resourceCloser.registerCloseable(mailboxProcessor);
+
+ this.channelIOExecutor =
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("channel-state-unspilling"));
+ resourceCloser.registerCloseable(channelIOExecutor::shutdown);
+
+ this.recordWriter = createRecordWriterDelegate(configuration,
environment);
+ // Release the output resources. this method should never fail.
+ resourceCloser.registerCloseable(this::releaseOutputResources);
+ // If the operators won't be closed explicitly, register it to a
hard close.
+ resourceCloser.registerCloseable(this::closeAllOperators);
+ resourceCloser.registerCloseable(this::cleanUpInternal);
+
+ this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
+ this.mainMailboxExecutor =
mailboxProcessor.getMainMailboxExecutor();
+ this.asyncExceptionHandler = new
StreamTaskAsyncExceptionHandler(environment);
+
+ this.asyncOperationsThreadPool =
+ Executors.newCachedThreadPool(
+ new ExecutorThreadFactory("AsyncOperations",
uncaughtExceptionHandler));
+
+ // Register all asynchronous checkpoint threads.
+ resourceCloser.registerCloseable(this::shutdownAsyncThreads);
+ resourceCloser.registerCloseable(cancelables);
+
+ environment.setMainMailboxExecutor(mainMailboxExecutor);
+
environment.setAsyncOperationsThreadPool(asyncOperationsThreadPool);
+
+ this.stateBackend = createStateBackend();
+ this.checkpointStorage = createCheckpointStorage(stateBackend);
+ this.changelogWriterAvailabilityProvider =
+
environment.getTaskStateManager().getStateChangelogStorage() == null
+ ? null
+ : environment
+ .getTaskStateManager()
+ .getStateChangelogStorage()
+ .getAvailabilityProvider();
+
+ CheckpointStorageAccess checkpointStorageAccess =
+
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
+
+ environment.setCheckpointStorageAccess(checkpointStorageAccess);
+
+ this.subtaskCheckpointCoordinator =
+ new SubtaskCheckpointCoordinatorImpl(
+ checkpointStorageAccess,
+ getName(),
+ actionExecutor,
+ getCancelables(),
+ getAsyncOperationsThreadPool(),
+ environment,
+ this,
+ configuration.isUnalignedCheckpointsEnabled(),
+ configuration
+ .getConfiguration()
+ .get(
+ ExecutionCheckpointingOptions
+
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
+ this::prepareInputSnapshot);
+
+ // if the clock is not already set, then assign a default
TimeServiceProvider
+ if (timerService == null) {
+ this.timerService = createTimerService("Time Trigger for " +
getName());
+ } else {
+ this.timerService = timerService;
+ }
- this.systemTimerService = createTimerService("System Time Trigger for
" + getName());
- this.channelIOExecutor =
- Executors.newSingleThreadExecutor(
- new ExecutorThreadFactory("channel-state-unspilling"));
+ this.systemTimerService = createTimerService("System Time Trigger
for " + getName());
+ // Register to stop all timers and threads. Should be closed first.
+ resourceCloser.registerCloseable(this::tryShutdownTimerService);
- injectChannelStateWriterIntoChannels();
+ injectChannelStateWriterIntoChannels();
-
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
- Configuration taskManagerConf =
environment.getTaskManagerInfo().getConfiguration();
+
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
+ Configuration taskManagerConf =
environment.getTaskManagerInfo().getConfiguration();
- this.bufferDebloatPeriod =
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+ this.bufferDebloatPeriod =
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+ } catch (Exception ex) {
+ try {
+ resourceCloser.close();
Review comment:
Given the change in `AutoCloseableRegistry#close`, how do you feel about
removing those catch blocks ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]