This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.1-incubating by this push:
     new 43c300b  fix forking task runner task shutdown to be more graceful 
(#8085) (#8152)
43c300b is described below

commit 43c300b1fc4a4172a9878106a7705b85578ffad2
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Jul 29 14:09:09 2019 -0700

    fix forking task runner task shutdown to be more graceful (#8085) (#8152)
    
    * fix forking task runner shutdown to be more graceful
    
    * javadoc
---
 .../druid/indexing/overlord/ForkingTaskRunner.java | 42 +++++++++++++---------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 05f2f52..e0b9291 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -531,16 +531,7 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
 
     synchronized (tasks) {
       for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
-        if (taskWorkItem.processHolder != null) {
-          log.info("Closing output stream to task[%s].", 
taskWorkItem.getTask().getId());
-          try {
-            taskWorkItem.processHolder.process.getOutputStream().close();
-          }
-          catch (Exception e) {
-            log.warn(e, "Failed to close stdout to task[%s]. Destroying 
task.", taskWorkItem.getTask().getId());
-            taskWorkItem.processHolder.process.destroy();
-          }
-        }
+        shutdownTaskProcess(taskWorkItem);
       }
     }
 
@@ -597,12 +588,8 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
       }
 
       taskInfo.shutdown = true;
-    }
 
-    if (taskInfo.processHolder != null) {
-      // Will trigger normal failure mechanisms due to process exit
-      log.info("Killing process for task: %s", taskid);
-      taskInfo.processHolder.process.destroy();
+      shutdownTaskProcess(taskInfo);
     }
   }
 
@@ -698,8 +685,10 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
     );
   }
 
-  // Save running tasks to a file, so they can potentially be restored on next 
startup. Suppresses exceptions that
-  // occur while saving.
+  /**
+   * Save running tasks to a file, so they can potentially be restored on next 
startup. Suppresses exceptions that occur
+   * while saving.
+   */
   @GuardedBy("tasks")
   private void saveRunningTasks()
   {
@@ -718,6 +707,25 @@ public class ForkingTaskRunner implements TaskRunner, 
TaskLogStreamer
     }
   }
 
+  /**
+   * Close task output stream (input stream of process) sending EOF telling 
process to terminate, destroying the process
+   * if an exception is encountered.
+   */
+  private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo)
+  {
+    if (taskInfo.processHolder != null) {
+      // Will trigger normal failure mechanisms due to process exit
+      log.info("Closing output stream to task[%s].", 
taskInfo.getTask().getId());
+      try {
+        taskInfo.processHolder.process.getOutputStream().close();
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", 
taskInfo.getTask().getId());
+        taskInfo.processHolder.process.destroy();
+      }
+    }
+  }
+
   private File getRestoreFile()
   {
     return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to