Sorry for the cross post to dev, but I think this thread has veered into
actual dev questions. I still don't know if there is something
fundamentally wrong about my use case, or if this is a bug. For a dev
reading this for the first time, the main correction I'd make is to my
email subject. reportError isn't hanging, it's throwing a runtime
exception (wrapping an interrupted exception). As for what is throwing the
interrupted exception, I think it's Zookeeper itself.
Both ShellSpout's and ShellBolt's die() methods contain a
"_collector.reportError(exception);" line. I changed both to:
====
try {
    _collector.reportError(exception);
} catch (RuntimeException e) {
    if (e.getCause() instanceof InterruptedException) {
        // zookeeper.clj wraps zk InterruptedException with runtime exception
    } else {
        throw e;
    }
}
======
and now everything starts to work as I expected.
Does this patch make any sense? Or is it a bandaid over a deeper issue?
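To make the question concrete, here is a minimal, self-contained sketch of what the patch does. The DieSketch class, the ErrorReporter interface, and the steps list are hypothetical stand-ins (for ShellSpout/ShellBolt, _collector, and the _process.destroy()/System.exit() calls), not Storm's real API, and it uses Java 8 lambdas only for brevity:

```java
import java.util.ArrayList;
import java.util.List;

public class DieSketch {
    // Hypothetical stand-in for _collector; not Storm's real collector interface.
    interface ErrorReporter {
        void reportError(Throwable error);
    }

    // Records which steps ran, standing in for _process.destroy() and System.exit().
    static final List<String> steps = new ArrayList<>();

    static void die(ErrorReporter collector, Throwable exception) {
        try {
            collector.reportError(exception);
        } catch (RuntimeException e) {
            if (e.getCause() instanceof InterruptedException) {
                // Same rule as the patch: ignore a RuntimeException that wraps
                // an InterruptedException.
                steps.add("swallowed");
            } else {
                throw e;
            }
        }
        // Only reached if reportError returned or its exception was swallowed.
        steps.add("cleanup");
    }

    public static void main(String[] args) {
        Throwable timeout = new RuntimeException("subprocess heartbeat timeout");
        // Simulate reportError failing the way the stack trace shows.
        die(err -> { throw new RuntimeException(new InterruptedException()); }, timeout);
        System.out.println(steps);
    }
}
```

Without the try/catch, the simulated failure would propagate out of die() before the "cleanup" step is recorded, which matches the jam I was seeing. Any other RuntimeException still propagates, as before.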
will
On Thu, Feb 12, 2015 at 2:15 PM, William Oberman <[email protected]>
wrote:
> Ok, I realized that I did NOT check if ShellSpout.die() was throwing a
> RuntimeException. I added a try/catch block, and it is! The
> RuntimeException is preventing _process.destroy and System.exit() from
> happening, both of which need to happen to make topology recovery happen.
>
> But, I'm not sure *why* this exception is happening yet, since it's an
> interrupted exception and I don't think the exception tells me *who*
> interrupted my thread...
>
> 2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
> java.lang.RuntimeException: java.lang.InterruptedException
> at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261)
> [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> [na:1.7.0_71]
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> [na:1.7.0_71]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> [na:1.7.0_71]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [na:1.7.0_71]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [na:1.7.0_71]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [na:1.7.0_71]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> Caused by: java.lang.InterruptedException: null
> at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
> at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
> at
> org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> at
> backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101)
> ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> ... 17 common frames omitted
>
>
> On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <[email protected]
> > wrote:
>
>> I'm not sure what I've learned adds up to yet....
>>
>> I tried setting up a local storm development environment. By mistake, I
>> forgot to switch to 0.9.3-branch (e.g. I was working on master at first).
>> In master, I was seeing the ShellBolt heartbeat fail first (which in
>> retrospect makes sense!). Then I remembered I was trying to debug 0.9.3,
>> so I switched to 0.9.3-branch. In the branch, I see ShellSpout heartbeat
>> fail first and jam up, so that's good (?). In both cases, I was trying
>> local mode rather than cluster mode, as I figured I had a better shot at
>> debugging in local mode.
>>
>> At this point, I figured I had a good test case. This was all using
>> command line tools (git, mvn) so far. Two thoughts at this point: 1.) I
>> don't plan on running master, and it's not 100% clear to me if ShellBolt
>> failing first will solve any problems... 2.) in 0.9.3-branch, after
>> ShellSpout fails and everything jams up, if I press Ctrl-C on the process I
>> immediately see the ShellBolt heartbeat timeout message. It's like it was
>> waiting to write, but was blocked on something (hrmmmm....?)
>>
>> But, I hit the limits of my ability to add "System.out" debugging, so I
>> tried setting up storm in IntelliJ. That took a bit, but I finally figured
>> out how to run a topology in local mode with a debugger. Once again, I was
>> seeing ShellSpout heartbeat fail and then nothing happen.
>>
>> The next problem is that I don't know how to set up IntelliJ to understand
>> Clojure-compiled code (at least, I think that's my problem....) so the
>> "Step Into/Out of" information is really weird in the debugger. The
>> best/most complete stack trace I have is:
>> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
>> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
>> invoke():114, zookeeper$mkdirs (backtype.storm)
>> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526
>> (backtype.storm)
>> report_error():397, cluster$mk_storm_cluster_state$reify__3983
>> (backtype.storm)
>> invoke():180, executor$throttled_report_error_fn$fn__5548
>> (backtype.storm.daemon)
>> reportError():533, executor$fn__5700$fn$reify__5742
>> (backtype.storm.daemon)
>> reportError():132, SpoutOutputCollector (backtype.storm.spout)
>>
>> I had a better stack trace (that I lost) that led into:
>> org.apache.storm.curator.RetryLoop.callWithRetry()
>> which for me is my "prime suspect" (based on name alone) for something
>> that is blocking things up.... :-)
>>
>> Though, once again, not understanding the big picture of storm, I have no
>> idea what all of the above adds up to in terms of what's wrong, and how to
>> fix it still....
>>
>> will
>>
>> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <
>> [email protected]> wrote:
>>
>>> I'm glad to hear I'm not the only one!
>>>
>>> (no new news yet)
>>>
>>>
>>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:
>>>
>>>> Hi William,
>>>>
>>>> I'm having the same problem running a multilang topology (written in
>>>> Python). If you find a solution, please post it here; it will surely help us.
>>>>
>>>> To upgrade from 0.9.2-incubating we updated storm.py (
>>>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>>>> and pom.xml.
>>>>
>>>> After downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
>>>> works like a charm.
>>>>
>>>> Best regards,
>>>>
>>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>>>> [email protected]> wrote:
>>>>
>>>>> I'm not sure of the best way to share a test case. I'll copy and paste
>>>>> code below.... If you run the below code (and find the log file of the
>>>>> worker that was running it), you should see in ~30 seconds:
>>>>> ====
>>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process:
>>>>> ShellSpout died.
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>> at
>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> [na:1.7.0_71]
>>>>> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>> at
>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> [na:1.7.0_71]
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> [na:1.7.0_71]
>>>>> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>>> =======
>>>>>
>>>>> But, the topology will run in a kind of weird zombie state forever.
>>>>> More specifically I see the multilang bolt process all tuples in the
>>>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>>>> multilang spout. But, as noted in my original email, if I comment out:
>>>>> _collector.reportError(exception);
>>>>> in the Java ShellSpout then the worker will immediately die and
>>>>> respawn.
>>>>>
>>>>> If no one can help, the next step for me is rough, as I'll have to
>>>>> learn how to actually develop and debug storm itself, which is usually at
>>>>> least 10x harder than just using something :-)
>>>>>
>>>>> In any case, my test code:
>>>>>
>>>>> Topology = 1 process with two tasks (multilang spout and bolt), and
>>>>> small pool of pending messages (yes, using the word count example in
>>>>> storm-starter as a starting point....)
>>>>> =============
>>>>> import java.util.Map;
>>>>>
>>>>> import backtype.storm.Config;
>>>>> import backtype.storm.LocalCluster;
>>>>> import backtype.storm.StormSubmitter;
>>>>> import backtype.storm.spout.ShellSpout;
>>>>> import backtype.storm.task.ShellBolt;
>>>>> import backtype.storm.topology.IRichBolt;
>>>>> import backtype.storm.topology.IRichSpout;
>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>> import backtype.storm.topology.TopologyBuilder;
>>>>> import backtype.storm.tuple.Fields;
>>>>>
>>>>> public class SlowTopology {
>>>>>     // Multilang bolt backed by slowBolt.php; emits nothing.
>>>>>     public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>>>>>
>>>>>         public SlowPhpBolt() {
>>>>>             super("php", "slowBolt.php");
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>             declarer.declare(new Fields());
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Map<String, Object> getComponentConfiguration() {
>>>>>             return null;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     // Multilang spout backed by slowSpout.php; emits one field, "output".
>>>>>     public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>>>>>
>>>>>         public SlowPhpSpout() {
>>>>>             super("php", "slowSpout.php");
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>>             ofd.declare(new Fields("output"));
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Map<String, Object> getComponentConfiguration() {
>>>>>             return null;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>
>>>>>         TopologyBuilder builder = new TopologyBuilder();
>>>>>
>>>>>         // One spout task with a small pending pool, feeding one bolt task.
>>>>>         builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
>>>>>         builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
>>>>>
>>>>>         Config conf = new Config();
>>>>>         conf.setDebug(true);
>>>>>
>>>>>         if (args != null && args.length > 0) {
>>>>>             // Cluster mode: the first argument is the topology name.
>>>>>             conf.setNumWorkers(1);
>>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
>>>>>         }
>>>>>         else {
>>>>>             // Local mode: run briefly, then shut down.
>>>>>             LocalCluster cluster = new LocalCluster();
>>>>>             cluster.submitTopology("slow", conf, builder.createTopology());
>>>>>             Thread.sleep(10000);
>>>>>             cluster.shutdown();
>>>>>         }
>>>>>     }
>>>>> }
>>>>> ===========
>>>>>
>>>>> slowSpout.php
>>>>> ==========
>>>>> <?php
>>>>> require_once "storm.php";
>>>>> class slowSpout extends \ShellSpout {
>>>>>     protected function nextTuple() {
>>>>>         $value = rand(0,100);
>>>>>         $id = rand(0, 100);
>>>>>         $this->emit(array($value), $id);
>>>>>         file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>>>         sleep(1);
>>>>>     }
>>>>>
>>>>>     protected function ack($id) {
>>>>>         file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>>>>     }
>>>>>
>>>>>     protected function fail($id) {
>>>>>         file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>>>>     }
>>>>> }
>>>>>
>>>>> (new slowSpout())->run();
>>>>> ===========
>>>>>
>>>>> slowBolt.php
>>>>> ============
>>>>> <?php
>>>>> require_once "storm.php";
>>>>> class slowBolt extends \BasicBolt {
>>>>>     protected function process(\Tuple $t) {
>>>>>         $sleep = rand(1, 180);
>>>>>         file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>>>         sleep($sleep);
>>>>>     }
>>>>> }
>>>>> (new slowBolt())->run();
>>>>> ============
>>>>>
>>>>> storm.php (from https://github.com/lazyshot/storm-php, and I think
>>>>> I added more error checking on reads/writes to standard in/out, added
>>>>> sync() to the ShellSpout to make new classes easier to write, and the new
>>>>> heartbeat protocol)
>>>>> =========
>>>>> <?php
>>>>> interface iShellBolt {
>>>>> }
>>>>>
>>>>> interface iShellSpout {
>>>>> }
>>>>>
>>>>> class Tuple {
>>>>> public $id, $component, $stream, $task, $values;
>>>>>
>>>>> public function __construct($id, $component, $stream, $task,
>>>>> $values) {
>>>>> $this->id = $id;
>>>>> $this->component = $component;
>>>>> $this->stream = $stream;
>>>>> $this->task = $task;
>>>>> $this->values = $values;
>>>>> }
>>>>> }
>>>>>
>>>>> abstract class ShellComponent {
>>>>> protected $pid;
>>>>> protected $stormConf;
>>>>> protected $topologyContext;
>>>>>
>>>>> protected $stormInc = null;
>>>>>
>>>>> public function __construct() {
>>>>> $this->pid = getmypid();
>>>>> $this->sendCommand(array("pid" => $this->pid));
>>>>>
>>>>> $handshake = $this->parseMessage($this->waitForMessage());
>>>>>
>>>>> $this->stormConf = $handshake['conf'];
>>>>> $this->topologyContext = $handshake['context'];
>>>>> $pidDir = $handshake['pidDir'];
>>>>>
>>>>> @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>>> }
>>>>>
>>>>> protected function readLine() {
>>>>> $raw = fgets(STDIN);
>>>>>
>>>>> if ($raw === false) {
>>>>> throw new Exception("STDIN is broken");
>>>>> }
>>>>>
>>>>> $line = trim($raw);
>>>>>
>>>>> return $line;
>>>>> }
>>>>>
>>>>> protected function waitForMessage() {
>>>>> $message = '';
>>>>> while (true) {
>>>>> $line = trim($this->readLine());
>>>>>
>>>>> if (strlen($line) == 0) {
>>>>> continue;
>>>>> } else if ($line == 'end') {
>>>>> break;
>>>>> } else if ($line == 'sync') {
>>>>> $message = '';
>>>>> continue;
>>>>> }
>>>>>
>>>>> $message .= $line . "\n";
>>>>> }
>>>>>
>>>>> return trim($message);
>>>>> }
>>>>>
>>>>> protected function sendCommand(array $command) {
>>>>> $this->sendMessage(json_encode($command));
>>>>> }
>>>>>
>>>>> protected function sendLog($message) {
>>>>> return $this->sendCommand(array(
>>>>> 'command' => 'log',
>>>>> 'msg' => $message
>>>>> ));
>>>>> }
>>>>>
>>>>> protected function parseMessage($message) {
>>>>> $msg = json_decode($message, true);
>>>>>
>>>>> if ($msg) {
>>>>> return $msg;
>>>>> } else {
>>>>> return $message;
>>>>> }
>>>>> }
>>>>>
>>>>> protected function sendMessage($message) {
>>>>> $message = "$message\nend\n";
>>>>> $bytesWritten = fwrite(STDOUT, $message);
>>>>> fflush(STDOUT);
>>>>> if ($bytesWritten === false) {
>>>>> throw new Exception("STDOUT is broken");
>>>>> }
>>>>> if ($bytesWritten != strlen($message)) {
>>>>> throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>>>> }
>>>>> }
>>>>>
>>>>> final protected function sync() {
>>>>> $command = array(
>>>>> 'command' => 'sync',
>>>>> );
>>>>>
>>>>> $this->sendCommand($command);
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>>>
>>>>> public $anchor_tuple = null;
>>>>>
>>>>> public function __construct() {
>>>>> parent::__construct();
>>>>>
>>>>> $this->init($this->stormConf, $this->topologyContext);
>>>>> }
>>>>>
>>>>> public function run() {
>>>>> try {
>>>>> while (true) {
>>>>> $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>> if (is_array($command)) {
>>>>> if (isset($command['tuple'])) {
>>>>> $tupleMap = array_merge(array(
>>>>> 'id' => null,
>>>>> 'comp' => null,
>>>>> 'stream' => null,
>>>>> 'task' => null,
>>>>> 'tuple' => null
>>>>> ),
>>>>>
>>>>> $command);
>>>>>
>>>>> if($tupleMap['task'] == -1 &&
>>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>> $this->sync();
>>>>> } else {
>>>>> $tuple = new Tuple($tupleMap['id'],
>>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>>> $tupleMap['tuple']);
>>>>> $this->process($tuple);
>>>>> }
>>>>> }
>>>>> }
>>>>> }
>>>>> } catch (Exception $e) {
>>>>> $this->sendLog((string)$e);
>>>>> }
>>>>> }
>>>>>
>>>>> abstract protected function process(Tuple $tuple);
>>>>>
>>>>> protected function init($conf, $topology) {
>>>>> return;
>>>>> }
>>>>>
>>>>> protected function emitTuple(array $tuple, $stream = null,
>>>>> $anchors = array(), $directTask = null) {
>>>>> if ($this->anchor_tuple !== null) {
>>>>> $anchors = array($this->anchor_tuple);
>>>>> }
>>>>>
>>>>> $command = array(
>>>>> 'command' => 'emit'
>>>>> );
>>>>>
>>>>> if ($stream !== null) {
>>>>> $command['stream'] = $stream;
>>>>> }
>>>>>
>>>>> $command['anchors'] = array_map(function ($a) {
>>>>> return $a->id;
>>>>> }, $anchors);
>>>>>
>>>>> if ($directTask !== null) {
>>>>> $command['task'] = $directTask;
>>>>> }
>>>>>
>>>>> $command['tuple'] = $tuple;
>>>>>
>>>>> $this->sendCommand($command);
>>>>> }
>>>>>
>>>>> protected function emit($tuple, $stream = null, $anchors =
>>>>> array()) {
>>>>> $this->emitTuple($tuple, $stream, $anchors);
>>>>> }
>>>>>
>>>>> protected function emitDirect($directTask, $tuple, $stream = null,
>>>>> $anchors = array()) {
>>>>> $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>>> }
>>>>>
>>>>> protected function ack(Tuple $tuple) {
>>>>> $command = array(
>>>>> 'command' => 'ack',
>>>>> 'id' => $tuple->id
>>>>> );
>>>>>
>>>>> $this->sendCommand($command);
>>>>> }
>>>>>
>>>>> protected function fail(Tuple $tuple) {
>>>>> $command = array(
>>>>> 'command' => 'fail',
>>>>> 'id' => $tuple->id
>>>>> );
>>>>>
>>>>> $this->sendCommand($command);
>>>>> }
>>>>> }
>>>>>
>>>>> abstract class BasicBolt extends ShellBolt {
>>>>> public function run() {
>>>>> try {
>>>>> while (true) {
>>>>> $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>> if (is_array($command)) {
>>>>> if (isset($command['tuple'])) {
>>>>> $tupleMap = array_merge(array(
>>>>> 'id' => null,
>>>>> 'comp' => null,
>>>>> 'stream' => null,
>>>>> 'task' => null,
>>>>> 'tuple' => null
>>>>> ),
>>>>>
>>>>> $command);
>>>>>
>>>>> if($tupleMap['task'] == -1 &&
>>>>> $tupleMap['stream'] == "__heartbeat") {
>>>>> $this->sync();
>>>>> } else {
>>>>> $tuple = new Tuple($tupleMap['id'],
>>>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'],
>>>>> $tupleMap['tuple']);
>>>>>
>>>>> $this->anchor_tuple = $tuple;
>>>>>
>>>>> try {
>>>>> $processed = $this->process($tuple);
>>>>>
>>>>> $this->ack($tuple);
>>>>> } catch (BoltProcessException $e) {
>>>>> $this->fail($tuple);
>>>>> }
>>>>> }
>>>>> }
>>>>> }
>>>>> }
>>>>> } catch (Exception $e) {
>>>>> $this->sendLog((string)$e);
>>>>> }
>>>>>
>>>>> }
>>>>> }
>>>>>
>>>>> abstract class ShellSpout extends ShellComponent implements
>>>>> iShellSpout {
>>>>> protected $tuples = array();
>>>>>
>>>>> public function __construct() {
>>>>> parent::__construct();
>>>>>
>>>>> $this->init($this->stormConf, $this->topologyContext);
>>>>> }
>>>>>
>>>>>
>>>>> abstract protected function nextTuple();
>>>>>
>>>>> abstract protected function ack($tuple_id);
>>>>>
>>>>> abstract protected function fail($tuple_id);
>>>>>
>>>>> public function run() {
>>>>> try {
>>>>> while (true) {
>>>>> $command =
>>>>> $this->parseMessage($this->waitForMessage());
>>>>>
>>>>> if (is_array($command)) {
>>>>> if (isset($command['command'])) {
>>>>> if ($command['command'] == 'ack') {
>>>>> $this->ack($command['id']);
>>>>> $this->sync();
>>>>> } else if ($command['command'] == 'fail') {
>>>>> $this->fail($command['id']);
>>>>> $this->sync();
>>>>> } else if ($command['command'] == 'next') {
>>>>> $this->nextTuple();
>>>>> $this->sync();
>>>>> }
>>>>> }
>>>>> }
>>>>> }
>>>>> } catch (Exception $e) {
>>>>> $this->sendLog((string)$e);
>>>>> $this->sync();
>>>>> }
>>>>> }
>>>>>
>>>>> protected function init($stormConf, $topologyContext) {
>>>>> return;
>>>>> }
>>>>>
>>>>> final protected function emit(array $tuple, $messageId = null,
>>>>> $streamId = null) {
>>>>> return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>>> }
>>>>>
>>>>> final protected function emitDirect($directTask, array $tuple,
>>>>> $messageId = null, $streamId = null) {
>>>>> return $this->emitTuple($tuple, $messageId, $streamId,
>>>>> $directTask);
>>>>> }
>>>>>
>>>>> final private function emitTuple(array $tuple, $messageId = null,
>>>>> $streamId = null, $directTask = null) {
>>>>> $command = array(
>>>>> 'command' => 'emit'
>>>>> );
>>>>>
>>>>> if ($messageId !== null) {
>>>>> $command['id'] = $messageId;
>>>>> }
>>>>>
>>>>> if ($streamId !== null) {
>>>>> $command['stream'] = $streamId;
>>>>> }
>>>>>
>>>>> if ($directTask !== null) {
>>>>> $command['task'] = $directTask;
>>>>> }
>>>>>
>>>>> $command['tuple'] = $tuple;
>>>>>
>>>>> return $this->sendCommand($command);
>>>>> }
>>>>> }
>>>>>
>>>>> class BoltProcessException extends Exception {
>>>>> }
>>>>>
>>>>> =========================
>>>>>
>>>>>
>>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>>>
>>>>>> I'll try to cover the important facts that led to this issue:
>>>>>>
>>>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>>>> existing business logic
>>>>>>
>>>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>>>> to the ShellBolt protocol)
>>>>>>
>>>>>> -I have some odd topologies where I try to do some legacy background
>>>>>> processing. This processing takes a highly variable amount of time in
>>>>>> the Bolts, from milliseconds to minutes. But, eventually, due to
>>>>>> randomness, the spout's "pending" pool fills up, causing the spout to
>>>>>> block on nextTuple, which eventually causes a heartbeat timeout. (I
>>>>>> believe my only fix is to increase the heartbeat timeout at the topology
>>>>>> level. That's not the purpose of this email, though confirmation of this
>>>>>> as my only workaround would be appreciated! I feel like this wasn't
>>>>>> anticipated when the heartbeat patch was designed, as it was assumed the
>>>>>> spout's nextTuple wouldn't block, I guess?)
>>>>>>
>>>>>> -The purpose of this email is the fact that the topology "jams up"
>>>>>> when the ShellSpout has a heartbeat timeout. I can see my PHP spout/bolt
>>>>>> still running (I added logging to them), but Storm itself is doing
>>>>>> nothing.
>>>>>>
>>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>>>> message on line 233 (Halting process: ShellSpout died) but, as noted, the
>>>>>> PHP process was still running, so I was curious if _process.destroy();
>>>>>> failed. But, my logging didn't appear. I assumed I was
>>>>>> compiling/deploying wrong. Eventually I commented out line 234:
>>>>>> _collector.reportError(exception); and everything started working!!!
>>>>>>
>>>>>> Does this make *any* sense? Why would
>>>>>> _collector.reportError(exception); block and never return (I waited
>>>>>> quite a long time, tens of minutes)? When I comment out line 234, Storm
>>>>>> immediately kills my bad tasks and respawns almost instantly.
>>>>>>
>>>>>> I feel fairly confident that this will be recreatable. My topology:
>>>>>> -1 spout (ShellSpout)
>>>>>> -1 bolt (ShellBolt)
>>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in
>>>>>> ShellBolt + the pending queue is full
>>>>>>
>>>>>> Thanks for any feedback!
>>>>>>
>>>>>> will
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Alex Sobrino Beltrán
>>>> Registered Linux User #273657
>>>>
>>>> http://v5tech.es
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>