dawidwys commented on a change in pull request #17701:
URL: https://github.com/apache/flink/pull/17701#discussion_r756213057



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -366,70 +368,100 @@ protected StreamTask(
             StreamTaskActionExecutor actionExecutor,
             TaskMailbox mailbox)
             throws Exception {
-        this.environment = environment;
-        this.configuration = new 
StreamConfig(environment.getTaskConfiguration());
-        this.recordWriter = createRecordWriterDelegate(configuration, 
environment);
-        this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
-        this.mailboxProcessor = new MailboxProcessor(this::processInput, 
mailbox, actionExecutor);
-        this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
-        this.asyncExceptionHandler = new 
StreamTaskAsyncExceptionHandler(environment);
-        this.asyncOperationsThreadPool =
-                Executors.newCachedThreadPool(
-                        new ExecutorThreadFactory("AsyncOperations", 
uncaughtExceptionHandler));
-
-        environment.setMainMailboxExecutor(mainMailboxExecutor);
-        environment.setAsyncOperationsThreadPool(asyncOperationsThreadPool);
-
-        this.stateBackend = createStateBackend();
-        this.checkpointStorage = createCheckpointStorage(stateBackend);
-        this.changelogWriterAvailabilityProvider =
-                environment.getTaskStateManager().getStateChangelogStorage() 
== null
-                        ? null
-                        : environment
-                                .getTaskStateManager()
-                                .getStateChangelogStorage()
-                                .getAvailabilityProvider();
-
-        CheckpointStorageAccess checkpointStorageAccess =
-                
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
-
-        environment.setCheckpointStorageAccess(checkpointStorageAccess);
-
-        this.subtaskCheckpointCoordinator =
-                new SubtaskCheckpointCoordinatorImpl(
-                        checkpointStorageAccess,
-                        getName(),
-                        actionExecutor,
-                        getCancelables(),
-                        getAsyncOperationsThreadPool(),
-                        environment,
-                        this,
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration
-                                .getConfiguration()
-                                .get(
-                                        ExecutionCheckpointingOptions
-                                                
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
-                        this::prepareInputSnapshot);
-
-        // if the clock is not already set, then assign a default 
TimeServiceProvider
-        if (timerService == null) {
-            this.timerService = createTimerService("Time Trigger for " + 
getName());
-        } else {
-            this.timerService = timerService;
-        }
+        // The registration of all closeable resources. The order of 
registration is important.
+        resourceCloser = new AutoCloseableRegistry();
+        try {
+            this.environment = environment;
+            this.configuration = new 
StreamConfig(environment.getTaskConfiguration());
+            this.mailboxProcessor =
+                    new MailboxProcessor(this::processInput, mailbox, 
actionExecutor);
+            // Should be closed last.
+            resourceCloser.registerCloseable(mailboxProcessor);
+
+            this.channelIOExecutor =
+                    Executors.newSingleThreadExecutor(
+                            new 
ExecutorThreadFactory("channel-state-unspilling"));
+            resourceCloser.registerCloseable(channelIOExecutor::shutdown);
+
+            this.recordWriter = createRecordWriterDelegate(configuration, 
environment);
+            // Release the output resources. this method should never fail.
+            resourceCloser.registerCloseable(this::releaseOutputResources);
+            // If the operators won't be closed explicitly, register it to a 
hard close.
+            resourceCloser.registerCloseable(this::closeAllOperators);
+            resourceCloser.registerCloseable(this::cleanUpInternal);
+
+            this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
+            this.mainMailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
+            this.asyncExceptionHandler = new 
StreamTaskAsyncExceptionHandler(environment);
+
+            this.asyncOperationsThreadPool =
+                    Executors.newCachedThreadPool(
+                            new ExecutorThreadFactory("AsyncOperations", 
uncaughtExceptionHandler));
+
+            // Register all asynchronous checkpoint threads.
+            resourceCloser.registerCloseable(this::shutdownAsyncThreads);
+            resourceCloser.registerCloseable(cancelables);
+
+            environment.setMainMailboxExecutor(mainMailboxExecutor);
+            
environment.setAsyncOperationsThreadPool(asyncOperationsThreadPool);
+
+            this.stateBackend = createStateBackend();
+            this.checkpointStorage = createCheckpointStorage(stateBackend);
+            this.changelogWriterAvailabilityProvider =
+                    
environment.getTaskStateManager().getStateChangelogStorage() == null
+                            ? null
+                            : environment
+                                    .getTaskStateManager()
+                                    .getStateChangelogStorage()
+                                    .getAvailabilityProvider();
+
+            CheckpointStorageAccess checkpointStorageAccess =
+                    
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
+
+            environment.setCheckpointStorageAccess(checkpointStorageAccess);
+
+            this.subtaskCheckpointCoordinator =
+                    new SubtaskCheckpointCoordinatorImpl(
+                            checkpointStorageAccess,
+                            getName(),
+                            actionExecutor,
+                            getCancelables(),
+                            getAsyncOperationsThreadPool(),
+                            environment,
+                            this,
+                            configuration.isUnalignedCheckpointsEnabled(),
+                            configuration
+                                    .getConfiguration()
+                                    .get(
+                                            ExecutionCheckpointingOptions
+                                                    
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
+                            this::prepareInputSnapshot);
+
+            // if the clock is not already set, then assign a default 
TimeServiceProvider
+            if (timerService == null) {
+                this.timerService = createTimerService("Time Trigger for " + 
getName());
+            } else {
+                this.timerService = timerService;
+            }
 
-        this.systemTimerService = createTimerService("System Time Trigger for 
" + getName());
-        this.channelIOExecutor =
-                Executors.newSingleThreadExecutor(
-                        new ExecutorThreadFactory("channel-state-unspilling"));
+            this.systemTimerService = createTimerService("System Time Trigger 
for " + getName());
+            // Register to stop all timers and threads. Should be closed first.
+            resourceCloser.registerCloseable(this::tryShutdownTimerService);
 
-        injectChannelStateWriterIntoChannels();
+            injectChannelStateWriterIntoChannels();
 
-        
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
-        Configuration taskManagerConf = 
environment.getTaskManagerInfo().getConfiguration();
+            
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
+            Configuration taskManagerConf = 
environment.getTaskManagerInfo().getConfiguration();
 
-        this.bufferDebloatPeriod = 
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+            this.bufferDebloatPeriod = 
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+        } catch (Exception ex) {
+            try {
+                resourceCloser.close();

Review comment:
       Right, sorry, my comment was too rushed. :sweat_smile: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to