Make the heartbeat thread a daemon thread and reduce the pool size to 1

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b6fda451
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b6fda451
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b6fda451

Branch: refs/heads/master
Commit: b6fda45124d163ff51674c5f5b889dc69a712c5b
Parents: 31443dc
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Sun Oct 12 11:26:51 2014 +0900
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Sun Oct 12 11:26:51 2014 +0900

----------------------------------------------------------------------
 .../jvm/backtype/storm/spout/ShellSpout.java    | 15 ++++++-----
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 27 ++++++++------------
 2 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b6fda451/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index a91a1ba..c79d175 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -27,13 +27,14 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
 import java.util.Map;
 import java.util.List;
-import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import clojure.lang.RT;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,7 @@ public class ShellSpout implements ISpout {
     private SpoutMsg _spoutMsg;
 
     private int workerTimeoutMills;
-    private ScheduledThreadPoolExecutor heartBeatExecutor;
+    private ScheduledExecutorService heartBeatExecutorService;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
 
     public ShellSpout(ShellComponent component) {
@@ -73,11 +74,11 @@ public class ShellSpout implements ISpout {
         Number subpid = _process.launch(stormConf, context);
         LOG.info("Launched subprocess with pid " + subpid);
 
-        heartBeatExecutor = new ScheduledThreadPoolExecutor(5);
+        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
     }
 
     public void close() {
-        heartBeatExecutor.shutdownNow();
+        heartBeatExecutorService.shutdownNow();
         _process.destroy();
     }
 
@@ -210,12 +211,12 @@ public class ShellSpout implements ISpout {
         LOG.info("Start checking heartbeat...");
         // prevent timer to check heartbeat based on last thing before activate
         setHeartbeat();
-        heartBeatExecutor.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
+        heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
     }
 
     @Override
     public void deactivate() {
-        heartBeatExecutor.shutdownNow();
+        heartBeatExecutorService.shutdownNow();
     }
 
     private void setHeartbeat() {
@@ -227,7 +228,7 @@ public class ShellSpout implements ISpout {
     }
 
     private void die(Throwable exception) {
-        heartBeatExecutor.shutdownNow();
+        heartBeatExecutorService.shutdownNow();
 
         LOG.error("Halting process: ShellSpout died.", exception);
         _collector.reportError(exception);

http://git-wip-us.apache.org/repos/asf/storm/blob/b6fda451/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 63ed21f..d52b8d9 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -22,27 +22,22 @@ import backtype.storm.Constants;
 import backtype.storm.generated.ShellComponent;
 import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.rpc.IShellMetric;
+import backtype.storm.multilang.BoltMsg;
+import backtype.storm.multilang.ShellMsg;
 import backtype.storm.topology.ReportedFailedException;
-import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
 import backtype.storm.utils.ShellProcess;
-import backtype.storm.multilang.BoltMsg;
-import backtype.storm.multilang.ShellMsg;
+import clojure.lang.RT;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
-import clojure.lang.RT;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A bolt that shells out to another process to process tuples. ShellBolt
  * communicates with that process over stdio using a special protocol. An ~100
@@ -90,7 +85,7 @@ public class ShellBolt implements IBolt {
     private TopologyContext _context;
 
     private int workerTimeoutMills;
-    private ScheduledThreadPoolExecutor heartBeatExecutor;
+    private ScheduledExecutorService heartBeatExecutorService;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
 
     public ShellBolt(ShellComponent component) {
@@ -127,8 +122,8 @@ public class ShellBolt implements IBolt {
         _writerThread = new Thread(new BoltWriterRunnable());
         _writerThread.start();
 
-        heartBeatExecutor = new ScheduledThreadPoolExecutor(5);
-        heartBeatExecutor.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
+        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
+        heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
 
         LOG.info("Start checking heartbeat...");
         setHeartbeat();
@@ -164,7 +159,7 @@ public class ShellBolt implements IBolt {
 
     public void cleanup() {
         _running = false;
-        heartBeatExecutor.shutdownNow();
+        heartBeatExecutorService.shutdownNow();
         _writerThread.interrupt();
         _readerThread.interrupt();
         _process.destroy();

Reply via email to