Hello.

I haven't seen this behavior before, but at a glance, die() should ensure that the subprocess is destroyed (with try-finally), because the worker process is going to be killed whether or not reportError() throws a RuntimeException.
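The try-finally suggestion above can be sketched in plain Java. This is a hypothetical, self-contained stand-in, not Storm's actual ShellSpout.die(). The nested ErrorReporter, Subprocess and Exiter interfaces are invented for illustration and play the roles of _collector, _process and System.exit(), so the control flow can run and be tested without Storm. The exit status 11 is only a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

final class DieSketch {
    // Hypothetical stand-ins; none of these interfaces exist in Storm.
    interface ErrorReporter { void reportError(Throwable t); } // role of _collector
    interface Subprocess { void destroy(); }                   // role of _process
    interface Exiter { void exit(int status); }                // role of System.exit()

    static void die(Throwable exception, ErrorReporter collector,
                    Subprocess process, Exiter exiter) {
        try {
            // May throw, e.g. a RuntimeException wrapping an InterruptedException.
            collector.reportError(exception);
        } finally {
            try {
                // Runs whether or not reportError() threw.
                process.destroy();
            } finally {
                // Runs even if destroy() itself throws; 11 is a placeholder status.
                exiter.exit(11);
            }
        }
    }

    // Calls die() once and records the order of its side effects.
    static List<String> simulate(final boolean reportThrows) {
        final List<String> events = new ArrayList<String>();
        ErrorReporter collector = new ErrorReporter() {
            @Override
            public void reportError(Throwable t) {
                events.add("reportError");
                if (reportThrows) {
                    throw new RuntimeException(new InterruptedException());
                }
            }
        };
        Subprocess process = new Subprocess() {
            @Override
            public void destroy() { events.add("destroy"); }
        };
        Exiter exiter = new Exiter() {
            @Override
            public void exit(int status) { events.add("exit " + status); }
        };
        try {
            die(new RuntimeException("subprocess heartbeat timeout"), collector, process, exiter);
        } catch (RuntimeException e) {
            // Only reachable because the stub Exiter returns; System.exit() would not.
            events.add("rethrown: " + e.getCause().getClass().getSimpleName());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [reportError, destroy, exit 11]
        System.out.println(simulate(true));  // [reportError, destroy, exit 11, rethrown: InterruptedException]
    }
}
```

Unlike the catch-based patch in William's message, which swallows only a RuntimeException whose cause is an InterruptedException, the finally blocks run for any exception thrown by reportError().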
You can apply my suggestion to verify that it works, and then file a JIRA issue.

Hope this helps.

Regards,
Jungtaek Lim (HeartSaVioR)

2015-02-13 5:08 GMT+09:00 William Oberman:
> Sorry for the cross-post to dev, but I think this thread has veered into actual dev questions. I still don't know whether there is something fundamentally wrong with my use case or whether this is a bug. For a dev reading this for the first time, the main correction I'd make is to my email subject: reportError isn't hanging, it's throwing a RuntimeException (wrapping an InterruptedException). As for what is throwing the InterruptedException, I think it's ZooKeeper itself.
>
> Both ShellSpout's and ShellBolt's die() have a "_collector.reportError(exception);" line. I changed both to:
> ====
> try {
>     _collector.reportError(exception);
> } catch (RuntimeException e) {
>     if (e.getCause() instanceof InterruptedException) {
>         // zookeeper.clj wraps the zk InterruptedException in a RuntimeException
>     } else {
>         throw e;
>     }
> }
> ======
> and now everything works as I expected.
>
> Does this patch make any sense? Or is it a band-aid over a deeper issue?
>
> will
>
> On Thu, Feb 12, 2015 at 2:15 PM, William Oberman wrote:
>
> > OK, I realized that I had NOT checked whether ShellSpout.die() was throwing a RuntimeException. I added a try/catch block, and it is! The RuntimeException prevents _process.destroy() and System.exit() from running, and both need to run for the topology to recover.
> >
> > But I'm not sure *why* this exception is happening yet, since it's an InterruptedException, and I don't think the exception tells me *who* interrupted my thread...
> >
> > 2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
> > java.lang.RuntimeException: java.lang.InterruptedException
> >     at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> > Caused by: java.lang.InterruptedException: null
> >     at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
> >     at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
> >     at org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> >     ... 17 common frames omitted
> >
> >
> > On Wed, Feb 11, 2015 at 4:27 PM, William Oberman wrote:
> >
> >> I'm not sure what everything I've learned adds up to yet....
> >>
> >> I tried setting up a local Storm development environment. By mistake, I forgot to switch to 0.9.3-branch (i.e., I was working on master at first). In master, I was seeing the ShellBolt heartbeat fail first (which in retrospect makes sense!). Then I remembered I was trying to debug 0.9.3, so I switched to 0.9.3-branch. In the branch, I see the ShellSpout heartbeat fail first and jam up, so that's good (?). In both cases, I was using local mode rather than cluster mode, as I figured I had a better shot at debugging in local mode.
> >>
> >> At this point, I figured I had a good test case. So far this was all done with command-line tools (git, mvn). Two thoughts at this point:
> >> 1.) I don't plan on running master, and it's not 100% clear to me whether ShellBolt failing first will solve any problems...
> >> 2.) In 0.9.3-branch, after ShellSpout fails and everything jams up, if I Ctrl-C the process I immediately see the ShellBolt heartbeat timeout message. It's like it was waiting to write, but was blocked on something (hrmmmm....?)
> >>
> >> But I hit the limits of what I could do with "System.out" debugging, so I tried setting up Storm in IntelliJ. That took a while, but I finally figured out how to run a topology in local mode with a debugger. Once again, I saw the ShellSpout heartbeat fail and then nothing happen.
> >>
> >> The next problem is that I don't know how to set up IntelliJ to understand compiled Clojure code (at least, I think that's my problem....), so the "Step Into/Out of" information is really weird in the debugger. The best/most complete stack trace I have is:
> >> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
> >> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
> >> invoke():114, zookeeper$mkdirs (backtype.storm)
> >> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
> >> report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
> >> invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
> >> reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
> >> reportError():132, SpoutOutputCollector (backtype.storm.spout)
> >>
> >> I had a better stack trace (which I lost) that led into:
> >> org.apache.storm.curator.RetryLoop.callWithRetry()
> >> which is my "prime suspect" (based on the name alone) for whatever is blocking things up.... :-)
> >>
> >> Though, once again, since I don't understand the big picture of Storm, I still have no idea what all of the above adds up to in terms of what's wrong, or how to fix it....
> >>
> >> will
> >>
> >> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman wrote:
> >>
> >>> I'm glad to hear I'm not the only one!
> >>>
> >>> (no new news yet)
> >>>
> >>>
> >>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino wrote:
> >>>
> >>>> Hi William,
> >>>>
> >>>> I'm having the same problem running a multilang topology (written in Python). If you find a solution, please post it here; it will surely help us.
> >>>>
> >>>> To upgrade from 0.9.2-incubating, we updated storm.py (https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py) and pom.xml.
> >>>>
> >>>> After downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it works fine.
> >>>>
> >>>> Best regards,
> >>>>
> >>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman wrote:
> >>>>
> >>>>> I'm not sure of the best way to share a test case, so I'll copy and paste the code below.... If you run the code below (and find the log file of the worker that was running it), you should see this within ~30 seconds:
> >>>>> ====
> >>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
> >>>>> java.lang.RuntimeException: subprocess heartbeat timeout
> >>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
> >>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> >>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
> >>>>> java.lang.RuntimeException: subprocess heartbeat timeout
> >>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
> >>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> >>>>> =======
> >>>>>
> >>>>> But the topology will run in a kind of weird zombie state forever. More specifically, I see the multilang bolt process all tuples in the pending queue, and then an infinite loop of nextTuple()/fail() from the multilang spout. But, as noted in my original email, if I comment out:
> >>>>> _collector.reportError(exception);
> >>>>> in the Java ShellSpout, then the worker immediately dies and respawns.
> >>>>>
> >>>>> If no one can help, the next step for me is rough, as I'll have to learn how to actually develop and debug Storm itself, which is usually at least 10x harder than just using something :-)
> >>>>>
> >>>>> In any case, my test code:
> >>>>>
> >>>>> Topology = 1 process with two tasks (multilang spout and bolt), and a small pool of pending messages (yes, using the word count example in storm-starter as a starting point....)
> >>>>> =============
> >>>>> import java.util.Map;
> >>>>>
> >>>>> import backtype.storm.Config;
> >>>>> import backtype.storm.LocalCluster;
> >>>>> import backtype.storm.StormSubmitter;
> >>>>> import backtype.storm.spout.ShellSpout;
> >>>>> import backtype.storm.task.ShellBolt;
> >>>>> import backtype.storm.topology.IRichBolt;
> >>>>> import backtype.storm.topology.IRichSpout;
> >>>>> import backtype.storm.topology.OutputFieldsDeclarer;
> >>>>> import backtype.storm.topology.TopologyBuilder;
> >>>>> import backtype.storm.tuple.Fields;
> >>>>>
> >>>>> public class SlowTopology {
> >>>>>     public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
> >>>>>
> >>>>>         public SlowPhpBolt() {
> >>>>>             super("php", "slowBolt.php");
> >>>>>         }
> >>>>>
> >>>>>         @Override
> >>>>>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >>>>>             declarer.declare(new Fields());
> >>>>>         }
> >>>>>
> >>>>>         @Override
> >>>>>         public Map<String, Object> getComponentConfiguration() {
> >>>>>             return null;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
> >>>>>
> >>>>>         public SlowPhpSpout() {
> >>>>>             super("php", "slowSpout.php");
> >>>>>         }
> >>>>>
> >>>>>         @Override
> >>>>>         public void declareOutputFields(OutputFieldsDeclarer ofd) {
> >>>>>             ofd.declare(new Fields("output"));
> >>>>>         }
> >>>>>
> >>>>>         @Override
> >>>>>         public Map<String, Object> getComponentConfiguration() {
> >>>>>             return null;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     public static void main(String[] args) throws Exception {
> >>>>>
> >>>>>         TopologyBuilder builder = new TopologyBuilder();
> >>>>>
> >>>>>         builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
> >>>>>         builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
> >>>>>
> >>>>>         Config conf = new Config();
> >>>>>         conf.setDebug(true);
> >>>>>
> >>>>>         if (args != null && args.length > 0) {
> >>>>>             conf.setNumWorkers(1);
> >>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
> >>>>>         } else {
> >>>>>             LocalCluster cluster = new LocalCluster();
> >>>>>             cluster.submitTopology("slow", conf, builder.createTopology());
> >>>>>             Thread.sleep(10000);
> >>>>>             cluster.shutdown();
> >>>>>         }
> >>>>>     }
> >>>>> }
> >>>>> ===========
> >>>>>
> >>>>> slowSpout.php
> >>>>> ==========
> >>>>> <?php
> >>>>> require_once "storm.php";
> >>>>>
> >>>>> class slowSpout extends \ShellSpout {
> >>>>>     protected function nextTuple() {
> >>>>>         $value = rand(0, 100);
> >>>>>         $id = rand(0, 100);
> >>>>>         $this->emit(array($value), $id);
> >>>>>         file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
> >>>>>         sleep(1);
> >>>>>     }
> >>>>>
> >>>>>     protected function ack($id) {
> >>>>>         file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
> >>>>>     }
> >>>>>
> >>>>>     protected function fail($id) {
> >>>>>         file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> (new slowSpout())->run();
> >>>>> ===========
> >>>>>
> >>>>> slowBolt.php
> >>>>> ============
> >>>>> <?php
> >>>>> require_once "storm.php";
> >>>>>
> >>>>> class slowBolt extends \BasicBolt {
> >>>>>     protected function process(\Tuple $t) {
> >>>>>         $sleep = rand(1, 180);
> >>>>>         file_put_contents("/tmp/storm_slow.log", "process(" . print_r($t, true) . "), sleeping for sleep[$sleep]\n", FILE_APPEND);
> >>>>>         sleep($sleep);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> (new slowBolt())->run();
> >>>>> ============
> >>>>>
> >>>>> storm.php (based on https://github.com/lazyshot/storm-php; I think I added more error checking on reads/writes to standard in/out, added sync() to ShellSpout to make new classes easier to write, and added the new heartbeat protocol)
> >>>>> =========
> >>>>> <?php
> >>>>> interface iShellBolt {
> >>>>> }
> >>>>>
> >>>>> interface iShellSpout {
> >>>>> }
> >>>>>
> >>>>> class Tuple {
> >>>>>     public $id, $component, $stream, $task, $values;
> >>>>>
> >>>>>     public function __construct($id, $component, $stream, $task, $values) {
> >>>>>         $this->id = $id;
> >>>>>         $this->component = $component;
> >>>>>         $this->stream = $stream;
> >>>>>         $this->task = $task;
> >>>>>         $this->values = $values;
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class ShellComponent {
> >>>>>     protected $pid;
> >>>>>     protected $stormConf;
> >>>>>     protected $topologyContext;
> >>>>>
> >>>>>     protected $stormInc = null;
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         $this->pid = getmypid();
> >>>>>         $this->sendCommand(array("pid" => $this->pid));
> >>>>>
> >>>>>         $handshake = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>         $this->stormConf = $handshake['conf'];
> >>>>>         $this->topologyContext = $handshake['context'];
> >>>>>         $pidDir = $handshake['pidDir'];
> >>>>>
> >>>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
> >>>>>     }
> >>>>>
> >>>>>     protected function readLine() {
> >>>>>         $raw = fgets(STDIN);
> >>>>>
> >>>>>         if ($raw === false) {
> >>>>>             throw new Exception("STDIN is broken");
> >>>>>         }
> >>>>>
> >>>>>         $line = trim($raw);
> >>>>>
> >>>>>         return $line;
> >>>>>     }
> >>>>>
> >>>>>     protected function waitForMessage() {
> >>>>>         $message = '';
> >>>>>         while (true) {
> >>>>>             $line = trim($this->readLine());
> >>>>>
> >>>>>             if (strlen($line) == 0) {
> >>>>>                 continue;
> >>>>>             } else if ($line == 'end') {
> >>>>>                 break;
> >>>>>             } else if ($line == 'sync') {
> >>>>>                 $message = '';
> >>>>>                 continue;
> >>>>>             }
> >>>>>
> >>>>>             $message .= $line . "\n";
> >>>>>         }
> >>>>>
> >>>>>         return trim($message);
> >>>>>     }
> >>>>>
> >>>>>     protected function sendCommand(array $command) {
> >>>>>         $this->sendMessage(json_encode($command));
> >>>>>     }
> >>>>>
> >>>>>     protected function sendLog($message) {
> >>>>>         return $this->sendCommand(array(
> >>>>>             'command' => 'log',
> >>>>>             'msg' => $message
> >>>>>         ));
> >>>>>     }
> >>>>>
> >>>>>     protected function parseMessage($message) {
> >>>>>         $msg = json_decode($message, true);
> >>>>>
> >>>>>         if ($msg) {
> >>>>>             return $msg;
> >>>>>         } else {
> >>>>>             return $message;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     protected function sendMessage($message) {
> >>>>>         $message = "$message\nend\n";
> >>>>>         $bytesWritten = fwrite(STDOUT, $message);
> >>>>>         fflush(STDOUT);
> >>>>>         if ($bytesWritten === false) {
> >>>>>             throw new Exception("STDOUT is broken");
> >>>>>         }
> >>>>>         if ($bytesWritten != strlen($message)) {
> >>>>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     final protected function sync() {
> >>>>>         $command = array(
> >>>>>             'command' => 'sync',
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
> >>>>>
> >>>>>     public $anchor_tuple = null;
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         parent::__construct();
> >>>>>
> >>>>>         $this->init($this->stormConf, $this->topologyContext);
> >>>>>     }
> >>>>>
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['tuple'])) {
> >>>>>                         $tupleMap = array_merge(array(
> >>>>>                             'id' => null,
> >>>>>                             'comp' => null,
> >>>>>                             'stream' => null,
> >>>>>                             'task' => null,
> >>>>>                             'tuple' => null
> >>>>>                         ), $command);
> >>>>>
> >>>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
> >>>>>                             $this->sync();
> >>>>>                         } else {
> >>>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
> >>>>>                             $this->process($tuple);
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     abstract protected function process(Tuple $tuple);
> >>>>>
> >>>>>     protected function init($conf, $topology) {
> >>>>>         return;
> >>>>>     }
> >>>>>
> >>>>>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
> >>>>>         if ($this->anchor_tuple !== null) {
> >>>>>             $anchors = array($this->anchor_tuple);
> >>>>>         }
> >>>>>
> >>>>>         $command = array(
> >>>>>             'command' => 'emit'
> >>>>>         );
> >>>>>
> >>>>>         if ($stream !== null) {
> >>>>>             $command['stream'] = $stream;
> >>>>>         }
> >>>>>
> >>>>>         $command['anchors'] = array_map(function ($a) {
> >>>>>             return $a->id;
> >>>>>         }, $anchors);
> >>>>>
> >>>>>         if ($directTask !== null) {
> >>>>>             $command['task'] = $directTask;
> >>>>>         }
> >>>>>
> >>>>>         $command['tuple'] = $tuple;
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>>     protected function emit($tuple, $stream = null, $anchors = array()) {
> >>>>>         $this->emitTuple($tuple, $stream, $anchors);
> >>>>>     }
> >>>>>
> >>>>>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
> >>>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
> >>>>>     }
> >>>>>
> >>>>>     protected function ack(Tuple $tuple) {
> >>>>>         $command = array(
> >>>>>             'command' => 'ack',
> >>>>>             'id' => $tuple->id
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>>     protected function fail(Tuple $tuple) {
> >>>>>         $command = array(
> >>>>>             'command' => 'fail',
> >>>>>             'id' => $tuple->id
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class BasicBolt extends ShellBolt {
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['tuple'])) {
> >>>>>                         $tupleMap = array_merge(array(
> >>>>>                             'id' => null,
> >>>>>                             'comp' => null,
> >>>>>                             'stream' => null,
> >>>>>                             'task' => null,
> >>>>>                             'tuple' => null
> >>>>>                         ), $command);
> >>>>>
> >>>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
> >>>>>                             $this->sync();
> >>>>>                         } else {
> >>>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
> >>>>>
> >>>>>                             $this->anchor_tuple = $tuple;
> >>>>>
> >>>>>                             try {
> >>>>>                                 $processed = $this->process($tuple);
> >>>>>
> >>>>>                                 $this->ack($tuple);
> >>>>>                             } catch (BoltProcessException $e) {
> >>>>>                                 $this->fail($tuple);
> >>>>>                             }
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>         }
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
> >>>>>     protected $tuples = array();
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         parent::__construct();
> >>>>>
> >>>>>         $this->init($this->stormConf, $this->topologyContext);
> >>>>>     }
> >>>>>
> >>>>>     abstract protected function nextTuple();
> >>>>>
> >>>>>     abstract protected function ack($tuple_id);
> >>>>>
> >>>>>     abstract protected function fail($tuple_id);
> >>>>>
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['command'])) {
> >>>>>                         if ($command['command'] == 'ack') {
> >>>>>                             $this->ack($command['id']);
> >>>>>                             $this->sync();
> >>>>>                         } else if ($command['command'] == 'fail') {
> >>>>>                             $this->fail($command['id']);
> >>>>>                             $this->sync();
> >>>>>                         } else if ($command['command'] == 'next') {
> >>>>>                             $this->nextTuple();
> >>>>>                             $this->sync();
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>             $this->sync();
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     protected function init($stormConf, $topologyContext) {
> >>>>>         return;
> >>>>>     }
> >>>>>
> >>>>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
> >>>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
> >>>>>     }
> >>>>>
> >>>>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
> >>>>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
> >>>>>     }
> >>>>>
> >>>>>     final private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
> >>>>>         $command = array(
> >>>>>             'command' => 'emit'
> >>>>>         );
> >>>>>
> >>>>>         if ($messageId !== null) {
> >>>>>             $command['id'] = $messageId;
> >>>>>         }
> >>>>>
> >>>>>         if ($streamId !== null) {
> >>>>>             $command['stream'] = $streamId;
> >>>>>         }
> >>>>>
> >>>>>         if ($directTask !== null) {
> >>>>>             $command['task'] = $directTask;
> >>>>>         }
> >>>>>
> >>>>>         $command['tuple'] = $tuple;
> >>>>>
> >>>>>         return $this->sendCommand($command);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> class BoltProcessException extends Exception {
> >>>>> }
> >>>>>
> >>>>> =========================
> >>>>>
> >>>>>
> >>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
> >>>>>>
> >>>>>> I'll try to cover the important facts that led to this issue:
> >>>>>>
> >>>>>> - I was on 0.9.2, using multilang to bridge to PHP to reach some existing business logic.
> >>>>>>
> >>>>>> - I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to the ShellBolt protocol).
> >>>>>>
> >>>>>> - I have some odd topologies where I do some legacy background processing. This processing takes a highly variable amount of time in the bolts, from milliseconds to minutes. Eventually, due to randomness, the spout's "pending" pool fills up, causing the spout to block on nextTuple, which eventually causes a heartbeat timeout. (I believe my only fix is to increase the heartbeat timeout at the topology level. That's not the purpose of this email, though confirmation that this is my only workaround would be appreciated! I feel like this wasn't anticipated when the heartbeat patch was designed; I guess it was assumed that the spout's nextTuple wouldn't block?)
> >>>>>>
> >>>>>> - The purpose of this email is the fact that the topology "jams up" when the ShellSpout has a heartbeat timeout. I can see my PHP spout/bolt still running (I added logging to them), but Storm itself is doing nothing.
> >>>>>>
> >>>>>> - I added logging to ShellSpout and recompiled, because I saw the log message on line 233 (Halting process: ShellSpout died), but as noted the PHP process was still running, so I was curious whether _process.destroy() had failed. But my logging didn't appear, so I assumed I was compiling/deploying wrong. Eventually I commented out line 234, _collector.reportError(exception);, and everything started working!!!
> >>>>>>
> >>>>>> Does this make *any* sense? Why would _collector.reportError(exception); block and never return? (I waited quite a long time, tens of minutes.) When I comment out line 234, Storm immediately kills my bad tasks and respawns them almost instantly.
> >>>>>>
> >>>>>> I feel fairly confident that this will be reproducible. My topology:
> >>>>>> - 1 spout (ShellSpout)
> >>>>>> - 1 bolt (ShellBolt)
> >>>>>> - The ShellSpout has a heartbeat timeout due to slow tasks in the ShellBolt + the pending queue being full
> >>>>>>
> >>>>>> Thanks for any feedback!
> >>>>>>
> >>>>>> will
> >>>>>>
> >>>>>
> >>>>
> >>>> --
> >>>> Alex Sobrino Beltrán
> >>>> Registered Linux User #273657
> >>>>
> >>>> http://v5tech.es
> >>>
> >>
> >

--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
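On the thread's open question of *who* interrupted the heartbeat thread: the Feb 12 trace shows the InterruptedException thrown from Object.wait() inside a ScheduledThreadPoolExecutor worker. One standard-library mechanism that produces exactly this is a task calling shutdownNow() on the executor that is running it, because shutdownNow() interrupts every worker thread, including the caller's own. Whether ShellSpout.die() in 0.9.3 actually shuts down its heartbeat executor before calling reportError() is an assumption to verify against the source. The class below is hypothetical, uses a one-shot task (the real heartbeat task is periodic, per the runAndReset frame), and uses a plain lock.wait() in place of the ZooKeeper request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class SelfInterruptDemo {
    // Schedules one "heartbeat check" that shuts down its own executor and then
    // blocks in Object.wait(), returning what that blocking call observed.
    static String run() {
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        final AtomicReference<String> outcome = new AtomicReference<String>("task did not run");
        final CountDownLatch done = new CountDownLatch(1);
        final Object lock = new Object();

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                // Hypothetical die() step: stop the executor that is running this task.
                // shutdownNow() interrupts every worker thread, including this one.
                executor.shutdownNow();
                try {
                    synchronized (lock) {
                        // Stands in for the Object.wait() inside ClientCnxn.submitRequest.
                        lock.wait(5000);
                    }
                    outcome.set("wait returned normally");
                } catch (InterruptedException e) {
                    outcome.set("InterruptedException from Object.wait");
                } finally {
                    done.countDown();
                }
            }
        }, 10, TimeUnit.MILLISECONDS);

        try {
            done.await(10, TimeUnit.SECONDS);
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // InterruptedException from Object.wait
    }
}
```

If that assumption holds, the interrupt would come from die() itself, and any blocking call made after the shutdown (such as the ZooKeeper write inside reportError()) would fail the same way.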
