Hi,
Forwarding this issue to the dev forum since I think it is more relevant to
discuss it here.
Thanks,
Srinath.
---------- Forwarded message ----------
From: Srinath C <[email protected]>
Date: Tue, May 13, 2014 at 8:17 AM
Subject: Re: Peeking into storm's internal buffers
To: [email protected]
Here is more info...
My suspicion is that the queue
backtype.storm.messaging.netty.Server#message_queue is not being consumed.
A heap dump of the worker process reveals that the queue holds around 40k
entries.
Here are the snippets of the worker log that I can share right now; some
thread stacks are attached as well:
my-spout is the name of the spout that emits a tuple which is delivered to
all the tasks of the bolt my-bolt.
my-bolt has 10 tasks, numbered 43 to 52: worker-2 has tasks 43, 45, 47, 49
and 51, while worker-1 has tasks 44, 46, 48, 50 and 52.
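For context, the wiring looks roughly like the sketch below. This is a
minimal approximation: MySpout/MyBolt are placeholder stand-ins for the real
components, and the all grouping is my inference from the fact that a single
emitted tuple reaches every task of my-bolt.

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MetricsTopologySketch {

    // Stand-in for the real spout: emits a status-request id, like the
    // [1399914258343] tuple in the worker-1 log below.
    public static class MySpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext ctx, SpoutOutputCollector c) { collector = c; }
        public void nextTuple() { collector.emit(new Values(System.currentTimeMillis())); }
        public void declareOutputFields(OutputFieldsDeclarer d) { d.declare(new Fields("id")); }
    }

    // Stand-in for the real bolt: just acks the status request.
    public static class MyBolt extends BaseRichBolt {
        private OutputCollector collector;
        public void prepare(Map conf, TopologyContext ctx, OutputCollector c) { collector = c; }
        public void execute(Tuple tuple) { collector.ack(tuple); }
        public void declareOutputFields(OutputFieldsDeclarer d) { }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("my-spout", new MySpout());
        // 10 tasks of my-bolt (ids 43..52 at runtime); allGrouping makes every
        // task receive each emitted tuple, matching the logs below.
        builder.setBolt("my-bolt", new MyBolt(), 10).allGrouping("my-spout");
        new LocalCluster().submitTopology("metrics-topology", new Config(),
                builder.createTopology());
    }
}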
The logs below relate to the exchange between worker-1 and worker-2 when
my-spout emits a tuple.
worker-1
**********
2014-05-12 13:04:18 n.a.my-spout[111] [DEBUG] nextTuple : emitting tuple
[1399914258343]
2014-05-12 13:04:18 n.a.my-bolt[*50*] [DEBUG] execute : received a status
request with id 1399914258343
2014-05-12 13:04:18 n.a.my-bolt[*46*] [DEBUG] execute : received a status
request with id 1399914258343
2014-05-12 13:04:18 n.a.my-bolt[*48*] [DEBUG] execute : received a status
request with id 1399914258343
2014-05-12 13:04:18 b.s.m.n.StormClientHandler [DEBUG] 5 request(s) sent
2014-05-12 13:04:18 n.a.my-bolt[*52*] [DEBUG] execute : received a status
request with id 1399914258343
2014-05-12 13:04:18 n.a.my-bolt[*44*] [DEBUG] execute : received a status
request with id 1399914258343
2014-05-12 13:04:18 b.s.m.n.StormClientHandler [DEBUG] send/recv time (ms):
227523778
2014-05-12 13:04:18 b.s.d.worker [DEBUG] Doing heartbeat
#backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1399914258,
:storm-id "metrics-topology-1-1399685003", :executors #{[2 2] [66 66] [68
68] [38 38] [70 70] [40 40] [72 72] [42 42] [74 74] [44 44] [76 76] [46 46]
[78 78] [48 48] [80 80] [50 50] [82 82] [52 52] [84 84] [86 86] [94 94] [-1
-1] [64 64] [96 96] [98 102] [8 12] [108 112] [18 22] [88 92] [58 62] [28
32]}, :port 6700}
worker-2
***********
2014-05-12 13:04:17 b.s.d.worker [DEBUG] Doing heartbeat
#backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1399914257,
:storm-id "metrics-topology-1-1399685003", :executors #{[67 67] [69 69] [39
39] [71 71] [41 41] [73 73] [43 43] [75 75] [45 45] [77 77] [47 47] [79 79]
[49 49] [81 81] [51 51] [83 83] [85 85] [87 87] [93 93] [63 63] [95 95] [-1
-1] [1 1] [65 65] [97 97] [3 7] [103 107] [13 17] [53 57] [23 27] [33 37]},
:port 6701}
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *43*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *45*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *47*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *49*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *51*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.StormServerHandler [DEBUG] Send back response
...
2014-05-12 13:04:18 b.s.d.worker [DEBUG] Doing heartbeat
#backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1399914258,
:storm-id "metrics-topology-1-1399685003", :executors #{[67 67] [69 69] [39
39] [71 71] [41 41] [73 73] [43 43] [75 75] [45 45] [77 77] [47 47] [79 79]
[49 49] [81 81] [51 51] [83 83] [85 85] [87 87] [93 93] [63 63] [95 95] [-1
-1] [1 1] [65 65] [97 97] [3 7] [103 107] [13 17] [53 57] [23 27] [33 37]},
:port 6701}
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *43*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *45*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *47*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *49*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: *51*,
payload size: 11
2014-05-12 13:04:18 b.s.m.n.StormServerHandler [DEBUG] Send back response
...
2014-05-12 13:04:18 b.s.m.n.Server [DEBUG] message received with task: 24,
payload size: 1225
2014-05-12 13:04:18 b.s.m.n.StormServerHandler [DEBUG] Send back response
...
This confirms that worker-2 received the tuples from worker-1, but the bolt
tasks are not being executed.
On a healthy worker, worker-2 would have shown additional logs like:
2014-05-12 19:42:33 b.s.m.n.Server [DEBUG] request to be processed:
backtype.storm.messaging.TaskMessage@65c5e8d8
2014-05-12 19:42:33 b.s.m.n.Server [DEBUG] request to be processed:
backtype.storm.messaging.TaskMessage@2439f11b
which is logged when an executor consumes from that queue.
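To illustrate the mechanism: the Netty handler enqueues each incoming
TaskMessage, and a separate consumer is supposed to drain the queue, logging
as it takes each message. Below is a rough approximation of that pattern (for
illustration only, not the actual Server code; TaskMessage here is a stand-in
for backtype.storm.messaging.TaskMessage).

import java.util.concurrent.LinkedBlockingQueue;

// Rough approximation of the produce/consume pattern around message_queue
// in backtype.storm.messaging.netty.Server.
public class MessageQueueSketch {

    static class TaskMessage {
        final int task;
        final byte[] payload;
        TaskMessage(int task, byte[] payload) { this.task = task; this.payload = payload; }
        @Override public String toString() { return "TaskMessage(task=" + task + ")"; }
    }

    private final LinkedBlockingQueue<TaskMessage> messageQueue =
            new LinkedBlockingQueue<TaskMessage>();

    // Producer side: called by the Netty handler for each incoming message.
    // This corresponds to the "message received with task: ..." log lines,
    // which DO appear on the stuck worker.
    void enqueue(TaskMessage msg) {
        messageQueue.offer(msg);
        System.out.println("message received with task: " + msg.task
                + ", payload size: " + msg.payload.length);
    }

    // Consumer side: the worker's receive loop takes messages off the queue.
    // This corresponds to the "request to be processed: ..." log line, which
    // is MISSING on the stuck worker, so the queue just keeps growing.
    TaskMessage recv() throws InterruptedException {
        TaskMessage request = messageQueue.take();
        System.out.println("request to be processed: " + request);
        return request;
    }
}

So on the stuck worker the producer side keeps logging while the consumer
side never does, which matches the ~40k entries piling up in the heap dump.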
On Mon, May 12, 2014 at 8:29 PM, Srinath C <[email protected]> wrote:
> Hi Padma,
> See inline for my replies....
>
>
> On Mon, May 12, 2014 at 4:39 PM, padma priya chitturi <
> [email protected]> wrote:
>
>> Hi,
>>
>> Few questions on your issue:
>>
>> 1. As soon as you start the topology, does bolt execution never start at
>> all? Or does it get stuck after processing a few tuples? Can you give a
>> clearer picture of this?
>>
>
> <Srinath> Bolt execution is perfectly normal in the beginning and for a
> long time (about 1-2 days), and then it abruptly stops.
>
>
>> 2. You said the behavior is seen on one of the worker processes. Is the
>> bolt executing without any issues on the other worker process?
>>
>
> <Srinath> Yes, I see this on only one process. The bolt tasks in the other
> process are working fine.
>
>
>> 3. What is the source from which spout reads the input ?
>>
>
> <Srinath> The spout is reading from a kestrel queue.
>
>
>>
>> Also, it would be great if you could provide the nimbus/supervisor and
>> worker logs.
>>
>
> <Srinath> I'm currently troubleshooting this issue and looking into the
> worker logs, and will soon share more details on my findings.
> I don't see much in the Nimbus and Supervisor logs, but the worker logs
> seem to reveal some clues. Will update shortly.
>
>
>>
>>
>> On Mon, May 12, 2014 at 6:57 AM, Srinath C <[email protected]> wrote:
>>
>>> Hi,
>>> I'm facing a strange issue running a topology on version
>>> 0.9.1-incubating with Netty as the transport.
>>> The topology has two worker processes on the same worker machine.
>>>
>>> To summarize the behavior, on one of the worker processes:
>>> - one of the bolts is not getting executed: the bolt has multiple
>>> executors, but none of them are executing
>>> - the spouts in the same worker process are trying to emit tuples
>>> to the bolt, but the bolt is still not executed
>>> - after a while, the spout itself is no longer executed (nextTuple is
>>> not called)
>>>
>>> My suspicion is that some internal buffers are filling up, the
>>> topology.max.spout.pending limit is being hit, and Storm is no longer
>>> invoking the spout. The topology remains hung like this for a long time,
>>> probably forever.
>>> From the jstack output, I could see that the affected process had 5
>>> fewer threads than a normal process. The missing threads had stacks like
>>> the one below:
>>>
>>> Thread 5986: (state = BLOCKED)
>>>  - sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information may be imprecise)
>>>  - java.util.concurrent.locks.LockSupport.parkNanos(java.lang.Object, long) @bci=20, line=226 (Compiled frame)
>>>  - java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) @bci=68, line=2082 (Compiled frame)
>>>  - java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take() @bci=122, line=1090 (Compiled frame)
>>>  - java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take() @bci=1, line=807 (Interpreted frame)
>>>  - java.util.concurrent.ThreadPoolExecutor.getTask() @bci=156, line=1068 (Interpreted frame)
>>>  - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=26, line=1130 (Interpreted frame)
>>>  - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
>>>  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>>>
>>> Has anyone seen such an issue? Any ideas on how I can confirm my
>>> suspicion that the internal buffers are filling up? What else can I
>>> collect from the processes for troubleshooting?
>>>
>>> Thanks,
>>> Srinath.
>>>
>>>
>>
>
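Regarding the topology.max.spout.pending suspicion in the quoted thread
above: for reference, this is how that cap is configured, as a minimal sketch
(the value 1000 is illustrative, not our actual setting). Once a spout has
that many emitted-but-unacked tuples in flight, Storm stops calling
nextTuple, which matches the "spout itself is not executed" symptom.

import backtype.storm.Config;

// Minimal sketch of setting the cap (0.9.1-incubating API). The value 1000
// is illustrative only.
public class MaxSpoutPendingSketch {
    public static void main(String[] args) {
        Config conf = new Config();
        conf.setMaxSpoutPending(1000);  // sets topology.max.spout.pending
        System.out.println(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
    }
}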
"Thread-35-my-bolt" prio=10 tid=0x00007f3224609800 nid=0x16c1 waiting on
condition [0x00007f31f08d8000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000eb7b4e60> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
at
com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
at
com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:76)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
at
backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745)
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
at clojure.lang.AFn.run(AFn.java:24)
--
"Thread-31-my-bolt" prio=10 tid=0x00007f3224603000 nid=0x16bc waiting on
condition [0x00007f31f0cdc000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000eb7c7960> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
at
com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
at
com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:76)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
at
backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745)
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
at clojure.lang.AFn.run(AFn.java:24)
--
"Thread-27-my-bolt" prio=10 tid=0x00007f32245fa800 nid=0x16b7 runnable
[0x00007f31f10e0000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000eb7d9bc8> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
at
com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
at
com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:76)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
at
backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745)
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
at clojure.lang.AFn.run(AFn.java:24)
--
"Thread-23-my-bolt" prio=10 tid=0x00007f32245f4800 nid=0x16b2 waiting on
condition [0x00007f31f14e4000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000eb7ebe30> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
at
com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
at
com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:76)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
at
backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745)
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
at clojure.lang.AFn.run(AFn.java:24)
--
"Thread-19-my-bolt" prio=10 tid=0x00007f32245e8000 nid=0x16ae runnable
[0x00007f31f18e8000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000eb7fe930> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
at
com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
at
com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:76)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
at
backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745)
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
at clojure.lang.AFn.run(AFn.java:24)
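
For anyone who wants to re-check this state without taking a full jstack
each time, below is a small probe using the standard ThreadMXBean API. It has
to run inside the worker JVM (e.g. kicked off from a bolt's prepare) to see
these threads; the "my-bolt" name filter is specific to my topology.

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Dumps the stack of every thread whose name contains "my-bolt". Must
// execute inside the worker JVM to see the worker's threads; a standalone
// main only sees its own JVM, and is shown here just for compactness.
public class BoltThreadProbe {

    public static void dumpBoltThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // false/false: skip monitor/synchronizer details, we only need stacks.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().contains("my-bolt")) {
                continue;
            }
            System.out.println(info.getThreadName() + " state=" + info.getThreadState());
            for (StackTraceElement frame : info.getStackTrace()) {
                System.out.println("    at " + frame);
            }
        }
    }

    public static void main(String[] args) {
        dumpBoltThreads();
    }
}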