Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-64870378
  
    I tested this on our cluster.
    ###### Before applying this patch
    ```
    2014-11-28 15:03:56 b.s.m.n.Client [INFO] Reconnect started for 
Netty-Client-lg-hadoop-tst-st03.bj/10.2.201.68:49967... [48]
    2014-11-28 15:04:00 b.s.m.n.Client [INFO] Reconnect started for 
Netty-Client-A/xxx.xxx.xxx.xxx:49967... [49]
    2014-11-28 15:04:04 b.s.m.n.Client [INFO] Reconnect started for 
Netty-Client-A/xxx.xxx.xxx.xxx:49967... [50]
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Closing Netty Client 
Netty-Client-A/xxx.xxx.xxx.xxx:49967
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Waiting for pending batchs to be 
sent with Netty-Client-A/xxx.xxx.xxx.xxx:49967..., timeout: 600000ms, pendings: 
0 
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does 
not take requests any more, drop the messages...
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does 
not take requests any more, drop the messages...
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does 
not take requests any more, drop the messages...
    2014-11-28 15:04:08 o.a.c.r.ExponentialBackoffRetry [WARN] maxRetries too 
large (50). Pinning to 29
    2014-11-28 15:04:08 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The 
baseSleepTimeMs [100] the maxSleepTimeMs [4000] the maxRetries [50]
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] New Netty Client, connect to B, 
46389, config: , buffer_size: 5242880
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Reconnect started for 
Netty-Client-B/xxx.xxx.xxx.xxx:46389... [0]    
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] connection established to a 
remote host  Netty-Client-B/xxx.xxx.xxx.xxx:46389, [id: 0x0aa5eefe, 
/xxx.xxx.xxx.xxx:59716 =>  Netty-Client-B/xxx.xxx.xxx.xxx:46389]
    
    ```
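    The 4-second spacing of attempts [48] to [50] matches the logged maxSleepTimeMs [4000]. As a rough sense of how long such a reconnect loop can block, here is a minimal sketch that assumes a simplified, deterministic capped-doubling backoff. This is an assumption for illustration only: the class and method names are hypothetical, and the actual formula used by StormBoundedExponentialBackoffRetry is not shown in this comment and may differ.

    ```java
    class BackoffSketch {
        /** Sleep before a given attempt: base doubled per attempt, capped at max (assumed model). */
        static long sleepMs(long baseSleepMs, long maxSleepMs, int attempt) {
            int shift = Math.min(attempt, 30);   // bound the shift so the doubling cannot overflow
            return Math.min(maxSleepMs, baseSleepMs << shift);
        }

        /** Total time spent sleeping across attempts 0 .. maxRetries-1. */
        static long totalSleepMs(long baseSleepMs, long maxSleepMs, int maxRetries) {
            long total = 0;
            for (int attempt = 0; attempt < maxRetries; attempt++) {
                total += sleepMs(baseSleepMs, maxSleepMs, attempt);
            }
            return total;
        }

        public static void main(String[] args) {
            // Values taken from the log line: base 100 ms, max 4000 ms, 50 retries.
            System.out.println(totalSleepMs(100, 4000, 50) + " ms");
        }
    }
    ```

    Under this assumed model the sleeps alone add up to about three minutes, during which anything waiting on the reconnect stays blocked.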
    The log shows the following sequence of events:
    1. The worker sends a message to worker A, but A has already died.
    2. The worker keeps trying to reconnect to worker A until it exceeds the maximum number of retries.
    3. Meanwhile, the worker tries to send another message to worker A. Because send and connect are synchronized, send waits until the reconnect finishes.
    4. Meanwhile, refresh-connections in worker.clj is running, but it cannot call close() on the connection to worker A until send finishes, because it must first acquire the write lock on endpoint-socket-lock:
    ```
         ->>   (write-locked (:endpoint-socket-lock worker)
                    (reset! (:cached-task->node+port worker)
                            (HashMap. my-assignment)))
                  (doseq [endpoint remove-connections]
                    (.close (get @(:cached-node+port->socket worker) endpoint)))
    ```
    But at that point send holds the read lock on endpoint-socket-lock:
    ```
      (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.add drainer packets)
            
            (when batch-end?
       ->>  (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket]
                  (.send drainer node+port->socket)))
              (.clear drainer))))))
    ```
    5. After the reconnect fails, the client calls close() and changes its status to Closed.
    6. send then runs, but because the client status is Closed, the message is dropped.
    7. After send finishes, refresh-connections proceeds: it first closes the connection to worker A (already closed, so nothing is logged) and then connects to the new worker B.
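
    The lock interaction in steps 3 to 7 can be sketched in plain Java. This is a minimal illustration, not Storm code: it assumes java.util.concurrent's ReentrantReadWriteLock behaves like the read-locked / write-locked macros above, uses a latch as a stand-in for the blocking reconnect loop, and all class, method, and variable names are hypothetical.

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class EndpointLockSketch {
        /** Replays steps 3 to 7 once and returns the ordered event trace. */
        static List<String> run() throws InterruptedException {
            ReentrantReadWriteLock endpointSocketLock = new ReentrantReadWriteLock();
            List<String> events = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch sendHoldsReadLock = new CountDownLatch(1);
            CountDownLatch retriesExhausted = new CountDownLatch(1);

            // Sender: holds the read lock for the whole send, like read-locked.
            Thread sender = new Thread(() -> {
                endpointSocketLock.readLock().lock();
                try {
                    events.add("send: read lock held, waiting on reconnect");
                    sendHoldsReadLock.countDown();
                    await(retriesExhausted); // stands in for the blocking reconnect loop
                    events.add("send: client is Closed, message dropped");
                } finally {
                    endpointSocketLock.readLock().unlock();
                }
            });

            // refresh-connections: needs the write lock before it can swap A for B.
            Thread refresher = new Thread(() -> {
                await(sendHoldsReadLock);
                endpointSocketLock.writeLock().lock();
                try {
                    events.add("refresh: write lock held, close A, connect to B");
                } finally {
                    endpointSocketLock.writeLock().unlock();
                }
            });

            sender.start();
            refresher.start();
            sendHoldsReadLock.await();
            // Wait until the refresher is queued behind the reader.
            while (!endpointSocketLock.hasQueuedThreads()) {
                Thread.sleep(1);
            }
            retriesExhausted.countDown(); // reconnect gives up, client closes
            sender.join();
            refresher.join();
            return events;
        }

        private static void await(CountDownLatch latch) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            run().forEach(System.out::println);
        }
    }
    ```

    In this sketch the refresher can only record its event after the sender has released the read lock, which mirrors why the switch to worker B in the log appears only after the messages for A have been dropped.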
    
    
    ###### After applying this patch
    ```
    2014-11-28 14:22:33 b.s.m.n.Client [INFO] Reconnect started for 
Netty-Client-A/xxx.xxx.xxx.xxx:45909... [0]
    2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently 
is not available, dropping pending 4 messages...
    2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently 
is not available, dropping pending 10 messages...
    ```
    While the worker is reconnecting to worker A, sending a message to worker A fails and the pending messages are dropped:
    ```
          // msgs iterator is invalid after this call, we cannot use it further
          int msgCount = iteratorSize(msgs);
    
          // the connection is down, drop pending messages
          LOG.error("The Connection channel currently is not available, dropping pending " + msgCount + " messages...");
    ``` 
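
    The comment about the iterator being invalid after the call suggests that iteratorSize counts by consuming the iterator. The body of iteratorSize is not shown here, so the following is only a hypothetical sketch of such a helper; the class name and the dropMessage method are made up for illustration.

    ```java
    import java.util.Arrays;
    import java.util.Iterator;

    class DropPendingSketch {
        /** Counts the remaining elements by consuming the iterator (assumed behavior). */
        static int iteratorSize(Iterator<?> msgs) {
            int size = 0;
            if (msgs != null) {
                while (msgs.hasNext()) {
                    msgs.next();
                    size++;
                }
            }
            return size;
        }

        /** Builds the same log text as the snippet above for a batch of pending messages. */
        static String dropMessage(Iterator<?> msgs) {
            // msgs is exhausted after this call and cannot be used further
            int msgCount = iteratorSize(msgs);
            return "The Connection channel currently is not available, dropping pending "
                    + msgCount + " messages...";
        }

        public static void main(String[] args) {
            Iterator<String> pending = Arrays.asList("m1", "m2", "m3", "m4").iterator();
            System.out.println(dropMessage(pending));
        }
    }
    ```

    Counting this way costs one pass over the batch and leaves nothing to resend, which fits a path where the messages are being dropped anyway.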


