[ https://issues.apache.org/jira/browse/STORM-120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13955769#comment-13955769 ]

Patrick Lucas commented on STORM-120:
-------------------------------------

[~revans2]: I haven't had a chance yet to create a minimal reproducing 
topology, but here are examples of the two different tracebacks we regularly 
see in such a topology:

{code}
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
    at backtype.storm.disruptor$consume_loop_STAR_$fn__1577.invoke(disruptor.clj:89)
    at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
    at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:41)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217$fn__4221.invoke(worker.clj:123)
    at backtype.storm.util$fast_list_map.invoke(util.clj:832)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217.invoke(worker.clj:123)
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3746.invoke(executor.clj:255)
    at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
    ... 6 more
{code}

and

{code}
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: Index: 10, Size: 10
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
    at backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745)
    at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: Index: 10, Size: 10
    at backtype.storm.task.ShellBolt.execute(ShellBolt.java:164)
    at backtype.storm.daemon.executor$eval3918$fn__3919$tuple_action_fn__3921.invoke(executor.clj:630)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3839.invoke(executor.clj:398)
    at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
    ... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 10, Size: 10
    at java.util.ArrayList.RangeCheck(ArrayList.java:547)
    at java.util.ArrayList.get(ArrayList.java:322)
    at backtype.storm.util$acquire_random_range_id.invoke(util.clj:641)
    at backtype.storm.daemon.executor$mk_shuffle_grouper$fn__3601.invoke(executor.clj:44)
    at backtype.storm.daemon.task$mk_tasks_fn$fn__3372.invoke(task.clj:158)
    at backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$bolt_emit__3958.invoke(executor.clj:660)
    at backtype.storm.daemon.executor$eval3918$fn__3919$fn$reify__3964.emit(executor.clj:695)
    at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203)
    at backtype.storm.task.ShellBolt.handleEmit(ShellBolt.java:232)
    at backtype.storm.task.ShellBolt.access$500(ShellBolt.java:66)
    at backtype.storm.task.ShellBolt$1.run(ShellBolt.java:129)
    ... 1 more
{code}

This is happening to us in the following situation:
 * The upstream bolt has m executors and 2*m tasks.
 * The downstream bolt has m executors and 2*m tasks.
 * The downstream bolt subscribes to a non-default stream of the upstream bolt using a shuffle grouping.
 * When the upstream bolt has the same number of tasks as executors, this does not happen, even if the downstream bolt still has m executors and 2*m tasks.

The IndexOutOfBoundsException is thrown where the upstream bolt chooses which downstream task to send a tuple to, i.e. where the shuffle grouper calls util/acquire-random-range-id.
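The `Index: 10, Size: 10` in the second trace is consistent with the check-then-act race named in the issue title. One caller checks the shared cursor against the list size. Another caller then advances the cursor. The first caller goes on to read at the now out-of-range position.

The sketch below replays that interleaving deterministically on a single thread. It assumes the helper keeps a list of task ids (`state`) and a shared cursor (`curr`), as the issue description suggests. `RaceReplay` and `replay` are hypothetical names; this is not Storm's code.

One possible reading of why only the 2*m-task configuration fails: ShellBolt emits from its own reader thread (`ShellBolt$1.run` in the trace). Two tasks in one executor might therefore emit through the same grouper state concurrently. That is an interpretation, not something confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded replay of the interleaving described in STORM-120 (not Storm's code). */
public class RaceReplay {
    /** Returns the exception "thread A" hits, or null if the interleaving turns out harmless. */
    static IndexOutOfBoundsException replay(int size) {
        List<Integer> state = new ArrayList<>(); // downstream task ids
        for (int i = 0; i < size; i++) state.add(i);
        int curr = size - 1;                      // shared cursor: one id left in this round

        // Thread A: bounds check sees curr < size, so it decides no reset is needed...
        boolean aSawExhausted = curr >= state.size();

        // ...then thread B runs a complete acquire and advances the shared cursor to size.
        if (curr >= state.size()) curr = 0;
        state.get(curr); // thread B's chosen task id (the last one in the round)
        curr++;

        // Thread A resumes, skips the reset, and reads at the cursor B left behind.
        if (aSawExhausted) curr = 0;
        try {
            state.get(curr);
            return null;
        } catch (IndexOutOfBoundsException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        IndexOutOfBoundsException e = replay(10);
        // The exact exception message depends on the JDK version.
        System.out.println(e == null ? "no failure" : "thread A failed: " + e.getMessage());
    }
}
```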

> util/acquire-random-range-id is not thread-safe
> -----------------------------------------------
>
>                 Key: STORM-120
>                 URL: https://issues.apache.org/jira/browse/STORM-120
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/724
> Concurrent calls to util/acquire-random-range-id with the same parameters can 
> result in an IndexOutOfBoundsException, as an increment in one thread may 
> occur after the bounds check in another. The resulting curr value can be >= 
> the size of the List state.
> https://github.com/nathanmarz/storm/blob/fc5fbb8b352cf91050cdde4a9f9e77e673ab7f48/storm-core/src/clj/backtype/storm/util.clj#L606
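One way to close the race described in the issue is to serialize the bounds check, the optional reshuffle, the read and the increment, so no caller can move the cursor in between. The following is a minimal sketch using a `synchronized` method. It assumes the same list-plus-cursor model with reshuffling at the end of each round. `SafeRandomRange`, `acquire` and `run` are hypothetical names, and this is not presented as the fix Storm adopted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical thread-safe model of util/acquire-random-range-id (not Storm's code). */
public class SafeRandomRange {
    private final List<Integer> state = new ArrayList<>(); // task ids 0..n-1
    private final Random rand;
    private int curr;                                       // next position in state

    public SafeRandomRange(int n, Random rand) {
        for (int i = 0; i < n; i++) state.add(i);
        this.rand = rand;
        this.curr = n; // start "exhausted" so the first call shuffles
    }

    /** Check, reshuffle, read and increment all happen under the same lock. */
    public synchronized int acquire() {
        if (curr >= state.size()) {
            curr = 0;
            Collections.shuffle(state, rand);
        }
        return state.get(curr++);
    }

    /** Calls one shared instance from several threads; returns how often each id was chosen. */
    static int[] run(int n, int threads, int callsPerThread) {
        SafeRandomRange range = new SafeRandomRange(n, new Random(42));
        int[] counts = new int[n];
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    int id = range.acquire();
                    synchronized (counts) {
                        counts[id]++;
                    }
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) w.join(); // join makes the counts visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 4 threads x 25,000 calls = 10,000 full rounds over 10 ids,
        // so every id is chosen exactly 10,000 times.
        System.out.println(Arrays.toString(run(10, 4, 25_000)));
    }
}
```

Because calls are serialized, every round hands out each id exactly once, so any whole number of rounds yields perfectly even counts. A single lock per grouper may add contention when many emitting threads share it. Giving each emitting thread its own instance would avoid the lock but changes how the shuffle is shared, so which trade-off fits depends on how the grouper is wired into the executor.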



--
This message was sent by Atlassian JIRA
(v6.2#6252)
