This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
commit 60199bd7600a7d8b32cea59d5691c279e4032c0f Author: Stig Rohde Døssing <[email protected]> AuthorDate: Tue Mar 12 08:50:08 2019 +0100 STORM-3355: Use supervisor.worker.shutdown.sleep.secs to set worker suicide delay to allow users to configure how long they're willing to wait for orderly shutdown --- conf/defaults.yaml | 2 +- storm-client/src/jvm/org/apache/storm/Config.java | 7 +++++++ storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java | 4 +++- storm-client/src/jvm/org/apache/storm/task/IBolt.java | 6 ++++-- storm-server/src/main/java/org/apache/storm/DaemonConfig.java | 7 ------- .../src/main/java/org/apache/storm/daemon/supervisor/Slot.java | 2 +- .../main/java/org/apache/storm/daemon/supervisor/Supervisor.java | 2 +- 7 files changed, 17 insertions(+), 13 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index ec81d22..548f14c 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -161,7 +161,7 @@ supervisor.run.worker.as.user: false supervisor.worker.start.timeout.secs: 120 #how long between heartbeats until supervisor considers that worker dead and tries to restart it supervisor.worker.timeout.secs: 30 -#how many seconds to sleep for before shutting down threads on worker +#How many seconds to allow for graceful worker shutdown when killing workers before resorting to force kill supervisor.worker.shutdown.sleep.secs: 3 #how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary supervisor.monitor.frequency.secs: 3 diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index a1aea4c..7c8d19e 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -1042,6 +1042,13 @@ public class Config extends HashMap<String, Object> { @NotNull public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs"; /** + * How many seconds to allow for graceful worker shutdown when killing workers before resorting to force kill. + * If a worker fails to shut down gracefully within this delay, it will either suicide or be forcibly killed by the supervisor. + */ + @isInteger + @isPositiveNumber + public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs"; + /** * A list of hosts of ZooKeeper servers used to manage the cluster. */ @isStringList diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 048425e..0e7b6cb 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -127,7 +127,9 @@ public class Worker implements Shutdownable, DaemonCommon { Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort), Integer.parseInt(portStr), workerId); worker.start(); - Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown); + int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); + LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs); + Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs); } public void start() throws Exception { diff --git a/storm-client/src/jvm/org/apache/storm/task/IBolt.java b/storm-client/src/jvm/org/apache/storm/task/IBolt.java index 21f1b8d..6e0d033 100644 --- a/storm-client/src/jvm/org/apache/storm/task/IBolt.java +++ b/storm-client/src/jvm/org/apache/storm/task/IBolt.java @@ -14,6 +14,7 @@ package org.apache.storm.task; import java.io.Serializable; import java.util.Map; +import org.apache.storm.Config; import org.apache.storm.tuple.Tuple; /** @@ -63,8 +64,9 @@ public interface IBolt extends Serializable { void execute(Tuple input); /** - * Called when an IBolt is going to be shutdown. There is no guarantee that cleanup will be called, because the supervisor kill -9's - * worker processes on the cluster. + * Called when an IBolt is going to be shutdown. Storm will make a best-effort attempt to call this if the worker shutdown is orderly. + * The {@link Config#SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS} setting controls how long orderly shutdown is allowed to take. + * There is no guarantee that cleanup will be called if shutdown is not orderly, or if the shutdown exceeds the time limit. * * The one context where cleanup is guaranteed to be called is when a topology is killed when running Storm in local mode. */ diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index ae013de..8015c1d 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -773,13 +773,6 @@ public class DaemonConfig implements Validated { public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts"; /** - * How many seconds to sleep for before shutting down threads on worker. - */ - @isInteger - @isPositiveNumber - public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs"; - - /** * How long a worker can go without heartbeating during the initial launch before the supervisor tries to restart the worker process. * This value override supervisor.worker.timeout.secs during launch because there is additional overhead to starting and configuring the * JVM on launch. diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index 74c3e3d..4607862 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -95,7 +95,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback this.staticState = new StaticState(localizer, ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000, ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000, - ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000, + ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000, ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000, containerLauncher, host, diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java index de46676..ed79c35 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java @@ -508,7 +508,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { LOG.error("Error trying to kill {}", workerId, e); } } - int shutdownSleepSecs = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS), 1); + int shutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); if (!containers.isEmpty()) { Time.sleepSecs(shutdownSleepSecs); }
