Make the heartbeat thread a daemon thread and reduce the heartbeat pool size to 1
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b6fda451 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b6fda451 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b6fda451 Branch: refs/heads/master Commit: b6fda45124d163ff51674c5f5b889dc69a712c5b Parents: 31443dc Author: Jungtaek Lim <kabh...@gmail.com> Authored: Sun Oct 12 11:26:51 2014 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Sun Oct 12 11:26:51 2014 +0900 ---------------------------------------------------------------------- .../jvm/backtype/storm/spout/ShellSpout.java | 15 ++++++----- .../src/jvm/backtype/storm/task/ShellBolt.java | 27 ++++++++------------ 2 files changed, 19 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b6fda451/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index a91a1ba..c79d175 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -27,13 +27,14 @@ import backtype.storm.task.TopologyContext; import backtype.storm.utils.ShellProcess; import java.util.Map; import java.util.List; -import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import clojure.lang.RT; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class ShellSpout implements ISpout { private SpoutMsg _spoutMsg; private int workerTimeoutMills; - private 
ScheduledThreadPoolExecutor heartBeatExecutor; + private ScheduledExecutorService heartBeatExecutorService; private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); public ShellSpout(ShellComponent component) { @@ -73,11 +74,11 @@ public class ShellSpout implements ISpout { Number subpid = _process.launch(stormConf, context); LOG.info("Launched subprocess with pid " + subpid); - heartBeatExecutor = new ScheduledThreadPoolExecutor(5); + heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } public void close() { - heartBeatExecutor.shutdownNow(); + heartBeatExecutorService.shutdownNow(); _process.destroy(); } @@ -210,12 +211,12 @@ public class ShellSpout implements ISpout { LOG.info("Start checking heartbeat..."); // prevent timer to check heartbeat based on last thing before activate setHeartbeat(); - heartBeatExecutor.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); + heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); } @Override public void deactivate() { - heartBeatExecutor.shutdownNow(); + heartBeatExecutorService.shutdownNow(); } private void setHeartbeat() { @@ -227,7 +228,7 @@ public class ShellSpout implements ISpout { } private void die(Throwable exception) { - heartBeatExecutor.shutdownNow(); + heartBeatExecutorService.shutdownNow(); LOG.error("Halting process: ShellSpout died.", exception); _collector.reportError(exception); http://git-wip-us.apache.org/repos/asf/storm/blob/b6fda451/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 63ed21f..d52b8d9 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -22,27 +22,22 @@ import 
backtype.storm.Constants; import backtype.storm.generated.ShellComponent; import backtype.storm.metric.api.IMetric; import backtype.storm.metric.api.rpc.IShellMetric; +import backtype.storm.multilang.BoltMsg; +import backtype.storm.multilang.ShellMsg; import backtype.storm.topology.ReportedFailedException; -import backtype.storm.tuple.MessageId; import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; import backtype.storm.utils.ShellProcess; -import backtype.storm.multilang.BoltMsg; -import backtype.storm.multilang.ShellMsg; +import clojure.lang.RT; +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.SECONDS; -import clojure.lang.RT; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A bolt that shells out to another process to process tuples. ShellBolt * communicates with that process over stdio using a special protocol. 
An ~100 @@ -90,7 +85,7 @@ public class ShellBolt implements IBolt { private TopologyContext _context; private int workerTimeoutMills; - private ScheduledThreadPoolExecutor heartBeatExecutor; + private ScheduledExecutorService heartBeatExecutorService; private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); public ShellBolt(ShellComponent component) { @@ -127,8 +122,8 @@ public class ShellBolt implements IBolt { _writerThread = new Thread(new BoltWriterRunnable()); _writerThread.start(); - heartBeatExecutor = new ScheduledThreadPoolExecutor(5); - heartBeatExecutor.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); + heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); + heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); LOG.info("Start checking heartbeat..."); setHeartbeat(); @@ -164,7 +159,7 @@ public class ShellBolt implements IBolt { public void cleanup() { _running = false; - heartBeatExecutor.shutdownNow(); + heartBeatExecutorService.shutdownNow(); _writerThread.interrupt(); _readerThread.interrupt(); _process.destroy();