This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit 60199bd7600a7d8b32cea59d5691c279e4032c0f
Author: Stig Rohde Døssing <[email protected]>
AuthorDate: Tue Mar 12 08:50:08 2019 +0100

    STORM-3355: Use supervisor.worker.shutdown.sleep.secs to set worker suicide 
delay to allow users to configure how long they're willing to wait for orderly 
shutdown
---
 conf/defaults.yaml                                                 | 2 +-
 storm-client/src/jvm/org/apache/storm/Config.java                  | 7 +++++++
 storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java    | 4 +++-
 storm-client/src/jvm/org/apache/storm/task/IBolt.java              | 6 ++++--
 storm-server/src/main/java/org/apache/storm/DaemonConfig.java      | 7 -------
 .../src/main/java/org/apache/storm/daemon/supervisor/Slot.java     | 2 +-
 .../main/java/org/apache/storm/daemon/supervisor/Supervisor.java   | 2 +-
 7 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index ec81d22..548f14c 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -161,7 +161,7 @@ supervisor.run.worker.as.user: false
 supervisor.worker.start.timeout.secs: 120
 #how long between heartbeats until supervisor considers that worker dead and 
tries to restart it
 supervisor.worker.timeout.secs: 30
-#how many seconds to sleep for before shutting down threads on worker
+#How many seconds to allow for graceful worker shutdown when killing workers 
before resorting to force kill
 supervisor.worker.shutdown.sleep.secs: 3
 #how frequently the supervisor checks on the status of the processes it's 
monitoring and restarts if necessary
 supervisor.monitor.frequency.secs: 3
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java 
b/storm-client/src/jvm/org/apache/storm/Config.java
index a1aea4c..7c8d19e 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1042,6 +1042,13 @@ public class Config extends HashMap<String, Object> {
     @NotNull
     public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = 
"supervisor.worker.timeout.secs";
     /**
+     * How many seconds to allow for graceful worker shutdown when killing 
workers before resorting to force kill.
+     * If a worker fails to shut down gracefully within this delay, it will 
either suicide or be forcibly killed by the supervisor.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = 
"supervisor.worker.shutdown.sleep.secs";
+    /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
     @isStringList
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 048425e..0e7b6cb 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -127,7 +127,9 @@ public class Worker implements Shutdownable, DaemonCommon {
         Worker worker = new Worker(conf, null, stormId, assignmentId, 
Integer.parseInt(supervisorPort),
                                    Integer.parseInt(portStr), workerId);
         worker.start();
-        Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
+        int workerShutdownSleepSecs = 
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+        LOG.info("Adding shutdown hook with kill in {} secs", 
workerShutdownSleepSecs);
+        Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, 
workerShutdownSleepSecs);
     }
 
     public void start() throws Exception {
diff --git a/storm-client/src/jvm/org/apache/storm/task/IBolt.java 
b/storm-client/src/jvm/org/apache/storm/task/IBolt.java
index 21f1b8d..6e0d033 100644
--- a/storm-client/src/jvm/org/apache/storm/task/IBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/task/IBolt.java
@@ -14,6 +14,7 @@ package org.apache.storm.task;
 
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.storm.Config;
 import org.apache.storm.tuple.Tuple;
 
 /**
@@ -63,8 +64,9 @@ public interface IBolt extends Serializable {
     void execute(Tuple input);
 
     /**
-     * Called when an IBolt is going to be shutdown. There is no guarantee 
that cleanup will be called, because the supervisor kill -9's
-     * worker processes on the cluster.
+     * Called when an IBolt is going to be shutdown. Storm will make a 
best-effort attempt to call this if the worker shutdown is orderly.
+     * The {@link Config#SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS} setting 
controls how long orderly shutdown is allowed to take.
+     * There is no guarantee that cleanup will be called if shutdown is not 
orderly, or if the shutdown exceeds the time limit.
      *
      * The one context where cleanup is guaranteed to be called is when a 
topology is killed when running Storm in local mode.
      */
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index ae013de..8015c1d 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -773,13 +773,6 @@ public class DaemonConfig implements Validated {
     public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";
 
     /**
-     * How many seconds to sleep for before shutting down threads on worker.
-     */
-    @isInteger
-    @isPositiveNumber
-    public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = 
"supervisor.worker.shutdown.sleep.secs";
-
-    /**
      * How long a worker can go without heartbeating during the initial launch 
before the supervisor tries to restart the worker process.
      * This value override supervisor.worker.timeout.secs during launch 
because there is additional overhead to starting and configuring the
      * JVM on launch.
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 74c3e3d..4607862 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -95,7 +95,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         this.staticState = new StaticState(localizer,
                 
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
                 
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS))
 * 1000,
-                
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS))
 * 1000,
+                
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 
1000,
                 
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 
1000,
                 containerLauncher,
                 host,
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index de46676..ed79c35 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -508,7 +508,7 @@ public class Supervisor implements DaemonCommon, 
AutoCloseable {
                 LOG.error("Error trying to kill {}", workerId, e);
             }
         }
-        int shutdownSleepSecs = 
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS),
 1);
+        int shutdownSleepSecs = 
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         if (!containers.isEmpty()) {
             Time.sleepSecs(shutdownSleepSecs);
         }

Reply via email to