sv2000 commented on a change in pull request #2907: [GOBBLIN-1068]Clean up cyclic logic in task cancellation
URL: https://github.com/apache/incubator-gobblin/pull/2907#discussion_r404225329
##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
##########
@@ -169,25 +174,35 @@ public void run()
       }
     } catch (InterruptedException interrupt) {
       log.info("Job interrupted by InterrupedException.");
-      interruptTaskExecution(countDownLatch);
+      interruptTaskExecution(Optional.of(countDownLatch));
     }
     log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
   }

-  private void interruptTaskExecution(CountDownLatch countDownLatch)
-      throws InterruptedException {
+  /**
+   * A helper function that that shuts down all outstanding tasks and
+   * shuts down the taskExecutor if it times out on a task termination.
+   */
+  private void interruptTaskExecution(Optional<CountDownLatch> countDownLatch) throws InterruptedException {
     log.info("Job interrupted. Attempting a graceful shutdown of the job.");
-    this.tasks.forEach(Task::shutdown);
-    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
-      log.warn("Graceful shutdown of job timed out. Killing all outstanding tasks.");
-      try {
+    this.shutdownTasks();
+    try {
+      if (!countDownLatch.isPresent() || !countDownLatch.get().await(5, TimeUnit.SECONDS)) {
+        log.warn("Shutting down TaskExecutor. Killing all outstanding tasks.");
         this.taskExecutor.shutDown();
-      } catch (Throwable t) {
-        throw new RuntimeException("Failed to shutdown task executor.", t);
       }
+    } catch (Exception e) {

Review comment:
Curious why we changed from Throwable to Exception here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services