[ 
https://issues.apache.org/jira/browse/STORM-794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14526316#comment-14526316
 ] 

Jungtaek Lim edited comment on STORM-794 at 5/4/15 6:40 AM:
------------------------------------------------------------

I found the issue.

I modified executor.clj to add logs showing the size of overflow-buffer, the 
size of pending, max-spout-pending, and the active flag.

{code}
(let [active? @(:storm-active-atom executor-data)
      curr-count (.get emitted-count)]
  (do
    ;; added: log buffer/pending sizes and the active flag on every pass
    (log-message "overflow-buffer's size " (.size overflow-buffer)
                 " and pending size " (.size pending)
                 " and max-spout-pending " max-spout-pending
                 " and active " active?)

    (if (and (.isEmpty overflow-buffer)
             (or (not max-spout-pending)
                 (< (.size pending) max-spout-pending)))
      (if active?
        (do
          (when-not @last-active
            (reset! last-active true)
            (log-message "Activating spout " component-id ":" (keys task-datas))
            (fast-list-iter [^ISpout spout spouts] (.activate spout)))

          (fast-list-iter [^ISpout spout spouts]
            (do
              ;; added: log state right before each nextTuple call
              (log-message "pending size " (.size pending)
                           " and max-spout-pending " max-spout-pending
                           " and active " active?)
              (.nextTuple spout))))
        (do
          (when @last-active
            (reset! last-active false)
            (log-message "Deactivating spout " component-id ":" (keys task-datas))
            (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
          ;; TODO: log that it's getting throttled
          (Time/sleep 100)))))
  (if (and (= curr-count (.get emitted-count)) active?)
    (do (.increment empty-emit-streak)
        (.emptyEmit spout-wait-strategy (.get empty-emit-streak)))
    (.set empty-emit-streak 0)))
{code}

Below are the logs. Please take a look at tx 41696:0.

{noformat}
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][executor] Acking message 41696:0
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $commit [41696:0]
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 __ack_init [-6284616655193060224 0 1]
2015-05-04 14:02:37.199 INFO  [Thread-16-$mastercoord-bg0][executor] overflow-buffer's size 0 and pending size 10 and max-spout-pending 10 and active true
2015-05-04 14:02:37.199 INFO  [Thread-4-__acker][executor] Processing received message source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-6284616655193060224 0 1]
2015-05-04 14:02:37.199 INFO  [Thread-4-__acker][task] Emitting direct: 1; __acker __ack_ack [-6284616655193060224]
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][executor] Processing received message source: __acker:3, stream: __ack_ack, id: {}, [-6284616655193060224]
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][executor] Acking message 41696:0
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $success [41696:0]
2015-05-04 14:02:37.200 INFO  [Thread-2-$spoutcoord-spout0][executor] Processing received message source: $mastercoord-bg0:1, stream: $success, id: {}, [41696:0]
2015-05-04 14:02:37.203 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $batch [41706:0]
2015-05-04 14:02:37.204 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 __ack_init [8327784611027372914 600406483975750023 1]
2015-05-04 14:02:37.204 INFO  [Thread-2-$spoutcoord-spout0][executor] Processing received message source: $mastercoord-bg0:1, stream: $batch, id: {8327784611027372914=600406483975750023}, [41706:0]
2015-05-04 14:02:37.204 INFO  [Thread-4-__acker][executor] Processing received message source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [8327784611027372914 600406483975750023 1]
2015-05-04 14:02:37.204 INFO  [Thread-16-$mastercoord-bg0][executor] overflow-buffer's size 0 and pending size 10 and max-spout-pending 10 and active true
{noformat}

When MasterBatchCoordinator.ack() is called and the attempt's state is 
COMMITTING, MasterBatchCoordinator treats the current tx as completed, AND it 
starts a new transaction immediately by increasing the current tx id and 
calling sync() directly.
You can see that $mastercoord-bg0 emits 41696:0 to $success and immediately 
starts 41706 (note that max-spout-pending is 10). The async loop comes too late.
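
To make that concrete, here is a toy Clojure model of the race (not Storm 
source; the function and names are illustrative). The point is that the 
coordinator's ack path removes the completed tx from pending and refills it in 
the same step, so the executor never observes (.size pending) < 
max-spout-pending:

{code}
;; Toy model only -- not Storm code; names are illustrative.
(def max-spout-pending 10)

(defn ack-committing
  "On ack of a COMMITTING attempt the coordinator completes the tx and
   starts the next one in the same call, so the pending window never
   shrinks below max-spout-pending."
  [pending completed-tx next-tx]
  (-> pending
      (disj completed-tx)   ; tx 41696 is emitted to $success
      (conj next-tx)))      ; sync() emits $batch for 41706 right away

;; 10 in-flight txs (41696..41705); after the ack it is still 10.
(count (ack-committing (into (sorted-set) (range 41696 41706)) 41696 41706))
;; => 10  -- (.size pending) never drops below max-spout-pending
{code}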

So, even though the executor can see that active is false, the if-statement 
above can stay false forever (pending never drops below max-spout-pending), so 
the spout is never deactivated.
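
For illustration only (a sketch of the direction a fix could take, not the 
actual patch for this issue): if the active? check were hoisted outside the 
max-spout-pending gate, the deactivation branch would stay reachable even while 
the pending window is permanently full:

{code}
;; Sketch only -- not the committed fix for STORM-794.
;; Checking active? before the max-spout-pending gate makes deactivation
;; reachable even when pending stays permanently full.
(if active?
  (when (and (.isEmpty overflow-buffer)
             (or (not max-spout-pending)
                 (< (.size pending) max-spout-pending)))
    (when-not @last-active
      (reset! last-active true)
      (log-message "Activating spout " component-id ":" (keys task-datas))
      (fast-list-iter [^ISpout spout spouts] (.activate spout)))
    (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
  (do
    (when @last-active
      (reset! last-active false)
      (log-message "Deactivating spout " component-id ":" (keys task-datas))
      (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
    ;; spout is inactive: sleep before re-checking
    (Time/sleep 100)))
{code}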

p.s. My spout can sleep for more than 1 sec before emitting from emitBatch(), 
but I don't think that should be an issue.
Btw, the complete latency of a batch is about 20 secs. During that time the 
spout doesn't emit anything from emitBatch().
{noformat}
2015-05-04 14:02:17.173 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $batch [41696:0]
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $commit [41696:0]
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $success [41696:0]
{noformat}



> Trident Topology with some situation seems not handle deactivate during 
> graceful shutdown
> -----------------------------------------------------------------------------------------
>
>                 Key: STORM-794
>                 URL: https://issues.apache.org/jira/browse/STORM-794
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.3
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>
> I met an issue with a Trident topology in our production env.
> Normally, when we kill a topology via the UI, Nimbus changes the topology 
> status to "killed", and when the spout detects the new status, it becomes 
> deactivated so bolts can handle the remaining tuples within the wait time.
> AFAIK, that's how Storm guarantees graceful shutdown.
> But a Trident topology seems not to handle "deactivate" while we try to shut 
> down the topology gracefully.
> MasterBatchCoordinator never stops starting the next transaction, so the 
> Trident spout never stops emitting, and the bolts (functions) always have 
> tuples to take care of.
> Topology settings (see the sketch after this list)
> - 1 worker, 1 acker
> - max spout pending: 1
> - TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS : 5
> -- It may look weird, but MasterBatchCoordinator's default value is 1
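> One way to apply these settings (a hypothetical sketch using the 
> backtype.storm.Config API; the values mirror the list above, and this snippet 
> is not from the original report):
> {code}
> ;; Hypothetical sketch: the reported settings via backtype.storm.Config.
> (import 'backtype.storm.Config)
> (doto (Config.)
>   (.setNumWorkers 1)        ; 1 worker
>   (.setNumAckers 1)         ; 1 acker
>   (.setMaxSpoutPending 1)   ; max spout pending: 1
>   (.put Config/TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS 5))
> {code}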
> * Nimbus log
> {code}
> 2015-04-20 09:59:07.954 INFO  [pool-5-thread-41][nimbus] Delaying event :remove for 120 secs for BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> ...
> 2015-04-20 09:59:07.955 INFO  [pool-5-thread-41][nimbus] Updated BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015 with status {:type :killed, :kill-time-secs 120}
> ...
> 2015-04-20 10:01:07.956 INFO  [timer][nimbus] Killing topology: BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> ...
> 2015-04-20 10:01:14.448 INFO  [timer][nimbus] Cleaning up BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> {code}
> * Supervisor log
> {code}
> 2015-04-20 10:01:07.960 INFO  [Thread-1][supervisor] Removing code for storm id BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> 2015-04-20 10:01:07.962 INFO  [Thread-2][supervisor] Shutting down and clearing state for id 9719259e-528c-4336-abf9-592c1bb9a00b. Current supervisor time: 1429491667. State: :disallowed, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1429491667, :storm-id "BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015", :executors #{[2 2] [3 3] [4 4] [5 5] [6 6] [7 7] [8 8] [9 9] [10 10] [11 11] [12 12] [13 13] [14 14] [-1 -1] [1 1]}, :port 6706}
> 2015-04-20 10:01:07.962 INFO  [Thread-2][supervisor] Shutting down 5bc084a2-b668-4610-86f6-9b93304d40a8:9719259e-528c-4336-abf9-592c1bb9a00b
> 2015-04-20 10:01:08.974 INFO  [Thread-2][supervisor] Shut down 5bc084a2-b668-4610-86f6-9b93304d40a8:9719259e-528c-4336-abf9-592c1bb9a00b
> {code}
> * Worker log
> {code}
> 2015-04-20 10:01:07.985 INFO  [Thread-33][worker] Shutting down worker BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015 5bc084a2-b668-4610-86f6-9b93304d40a8 6706
> 2015-04-20 10:01:07.985 INFO  [Thread-33][worker] Shutting down receive thread
> 2015-04-20 10:01:07.988 WARN  [Thread-33][ExponentialBackoffRetry] maxRetries too large (300). Pinning to 29
> 2015-04-20 10:01:07.988 INFO  [Thread-33][StormBoundedExponentialBackoffRetry] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
> 2015-04-20 10:01:07.988 INFO  [Thread-33][Client] New Netty Client, connect to localhost, 6706, config: , buffer_size: 5242880
> 2015-04-20 10:01:07.991 INFO  [client-schedule-service-1][Client] Reconnect started for Netty-Client-localhost/127.0.0.1:6706... [0]
> 2015-04-20 10:01:07.996 INFO  [Thread-33][loader] Shutting down receiving-thread: [BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015, 6706]
> ...
> 2015-04-20 10:01:08.044 INFO  [Thread-33][Client] Closing Netty Client Netty-Client-localhost/127.0.0.1:6706
> 2015-04-20 10:01:08.044 INFO  [Thread-33][Client] Waiting for pending batchs to be sent with Netty-Client-localhost/127.0.0.1:6706..., timeout: 600000ms, pendings: 1
> {code}
> I found the activating log, but cannot find any deactivating log.
> {code}
> 2015-04-20 09:50:24.556 INFO  [Thread-30-$mastercoord-bg0][executor] Activating spout $mastercoord-bg0:(1)
> {code}
> Please note that it also doesn't work when I just push the button to 
> "deactivate" the topology via the UI.
> We're changing our topology to a normal spout-bolt topology, but personally 
> I'd like to see this resolved.


