Hello.

I haven't seen this behavior myself.
But at a glance, the die() method should ensure that the subprocess is
destroyed (using try-finally), because the worker process is going to be
killed regardless of whether reportError() throws a RuntimeException.
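Below is a minimal sketch of that try-finally shape. It does not use Storm's real classes. `ErrorReporter`, `Subprocess`, and `DieSketch` are hypothetical stand-ins for `_collector`, `_process`, and ShellSpout.die(). The `haltWorker` callback stands in for actually halting the worker JVM.

```java
// Hypothetical stand-ins for what ShellSpout.die() touches; not Storm's real types.
interface ErrorReporter {
    void reportError(Throwable t);
}

interface Subprocess {
    void destroy();
}

class DieSketch {
    static void die(Throwable exception, ErrorReporter collector,
                    Subprocess process, Runnable haltWorker) {
        try {
            // May throw, e.g. a RuntimeException wrapping an InterruptedException.
            collector.reportError(exception);
        } finally {
            // Runs whether or not reportError() threw.
            process.destroy();
            haltWorker.run(); // stand-in for System.exit() in the real method
        }
    }
}
```

Because the cleanup sits in `finally`, a RuntimeException from reportError() can no longer skip destroying the subprocess. In the real method the exit ends the process, so the exception that is still propagating afterwards does not matter.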

You can apply my suggestion to verify that it works, and then file a JIRA issue.

Hope this helps.

Regards.
Jungtaek Lim (HeartSaVioR)

2015-02-13 5:08 GMT+09:00 William Oberman <[email protected]>:

> Sorry for the cross-post to dev, but I think this thread has veered into
> actual dev questions.  I still don't know whether there is something
> fundamentally wrong with my use case, or whether this is a bug.  For a dev
> reading this for the first time, the main correction I'd make is to my
> email subject: reportError isn't hanging, it's throwing a RuntimeException
> (wrapping an InterruptedException).  As for what is throwing the
> InterruptedException, I think it's ZooKeeper itself.
>
> The die() methods of both ShellSpout and ShellBolt contain a
> "_collector.reportError(exception);" line.  I changed both to:
> ====
>         try {
>             _collector.reportError(exception);
>         } catch (RuntimeException e) {
>             if (e.getCause() instanceof InterruptedException) {
>                 // zookeeper.clj wraps zk InterruptedException with runtime exception
>             } else {
>                 throw e;
>             }
>         }
> ======
> and now everything starts to work as I expected.
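The cause check in this patch can be isolated in a small helper for illustration. This is a sketch, not Storm code. `ReportErrorGuard`, `isWrappedInterrupt`, and `reportQuietly` are hypothetical names. Re-setting the thread's interrupt flag, instead of silently dropping the InterruptedException, follows the usual Java convention and is an addition that is not part of the patch above.

```java
class ReportErrorGuard {
    // True if e is a RuntimeException whose cause is an InterruptedException,
    // the shape produced by util.clj's wrap-in-runtime in the stack trace in this thread.
    static boolean isWrappedInterrupt(RuntimeException e) {
        return e.getCause() instanceof InterruptedException;
    }

    // Runs report, swallowing only the wrapped-interrupt case.
    static void reportQuietly(Runnable report) {
        try {
            report.run();
        } catch (RuntimeException e) {
            if (isWrappedInterrupt(e)) {
                // Keep the interrupt visible to later code instead of losing it.
                Thread.currentThread().interrupt();
            } else {
                throw e;
            }
        }
    }
}
```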
>
> Does this patch make any sense?  Or is it a band-aid over a deeper issue?
>
> will
>
> On Thu, Feb 12, 2015 at 2:15 PM, William Oberman <[email protected]> wrote:
>
> > OK, I realized that I did NOT check whether ShellSpout.die() was throwing a
> > RuntimeException.  I added a try/catch block, and it is!  The
> > RuntimeException prevents _process.destroy() and System.exit() from
> > running, and both need to run for the topology to recover.
> >
> > But I'm not sure *why* this exception is happening yet, since it's an
> > InterruptedException, and I don't think the exception tells me *who*
> > interrupted my thread...
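The "Caused by" part of the trace below shows that the thread was blocked in Object.wait() inside ClientCnxn.submitRequest() when the interrupt arrived. The sketch below shows that mechanism without Storm or ZooKeeper. Purely for illustration, it assumes the interrupt comes from ExecutorService.shutdownNow(), which in the JDK interrupts the threads running tasks. Nothing in this thread establishes what actually interrupted the heartbeat thread. Note also that an InterruptedException does not record which thread called interrupt().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    // Returns true if a task blocked in Object.wait() was interrupted
    // by ExecutorService.shutdownNow().
    static boolean interruptedByShutdownNow() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Object lock = new Object();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            synchronized (lock) {
                started.countDown();
                try {
                    while (true) {
                        lock.wait(); // never notified; only an interrupt ends the wait
                    }
                } catch (InterruptedException e) {
                    // The exception says nothing about which thread interrupted us.
                    interrupted.set(true);
                }
            }
        });

        try {
            started.await();                            // the task is now running
            pool.shutdownNow();                         // interrupts the task's thread
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return false;
        }
        return interrupted.get();
    }
}
```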
> >
> > 2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
> > java.lang.RuntimeException: java.lang.InterruptedException
> > at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> > Caused by: java.lang.InterruptedException: null
> > at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
> > at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
> > at org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
> > ... 17 common frames omitted
> >
> >
> > On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <[email protected]> wrote:
> >
> >> I'm not sure what all of this adds up to yet...
> >>
> >> I tried setting up a local Storm development environment.  By mistake, I
> >> forgot to switch to 0.9.3-branch (i.e. I was working on master at first).
> >> In master, I was seeing the ShellBolt heartbeat fail first (which in
> >> retrospect makes sense!).  Then I remembered I was trying to debug 0.9.3,
> >> so I switched to 0.9.3-branch.  In the branch, I see the ShellSpout
> >> heartbeat fail first and jam up, so that's good (?).  In both cases, I was
> >> using local mode rather than cluster mode, as I figured I had a better
> >> shot at debugging in local mode.
> >>
> >> At this point, I figured I had a good test case.  So far this was all done
> >> with command-line tools (git, mvn).  Two thoughts at this point:
> >> 1.) I don't plan on running master, and it's not 100% clear to me whether
> >> ShellBolt failing first will solve any problems...
> >> 2.) In 0.9.3-branch, after ShellSpout fails and everything jams up, if I
> >> Ctrl-C the process I immediately see the ShellBolt heartbeat timeout
> >> message.  It's as if it was waiting to write, but was blocked on
> >> something (hrmmmm....?)
> >>
> >> But I hit the limits of what I could do with "System.out" debugging, so I
> >> tried setting up Storm in IntelliJ.  That took a while, but I finally
> >> figured out how to run a topology in local mode with a debugger.  Once
> >> again, I saw the ShellSpout heartbeat fail and then nothing happen.
> >>
> >> The next problem is that I don't know how to set up IntelliJ to understand
> >> Clojure-compiled code (at least, I think that's my problem....), so the
> >> "Step Into/Out of" information is really weird in the debugger.  The
> >> best/most complete stack trace I have is:
> >> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
> >> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
> >> invoke():114, zookeeper$mkdirs (backtype.storm)
> >> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
> >> report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
> >> invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
> >> reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
> >> reportError():132, SpoutOutputCollector (backtype.storm.spout)
> >>
> >> I had a better stack trace (which I lost) that led into:
> >> org.apache.storm.curator.RetryLoop.callWithRetry()
> >> which, based on the name alone, is my "prime suspect" for whatever is
> >> blocking things up....  :-)
> >>
> >> Though, once again, since I don't understand the big picture of Storm, I
> >> have no idea what all of the above adds up to in terms of what's wrong, or
> >> how to fix it....
> >>
> >> will
> >>
> >> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <[email protected]> wrote:
> >>
> >>> I'm glad to hear I'm not the only one!
> >>>
> >>> (no new news yet)
> >>>
> >>>
> >>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:
> >>>
> >>>> Hi William,
> >>>>
> >>>> I'm having the same problem running a multilang topology (written in
> >>>> Python).  If you find a solution, please post it here; it would surely
> >>>> help us.
> >>>>
> >>>> To upgrade from 0.9.2-incubating we updated storm.py
> >>>> (https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
> >>>> and pom.xml.
> >>>>
> >>>> After downgrading to 0.9.2-incubating (reverting storm.py and pom.xml),
> >>>> it works perfectly.
> >>>>
> >>>> Best regards,
> >>>>
> >>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <[email protected]> wrote:
> >>>>
> >>>>> I'm not sure of the best way to share a test case, so I'll paste the
> >>>>> code below.  If you run it (and find the log file of the worker that
> >>>>> was running it), within ~30 seconds you should see:
> >>>>> ====
> >>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
> >>>>> java.lang.RuntimeException: subprocess heartbeat timeout
> >>>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
> >>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> >>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
> >>>>> java.lang.RuntimeException: subprocess heartbeat timeout
> >>>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
> >>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
> >>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
> >>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> >>>>> =======
> >>>>>
> >>>>> But the topology then runs in a weird zombie state forever.  More
> >>>>> specifically, I see the multilang bolt process all tuples in the
> >>>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
> >>>>> multilang spout.  But, as noted in my original email, if I comment out:
> >>>>>  _collector.reportError(exception);
> >>>>> in the Java ShellSpout, then the worker immediately dies and respawns.
> >>>>>
> >>>>> If no one can help, the next step for me is rough, as I'll have to
> >>>>> learn how to actually develop and debug Storm itself, which is usually
> >>>>> at least 10x harder than just using something :-)
> >>>>>
> >>>>> In any case, my test code:
> >>>>>
> >>>>> Topology = 1 process with two tasks (multilang spout and bolt) and a
> >>>>> small pool of pending messages (yes, I used the word count example in
> >>>>> storm-starter as a starting point....)
> >>>>> =============
> >>>>> import java.util.Map;
> >>>>>
> >>>>> import backtype.storm.Config;
> >>>>> import backtype.storm.LocalCluster;
> >>>>> import backtype.storm.StormSubmitter;
> >>>>> import backtype.storm.spout.ShellSpout;
> >>>>> import backtype.storm.task.ShellBolt;
> >>>>> import backtype.storm.topology.IRichBolt;
> >>>>> import backtype.storm.topology.IRichSpout;
> >>>>> import backtype.storm.topology.OutputFieldsDeclarer;
> >>>>> import backtype.storm.topology.TopologyBuilder;
> >>>>> import backtype.storm.tuple.Fields;
> >>>>>
> >>>>> public class SlowTopology {
> >>>>>   // Multilang bolt backed by slowBolt.php (declares no output fields).
> >>>>>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
> >>>>>
> >>>>>     public SlowPhpBolt() {
> >>>>>       super("php", "slowBolt.php");
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >>>>>       declarer.declare(new Fields());
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public Map<String, Object> getComponentConfiguration() {
> >>>>>       return null;
> >>>>>     }
> >>>>>   }
> >>>>>
> >>>>>   // Multilang spout backed by slowSpout.php, emitting a single "output" field.
> >>>>>   public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
> >>>>>
> >>>>>     public SlowPhpSpout() {
> >>>>>       super("php", "slowSpout.php");
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
> >>>>>       ofd.declare(new Fields("output"));
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public Map<String, Object> getComponentConfiguration() {
> >>>>>       return null;
> >>>>>     }
> >>>>>   }
> >>>>>
> >>>>>   public static void main(String[] args) throws Exception {
> >>>>>     TopologyBuilder builder = new TopologyBuilder();
> >>>>>
> >>>>>     // One spout task with at most 3 pending (unacked) tuples, feeding one bolt task.
> >>>>>     builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
> >>>>>     builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
> >>>>>
> >>>>>     Config conf = new Config();
> >>>>>     conf.setDebug(true);
> >>>>>
> >>>>>     if (args != null && args.length > 0) {
> >>>>>       // Cluster mode: submit under the name given on the command line.
> >>>>>       conf.setNumWorkers(1);
> >>>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
> >>>>>     } else {
> >>>>>       // Local mode: runs for only 10 seconds before shutting down.
> >>>>>       LocalCluster cluster = new LocalCluster();
> >>>>>       cluster.submitTopology("slow", conf, builder.createTopology());
> >>>>>       Thread.sleep(10000);
> >>>>>       cluster.shutdown();
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>> ===========
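The `setMaxSpoutPending(3)` call above caps how many emitted but not yet acked tuples the spout may have in flight. Below is a simplified model of that cap. `PendingGate` is hypothetical and is not Storm's executor code. In this model, once three tuples are pending, no further nextTuple() calls are made until an ack or fail frees a slot. That is how bolts that sleep for minutes can leave the spout idle.

```java
class PendingGate {
    private final int maxPending;
    private int pending = 0;

    PendingGate(int maxPending) {
        this.maxPending = maxPending;
    }

    // Whether another nextTuple() call is allowed right now.
    boolean mayCallNextTuple() {
        return pending < maxPending;
    }

    // A tuple emitted with a message id becomes pending.
    void emitted() {
        pending++;
    }

    // An ack or fail (including a timeout-driven fail) frees a slot.
    void completed() {
        if (pending > 0) {
            pending--;
        }
    }
}
```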
> >>>>>
> >>>>> slowSpout.php
> >>>>> ==========
> >>>>> <?php
> >>>>> require_once "storm.php";
> >>>>> class slowSpout extends \ShellSpout {
> >>>>>   protected function nextTuple() {
> >>>>>     $value = rand(0,100);
> >>>>>     $id = rand(0, 100);
> >>>>>     $this->emit(array($value), $id);
> >>>>>     file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
> >>>>>     sleep(1);
> >>>>>   }
> >>>>>   protected function ack($id) {
> >>>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
> >>>>>   }
> >>>>>
> >>>>>   protected function fail($id) {
> >>>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>> (new slowSpout())->run();
> >>>>> ===========
> >>>>>
> >>>>> slowBolt.php
> >>>>> ============
> >>>>> <?php
> >>>>> require_once "storm.php";
> >>>>> class slowBolt extends \BasicBolt {
> >>>>>   protected function process(\Tuple $t) {
> >>>>>     $sleep = rand(1, 180);
> >>>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
> >>>>>     sleep($sleep);
> >>>>>   }
> >>>>> }
> >>>>> (new slowBolt())->run();
> >>>>> ============
> >>>>>
> >>>>> storm.php (from https://github.com/lazyshot/storm-php; I think I added
> >>>>> more error checking on reads/writes to stdin/stdout, added sync() to
> >>>>> ShellSpout to make new classes easier to write, and added support for the
> >>>>> new heartbeat protocol)
> >>>>> =========
> >>>>> <?php
> >>>>> interface iShellBolt {
> >>>>> }
> >>>>>
> >>>>> interface iShellSpout {
> >>>>> }
> >>>>>
> >>>>> class Tuple {
> >>>>>     public $id, $component, $stream, $task, $values;
> >>>>>
> >>>>>     public function __construct($id, $component, $stream, $task, $values) {
> >>>>>         $this->id = $id;
> >>>>>         $this->component = $component;
> >>>>>         $this->stream = $stream;
> >>>>>         $this->task = $task;
> >>>>>         $this->values = $values;
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class ShellComponent {
> >>>>>     protected $pid;
> >>>>>     protected $stormConf;
> >>>>>     protected $topologyContext;
> >>>>>
> >>>>>     protected $stormInc = null;
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         $this->pid = getmypid();
> >>>>>         $this->sendCommand(array("pid" => $this->pid));
> >>>>>
> >>>>>         $handshake = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>         $this->stormConf = $handshake['conf'];
> >>>>>         $this->topologyContext = $handshake['context'];
> >>>>>         $pidDir = $handshake['pidDir'];
> >>>>>
> >>>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
> >>>>>     }
> >>>>>
> >>>>>     protected function readLine() {
> >>>>>         $raw = fgets(STDIN);
> >>>>>
> >>>>>         if ($raw === false) {
> >>>>>             throw new Exception("STDIN is broken");
> >>>>>         }
> >>>>>
> >>>>>         $line = trim($raw);
> >>>>>
> >>>>>         return $line;
> >>>>>     }
> >>>>>
> >>>>>     protected function waitForMessage() {
> >>>>>         $message = '';
> >>>>>         while (true) {
> >>>>>             $line = trim($this->readLine());
> >>>>>
> >>>>>             if (strlen($line) == 0) {
> >>>>>                 continue;
> >>>>>             } else if ($line == 'end') {
> >>>>>                 break;
> >>>>>             } else if ($line == 'sync') {
> >>>>>                 $message = '';
> >>>>>                 continue;
> >>>>>             }
> >>>>>
> >>>>>             $message .= $line . "\n";
> >>>>>         }
> >>>>>
> >>>>>         return trim($message);
> >>>>>     }
> >>>>>
> >>>>>     protected function sendCommand(array $command) {
> >>>>>         $this->sendMessage(json_encode($command));
> >>>>>     }
> >>>>>
> >>>>>     protected function sendLog($message) {
> >>>>>         return $this->sendCommand(array(
> >>>>>             'command' => 'log',
> >>>>>             'msg' => $message
> >>>>>         ));
> >>>>>     }
> >>>>>
> >>>>>     protected function parseMessage($message) {
> >>>>>         $msg = json_decode($message, true);
> >>>>>
> >>>>>         if ($msg) {
> >>>>>             return $msg;
> >>>>>         } else {
> >>>>>             return $message;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     protected function sendMessage($message) {
> >>>>>         $message = "$message\nend\n";
> >>>>>         $bytesWritten = fwrite(STDOUT, $message);
> >>>>>         fflush(STDOUT);
> >>>>>         if ($bytesWritten === false) {
> >>>>>             throw new Exception("STDOUT is broken");
> >>>>>         }
> >>>>>         if ($bytesWritten != strlen($message)) {
> >>>>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     final protected function sync() {
> >>>>>         $command = array(
> >>>>>             'command' => 'sync',
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>> }
> >>>>>
> >>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
> >>>>>
> >>>>>     public $anchor_tuple = null;
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         parent::__construct();
> >>>>>
> >>>>>         $this->init($this->stormConf, $this->topologyContext);
> >>>>>     }
> >>>>>
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['tuple'])) {
> >>>>>                         $tupleMap = array_merge(array(
> >>>>>                                 'id' => null,
> >>>>>                                 'comp' => null,
> >>>>>                                 'stream' => null,
> >>>>>                                 'task' => null,
> >>>>>                                 'tuple' => null
> >>>>>                             ),
> >>>>>
> >>>>>                             $command);
> >>>>>
> >>>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
> >>>>>                             $this->sync();
> >>>>>                         } else {
> >>>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'],
> >>>>>                                 $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
> >>>>>                             $this->process($tuple);
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     abstract protected function process(Tuple $tuple);
> >>>>>
> >>>>>     protected function init($conf, $topology) {
> >>>>>         return;
> >>>>>     }
> >>>>>
> >>>>>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
> >>>>>         if ($this->anchor_tuple !== null) {
> >>>>>             $anchors = array($this->anchor_tuple);
> >>>>>         }
> >>>>>
> >>>>>         $command = array(
> >>>>>             'command' => 'emit'
> >>>>>         );
> >>>>>
> >>>>>         if ($stream !== null) {
> >>>>>             $command['stream'] = $stream;
> >>>>>         }
> >>>>>
> >>>>>         $command['anchors'] = array_map(function ($a) {
> >>>>>             return $a->id;
> >>>>>         }, $anchors);
> >>>>>
> >>>>>         if ($directTask !== null) {
> >>>>>             $command['task'] = $directTask;
> >>>>>         }
> >>>>>
> >>>>>         $command['tuple'] = $tuple;
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>>     protected function emit($tuple, $stream = null, $anchors = array()) {
> >>>>>         $this->emitTuple($tuple, $stream, $anchors);
> >>>>>     }
> >>>>>
> >>>>>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
> >>>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
> >>>>>     }
> >>>>>
> >>>>>     protected function ack(Tuple $tuple) {
> >>>>>         $command = array(
> >>>>>             'command' => 'ack',
> >>>>>             'id' => $tuple->id
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>>
> >>>>>     protected function fail(Tuple $tuple) {
> >>>>>         $command = array(
> >>>>>             'command' => 'fail',
> >>>>>             'id' => $tuple->id
> >>>>>         );
> >>>>>
> >>>>>         $this->sendCommand($command);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class BasicBolt extends ShellBolt {
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['tuple'])) {
> >>>>>                         $tupleMap = array_merge(array(
> >>>>>                                 'id' => null,
> >>>>>                                 'comp' => null,
> >>>>>                                 'stream' => null,
> >>>>>                                 'task' => null,
> >>>>>                                 'tuple' => null
> >>>>>                             ),
> >>>>>
> >>>>>                             $command);
> >>>>>
> >>>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
> >>>>>                             $this->sync();
> >>>>>                         } else {
> >>>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'],
> >>>>>                                 $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
> >>>>>
> >>>>>                             $this->anchor_tuple = $tuple;
> >>>>>
> >>>>>                             try {
> >>>>>                                 $processed = $this->process($tuple);
> >>>>>
> >>>>>                                 $this->ack($tuple);
> >>>>>                             } catch (BoltProcessException $e) {
> >>>>>                                 $this->fail($tuple);
> >>>>>                             }
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>         }
> >>>>>
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
> >>>>>     protected $tuples = array();
> >>>>>
> >>>>>     public function __construct() {
> >>>>>         parent::__construct();
> >>>>>
> >>>>>         $this->init($this->stormConf, $this->topologyContext);
> >>>>>     }
> >>>>>
> >>>>>
> >>>>>     abstract protected function nextTuple();
> >>>>>
> >>>>>     abstract protected function ack($tuple_id);
> >>>>>
> >>>>>     abstract protected function fail($tuple_id);
> >>>>>
> >>>>>     public function run() {
> >>>>>         try {
> >>>>>             while (true) {
> >>>>>                 $command = $this->parseMessage($this->waitForMessage());
> >>>>>
> >>>>>                 if (is_array($command)) {
> >>>>>                     if (isset($command['command'])) {
> >>>>>                         if ($command['command'] == 'ack') {
> >>>>>                             $this->ack($command['id']);
> >>>>>                             $this->sync();
> >>>>>                         } else if ($command['command'] == 'fail') {
> >>>>>                             $this->fail($command['id']);
> >>>>>                             $this->sync();
> >>>>>                         } else if ($command['command'] == 'next') {
> >>>>>                             $this->nextTuple();
> >>>>>                             $this->sync();
> >>>>>                         }
> >>>>>                     }
> >>>>>                 }
> >>>>>             }
> >>>>>         } catch (Exception $e) {
> >>>>>             $this->sendLog((string)$e);
> >>>>>             $this->sync();
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     protected function init($stormConf, $topologyContext) {
> >>>>>         return;
> >>>>>     }
> >>>>>
> >>>>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
> >>>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
> >>>>>     }
> >>>>>
> >>>>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
> >>>>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
> >>>>>     }
> >>>>>
> >>>>>     private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
> >>>>>         $command = array(
> >>>>>             'command' => 'emit'
> >>>>>         );
> >>>>>
> >>>>>         if ($messageId !== null) {
> >>>>>             $command['id'] = $messageId;
> >>>>>         }
> >>>>>
> >>>>>         if ($streamId !== null) {
> >>>>>             $command['stream'] = $streamId;
> >>>>>         }
> >>>>>
> >>>>>         if ($directTask !== null) {
> >>>>>             $command['task'] = $directTask;
> >>>>>         }
> >>>>>
> >>>>>         $command['tuple'] = $tuple;
> >>>>>
> >>>>>         return $this->sendCommand($command);
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> class BoltProcessException extends Exception {
> >>>>> }
> >>>>>
> >>>>> =========================
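storm.php frames every message as its payload followed by a line that contains only `end`. Its `waitForMessage()` skips blank lines and discards anything already buffered when it sees a bare `sync` line. The sketch below mirrors that PHP logic in Java so the framing can be tested outside PHP. `MultilangFraming` and its methods are hypothetical names. The sketch follows storm.php as posted, not Storm's own Java multilang serializer.

```java
import java.io.BufferedReader;
import java.io.IOException;

class MultilangFraming {
    // Mirrors sendMessage(): payload, then a line containing only "end".
    static String frame(String payload) {
        return payload + "\nend\n";
    }

    // Mirrors waitForMessage(): skip blank lines, reset on "sync",
    // stop at "end", and return the trimmed payload.
    static String readMessage(BufferedReader in) throws IOException {
        StringBuilder message = new StringBuilder();
        while (true) {
            String raw = in.readLine();
            if (raw == null) {
                throw new IOException("STDIN is broken"); // same text as storm.php's readLine()
            }
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            } else if (line.equals("end")) {
                break;
            } else if (line.equals("sync")) {
                message.setLength(0);
                continue;
            }
            message.append(line).append('\n');
        }
        return message.toString().trim();
    }
}
```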
> >>>>>
> >>>>>
> >>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <[email protected]> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
> >>>>>>
> >>>>>> I'll try to cover the important facts that led to this issue:
> >>>>>>
> >>>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
> >>>>>> existing business logic
> >>>>>>
> >>>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
> >>>>>> to the ShellBolt protocol)
> >>>>>>
> >>>>>> -I have some odd topologies where I try to do some legacy background
> >>>>>> processing.  This processing takes a highly variable amount of time in
> >>>>>> the bolts, from milliseconds to minutes.  Eventually, due to
> >>>>>> randomness, the spout's "pending" pool fills up, causing the spout to
> >>>>>> block on nextTuple, which eventually causes a heartbeat timeout.  (I
> >>>>>> believe my only fix is to increase the heartbeat timeout at the topology
> >>>>>> level.  That's not the purpose of this email, though confirmation that
> >>>>>> this is my only workaround would be appreciated!  I feel like this
> >>>>>> wasn't anticipated when the heartbeat patch was designed; I guess it was
> >>>>>> assumed that the spout's nextTuple wouldn't block?)
> >>>>>>
> >>>>>> -The purpose of this email is the fact that the topology "jams up"
> >>>>>> when the ShellSpout has a heartbeat timeout.  I can see my PHP
> >>>>>> spout/bolt still running (I added logging to them), but Storm itself is
> >>>>>> doing nothing.
> >>>>>>
> >>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
> >>>>>> message on line 233 (Halting process: ShellSpout died), but as noted
> >>>>>> the PHP process was still running, so I was curious whether
> >>>>>> _process.destroy(); had failed.  But my logging didn't appear, and I
> >>>>>> assumed I was compiling/deploying wrong.  Eventually I commented out
> >>>>>> line 234, _collector.reportError(exception); and everything started
> >>>>>> working!!!
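The heartbeat timeout described in the bullets above can be modeled as a timer that compares the current time with the last time the subprocess sent anything. This is a simplified sketch, not Storm's SpoutHeartbeatTimerTask. `HeartbeatCheck` is a hypothetical name, and the 30-second value in the test is only an assumption based on the ~30 seconds mentioned elsewhere in the thread. If the subprocess stays silent (for instance, because it is never asked for another tuple), nothing refreshes the timestamp, and the check eventually fires.

```java
class HeartbeatCheck {
    private final long timeoutMillis;
    private long lastHeartbeatMillis;

    HeartbeatCheck(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = nowMillis;
    }

    // Called whenever any message (e.g. a "sync") arrives from the subprocess.
    void onMessageFromSubprocess(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    // Called periodically by a timer; true means the subprocess is treated as dead.
    boolean timedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > timeoutMillis;
    }
}
```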
> >>>>>>
> >>>>>> Does this make *any* sense?  Why would
> >>>>>> _collector.reportError(exception); block and never return (I waited
> >>>>>> quite a long time, tens of minutes)?  When I comment out line 234,
> >>>>>> Storm immediately kills my bad tasks and respawns them almost instantly.
> >>>>>>
> >>>>>> I feel fairly confident that this will be reproducible.  My topology:
> >>>>>> -1 spout (ShellSpout)
> >>>>>> -1 bolt (ShellBolt)
> >>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in
> >>>>>> ShellBolt + the pending queue is full
> >>>>>>
> >>>>>> Thanks for any feedback!
> >>>>>>
> >>>>>> will
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Alex Sobrino Beltrán
> >>>> Registered Linux User #273657
> >>>>
> >>>> http://v5tech.es
> >>>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >
> >
> >
>



-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
