Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43588816
  
    Hi Bobby,
    
    Thank you for your comments. I addressed most of the issues, except these 
three:
    
    **issue 1:**
    > 
    ```
    +    private synchronized void connect() {
    ```
    > I don't like the idea of having connect block until the connection is 
established. Any thread that tries to send data to a connection that is still 
being established will block until the connection is established. I think it is 
more robust to buffer the messages in a data structure and try to handle them 
later when the connection is finished.
    
    **reply:** I think it is better to have the connection established before we 
allow message senders to send messages. Here are my observations from allowing 
message senders to send messages without an established connection:
    
    1. Excessive memory usage
    If message senders are allowed to send messages before the connection is 
established, the senders are encouraged to send as fast as they can. In some 
profiling, I observed a sharp increase in heap memory at the beginning, 
because we buffer all those messages in an unbounded queue in the Netty client. 
If the user is running an unacked topology, or sets topology.max.spout.pending 
to a large enough value, this can cause a JVM OOM.
    
    2. Longer latency
    The Netty Client queue can become very long, and a longer queue means 
longer latency. For example, suppose the Netty Client can transfer at most 10 
tuples/second, the Netty Client queue has grown to a size of 10000 because of 
this buffering, and the spout generates 10 tuples/second. In this case the 
queue size will stabilize at 10000 and the throughput will be 10 tuples/second. 
The throughput is the same whether the queue size is 100, 1000, or 10000, but 
the latency is much higher for a queue size of 10000. So it is very important 
to make sure the queue does not grow too large from the beginning.
    
    3. Reduced throughput
    Longer latency reduces the message generation speed of the spout, because 
the spout waits for messages to be acked (the number of unacked messages is 
limited by topology.max.spout.pending). The longer the initial latency, the 
longer it takes for the spout to converge to a balanced tuple generation rate.
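
    To put rough numbers on point 2: if the queue drains at a fixed rate, a 
newly enqueued tuple waits about queue size / drain rate before it is sent. The 
sketch below is only an illustration of that arithmetic, using the 10 
tuples/second figure from the example; the class and method names are 
hypothetical and not part of Storm.

```java
// Hypothetical helper, not part of Storm: estimates how long a newly
// enqueued tuple waits when the queue in front of it drains at a fixed rate.
final class QueueLatencyEstimate {

    /** Seconds a tuple waits behind queueSize earlier tuples. */
    static double waitSeconds(long queueSize, double drainRatePerSecond) {
        if (drainRatePerSecond <= 0) {
            throw new IllegalArgumentException("drain rate must be positive");
        }
        return queueSize / drainRatePerSecond;
    }

    public static void main(String[] args) {
        double rate = 10.0; // tuples/second, taken from the example in point 2
        for (long size : new long[] {100, 1000, 10000}) {
            // Throughput is 10 tuples/second in every case; only the wait changes.
            System.out.printf("queue=%d wait=%.0f s%n", size, waitSeconds(size, rate));
        }
    }
}
```

    With these inputs the estimated wait grows from 10 seconds to 1000 seconds 
while the throughput stays at 10 tuples/second.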
      
    In the code, we set up the connection asynchronously in the Client 
constructor, before send(message) is called, to reduce the time the message 
sender has to wait.
    ```
            Thread flushChecker = new Thread(new Runnable() {
                @Override
                public void run() {
                    //make sure we have a connection
                    connect();  //<-----------------here!
    ```
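
    As a rough illustration of the behaviour described in this reply (the 
connection is started in the background by the constructor, and senders wait 
until it is ready), here is a minimal sketch. It is not the code from the 
patch: the class name, the simulated connection delay, and the timeout 
parameter are all assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the Netty Client; all names are illustrative only.
final class BlockingConnectClientSketch {
    private final CountDownLatch connected = new CountDownLatch(1);
    private final AtomicInteger sent = new AtomicInteger();

    BlockingConnectClientSketch(final long connectDelayMillis) {
        // Start connecting right away so that send() rarely has to wait long.
        Thread connector = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(connectDelayMillis); // simulated connection setup
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                connected.countDown(); // releases any blocked senders
            }
        });
        connector.setDaemon(true);
        connector.start();
    }

    /** Blocks until connected; returns false if the timeout expires first. */
    boolean send(String message, long timeoutMillis) {
        try {
            if (!connected.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // not connected yet, nothing was buffered
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        sent.incrementAndGet(); // a real client would write to the channel here
        return true;
    }

    int sentCount() {
        return sent.get();
    }
}
```

    Because senders are held back rather than buffered, nothing accumulates in 
memory while the connection is still being established.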
    
    **issue 2:**
    >
    ```
    storm-core/src/jvm/backtype/storm/messaging/netty/Client
    +        Thread flushChecker = new Thread(new Runnable() {
    ```
    > Can we make this thread shared between the clients, otherwise we will 
have a dedicated thread per client, which can cause resource utilization 
issues, hitting a ulimit with the number of processes allowed per user.
    
    **Reply:** Can we do this in a follow-up patch? I have a local patch, but it 
requires more testing.
    
    **issue 3:** 
    >
    ```
    +            (let [node+port (get @task->node+port task)]
    +              (when (not (.get remoteMap node+port))
    +                (.put remoteMap node+port (ArrayList.)))
    +              (let [remote (.get remoteMap node+port)]
    +                (.add remote (TaskMessage. task (.serialize serializer 
tuple)))
    +                 ))))
    ```
    > The above code does not really feel like it is Clojure, as it is updating 
mutable state. I would rather see us do something like a group-by.
    
    **Reply:** This is on purpose, for performance. 
    The ArrayList and HashMap constructed here are used directly in Java. 
We designed a customized iterator inside the TransferDrainer class, so that we 
don't need to wrap/unwrap data or copy data from a Clojure data structure to a 
Java data structure.
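
    For readers less familiar with the Clojure interop above, the same in-place 
grouping looks roughly like this when written in Java. This is only a sketch of 
the idea: `Msg` is a hypothetical stand-in for TaskMessage, and the class and 
method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;

// Illustrative sketch only; not the code from the patch.
final class GroupByDestinationSketch {

    // Hypothetical stand-in for Storm's TaskMessage.
    static final class Msg {
        final int task;
        final byte[] payload;

        Msg(int task, byte[] payload) {
            this.task = task;
            this.payload = payload;
        }
    }

    /** Appends msg to the batch for nodePort, creating the batch on first use. */
    static void add(HashMap<String, ArrayList<Msg>> remoteMap, String nodePort, Msg msg) {
        ArrayList<Msg> batch = remoteMap.get(nodePort);
        if (batch == null) {
            batch = new ArrayList<Msg>();
            remoteMap.put(nodePort, batch);
        }
        batch.add(msg); // mutates in place, so no copy is needed later
    }
}
```

    The resulting map can be handed to Java code as-is, which is the point of 
building it with mutable Java collections in the first place.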


