@Evans, we are using Storm version 0.9.0.1:
https://github.com/apache/incubator-storm/tree/0.9.0.1

According to the exception, it is thrown by KryoTupleSerializer's serialize
method. The code is:

    public byte[] serialize(Tuple tuple) {
        try {
            _kryoOut.clear();
            // first dereference of tuple: a null tuple would throw NullPointerException here
            _kryoOut.writeInt(tuple.getSourceTask(), true);
            _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(),
                    tuple.getSourceStreamId()), true);
            tuple.getMessageId().serialize(_kryoOut);
            _kryo.serializeInto(tuple.getValues(), _kryoOut);
            return _kryoOut.toBytes();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
It looks like the tuple is null, so a NullPointerException is thrown when
tuple.getSourceTask() is called.
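A minimal sketch of the reasoning above, using hypothetical stand-ins (FakeTuple, NullTupleDemo) rather than Storm's real Tuple and serializer: if a null entry ends up in the batch that the transfer function maps serialize over (the fast_list_map frame in the stack trace), the first statement that touches the tuple, here getSourceTask(), is where the NullPointerException surfaces. The sketch does not explain how a null could get into the batch in the first place.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for backtype.storm.tuple.Tuple; only the method that
// KryoTupleSerializer.serialize dereferences first is modeled.
interface FakeTuple {
    int getSourceTask();
}

class NullTupleDemo {
    // Mirrors the first statement of serialize(): the tuple is dereferenced
    // immediately, so a null tuple fails here before anything else runs.
    static byte[] serialize(FakeTuple tuple) {
        int sourceTask = tuple.getSourceTask();
        return new byte[] {(byte) sourceTask};
    }

    // Hypothetical analogue of mapping serialize over a transfer batch.
    static List<byte[]> serializeBatch(List<FakeTuple> batch) {
        List<byte[]> out = new ArrayList<>();
        for (FakeTuple t : batch) {
            out.add(serialize(t));
        }
        return out;
    }

    // Returns true if serializing the batch throws a NullPointerException.
    static boolean throwsNpe(List<FakeTuple> batch) {
        try {
            serializeBatch(batch);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

For example, with `FakeTuple ok = () -> 7;`, `NullTupleDemo.throwsNpe(Arrays.asList(ok, null))` returns true, while a batch of non-null tuples serializes normally.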


thx


On Tue, Mar 11, 2014 at 1:59 AM, Bobby Evans <[email protected]> wrote:

> OutputCollector is not thread safe. Most of it is, but it calls down
> into some code, such as the shuffle grouping, that is not thread safe. The
> simplest way to be sure that what you are doing is safe is to synchronize
> access to the OutputCollector within your own code. If you send us the
> version of Storm that you are using, we can probably track down exactly
> what is happening here.
>
> —Bobby
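Bobby's suggestion to synchronize access to the OutputCollector can be sketched as a wrapper that funnels every call through one lock. Collector, RecordingCollector, SynchronizedCollector and SyncDemo are hypothetical stand-ins, not Storm classes; in a real bolt the wrapper would delegate to the actual OutputCollector, and every thread that emits or acks (including any reader thread) would have to go through the same wrapper instance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the parts of Storm's OutputCollector used here.
interface Collector {
    void emit(List<Object> values);
    void ack(Object input);
}

// Deliberately NOT thread-safe: concurrent ArrayList.add calls can lose
// entries, leave null slots, or throw.
class RecordingCollector implements Collector {
    final List<List<Object>> emitted = new ArrayList<>();
    final List<Object> acked = new ArrayList<>();

    @Override
    public void emit(List<Object> values) { emitted.add(values); }

    @Override
    public void ack(Object input) { acked.add(input); }
}

// Serializes every call on a single lock, so at most one thread is inside
// the delegate at any time.
class SynchronizedCollector implements Collector {
    private final Collector delegate;
    private final Object lock = new Object();

    SynchronizedCollector(Collector delegate) { this.delegate = delegate; }

    @Override
    public void emit(List<Object> values) {
        synchronized (lock) { delegate.emit(values); }
    }

    @Override
    public void ack(Object input) {
        synchronized (lock) { delegate.ack(input); }
    }
}

class SyncDemo {
    // Each of `threads` threads emits and acks `perThread` tuples via the wrapper.
    static RecordingCollector run(int threads, int perThread) {
        RecordingCollector raw = new RecordingCollector();
        Collector safe = new SynchronizedCollector(raw);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    List<Object> values = new ArrayList<>();
                    values.add(id);
                    values.add(i);
                    safe.emit(values);
                    safe.ack(values);
                }
            });
            workers.add(w);
            w.start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return raw;
    }
}
```

With the wrapper, 8 threads emitting 10,000 tuples each record exactly 80,000 emits and 80,000 acks; calling RecordingCollector directly from several threads gives no such guarantee.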
>
> From: 鞠大升 <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Monday, March 10, 2014 at 2:52 AM
> To: "[email protected]" <[email protected]>,
> "[email protected]" <[email protected]>
> Subject: Topology is hang when bolt "Async loop died" because
> KryoTupleSerializer.serialize throws NullPointerException
>
> Hi all,
>
> Background:
>
> ---------------------------------------------------------------------------------------------------------------------
> We are using Storm 0.9.0.1. Our topology has a KafkaSpout (reads logs from
> Kafka), a ParserBolt (parses the logs), and a SaverBolt (saves the results
> back to Kafka). KafkaSpout has 16 threads, ParserBolt has 32 threads, and
> SaverBolt has 16 threads. The ParserBolt is written in Python using
> multilang.
>
> Problems:
>
> ---------------------------------------------------------------------------------------------------------------------
> Sometimes KryoTupleSerializer.serialize throws a NullPointerException,
> which kills the ParserBolt. The supervisor then restarts the bolts, but the
> new bolt never receives any tuples, and the topology hangs until we restart
> it.
>
> Analysis:
>
> ---------------------------------------------------------------------------------------------------------------------
> We found a Troubleshooting page
> (https://github.com/nathanmarz/storm/wiki/Troubleshooting#wiki-nullpointerexception-from-deep-inside-storm)
> that says: "This is caused by having multiple threads issue methods on the
> OutputCollector. All emits, acks, and fails must happen on the same thread.
> One subtle way this can happen is if you make a IBasicBolt that emits on a
> separate thread. IBasicBolt's automatically ack after execute is called, so
> this would cause multiple threads to use the OutputCollector leading to
> this exception. When using a basic bolt, all emits must happen in the same
> thread that runs execute."
> We also found that in ShellBolt.java, _readerThread is a separate thread,
> and handleEmit, called from that thread, calls emit to emit new tuples.
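One way to respect the Troubleshooting page's rule (all emits, acks, and fails on one thread) when background threads produce results is to have them hand results to a thread-safe queue and let the executor thread do every emit. This is a hypothetical HandOffBolt sketch, not how ShellBolt is implemented; the emitted list stands in for the real OutputCollector, and onBackgroundResult and drainOnExecutorThread are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: background threads never call the collector directly.
// They enqueue results; only the thread that runs execute() drains and emits.
class HandOffBolt {
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    // Stand-in for OutputCollector; only touched by the draining thread.
    final List<String> emitted = new ArrayList<>();
    // Records which thread performed each emit, to check the single-thread rule.
    final List<Long> emitThreads = new ArrayList<>();

    // Safe to call from any thread: ConcurrentLinkedQueue is thread-safe.
    void onBackgroundResult(String result) {
        pending.add(result);
    }

    // Called on the executor thread (e.g. at the start of each execute()).
    void drainOnExecutorThread() {
        String r;
        while ((r = pending.poll()) != null) {
            emitted.add(r);
            emitThreads.add(Thread.currentThread().getId());
        }
    }

    // Runs `producers` background threads, then drains on the calling thread.
    static HandOffBolt demo(int producers, int perProducer) {
        HandOffBolt bolt = new HandOffBolt();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    bolt.onBackgroundResult(id + ":" + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        bolt.drainOnExecutorThread();
        return bolt;
    }
}
```

One trade-off: in a real bolt the queue is only drained when execute() runs, so results can sit in the queue while no input tuples arrive; periodic tick tuples are one way to force a drain.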
>
> But another wiki page
> (https://github.com/nathanmarz/storm/wiki/Concepts#wiki-bolts) says: "Its
> perfectly fine to launch new threads in bolts that do processing
> asynchronously. OutputCollector
> (http://nathanmarz.github.com/storm/doc/backtype/storm/task/OutputCollector.html)
> is thread-safe and can be called at any time."
>
> So we have two questions:
>
> ---------------------------------------------------------------------------------------------------------------------
> 1) Is OutputCollector thread-safe or not? If it is not, then all emits,
> acks, and fails must happen on the same thread, so does ShellBolt have a
> bug?
> 2) When the bolt is restarted, why does the topology hang? By the way, we
> are using Netty.
>
> Can anyone help?
>
> The worker log:
>
> ---------------------------------------------------------------------------------------------------------------------
>
> 2014-03-10 12:48:31 b.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.disruptor$consume_loop_STAR_$fn__849.invoke(disruptor.clj:74) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.util$async_loop$fn__469.invoke(util.clj:406) ~[storm-core-0.9.0.1.jar:na]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>         at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]
> Caused by: java.lang.NullPointerException: null
>         at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335$fn__4339.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.util$fast_list_map.invoke(util.clj:804) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4060.invoke(executor.clj:240) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.disruptor$clojure_handler$reify__836.onEvent(disruptor.clj:43) ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0.1.jar:na]
>         ... 6 common frames omitted
>
>
>
> --
> dashengju
> +86 13810875910
> [email protected]
>



-- 
dashengju
+86 13810875910
[email protected]
