[ 
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002829#comment-14002829
 ] 

ASF GitHub Bot commented on STORM-297:
--------------------------------------

Github user clockfly commented on the pull request:

    https://github.com/apache/incubator-storm/pull/103#issuecomment-43588816
  
    Hi Bobby,
    
    Thank you for your comments. I addressed most of the issues, except these 
three:
    
    **issue1:**
    > 
    ```
    +    private synchronized void connect() {
    ```
    > I don't like the idea of having connect block until the connection is 
established. Any thread that tries to send data to a connection that is still 
being established will block until the connection is established. I think it is 
more robust to buffer the messages in a data structure and try to handle them 
later when the connection is finished.
    
    **reply:** I think it is better to have connection estabilish before we 
allow message sender to send message. Here is the my obervations when allowing 
message sender to send message without a established connection:
    
    1. Excessive memory usage
    If  message senders are allow to send message without the connetion being 
established, the senders are encourged to send as fast as they can. In some 
profiling, I observed there was sharp increase of heap memory in the beginning, 
as we will buffer all those message in a unlimited queue in netty client. If 
the user are using unacked topology or set the topology.max.spout.pending to be 
a bigger enough value, it will possibly cause the JVM OOM.
    
    2. Longer latency
    The Netty Client queue can turns out to be very long, longer queue means 
longer latency. For example, suppose Netty Client can at max transfer 10 
tuples/second, the Netty Client queue increased to size of 10000 because of 
this buffering, the spout generates 10 tuple/second, then in this case the 
queue size will stablize at 10000, the throughput will be 10 tuples/second. The 
throughput will be the same no matter the queue size is 100, or 1000, or 10000, 
the latency will be much bigger for queue size 10000. So it is very important 
to make sure the queue will not increase to too big from the begining.
    
    3. Reduced throughput.
    When the latency is longer, it will reduce the message generation speed of 
spout, as spout  will wait message to be acked(unacked size controled by 
topology.max.spout.pending). The longer the initial latency is, the longer it 
takes for the spout to converge to the balanced speed of tuple generation.  
      
    In the code, we will setup the connection in Client constructor asyncly 
before the send(message) is called to reduce the time that need to be waited by 
the message sender.
    ```
            Thread flushChecker = new Thread(new Runnable() {
                @Override
                public void run() {
                    //make sure we have a connection
                    connect();  //<-----------------here!
    ```
    
    **issue 2:**
    >
    ```
    storm-core/src/jvm/backtype/storm/messaging/netty/Client
    +        Thread flushChecker = new Thread(new Runnable() {
    ```
    > Can we make this thread shared between the clients, otherwise we will 
have a dedicated thread per client, which can cause resource utilization 
issues, hitting a ulimit with the number of processes allowed per user.
    
    **Reply:** Can we do this in a followup patch? I have a local patch, but it 
requires more testing.
    
    **issue 3:** 
    >
    ```
    +            (let [node+port (get @task->node+port task)]
    +              (when (not (.get remoteMap node+port))
    +                (.put remoteMap node+port (ArrayList.)))
    +              (let [remote (.get remoteMap node+port)]
    +                (.add remote (TaskMessage. task (.serialize serializer 
tuple)))
    +                 ))))
    ```
    > The above code does not really feel like it is clojure, as it is updating 
mutable state. I would rather have see us do something like a group-by.
    
    **Reply:** This is on purpose for performance. 
    The ArrayList and HashMap constructed here will be used directly in java. 
We designed a customized iterator inside class TransferDrainer, so that we 
don't need to wrap/unwrap data or copy data from clojure data strucutre to java 
data structure.


> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
>                 Key: STORM-297
>                 URL: https://issues.apache.org/jira/browse/STORM-297
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>              Labels: Performance, netty
>             Fix For: 0.9.2-incubating
>
>         Attachments: Storm_performance_fix.pdf, 
> storm_Netty_receiver_diagram.png, storm_performance_fix.patch
>
>
> We cannot scale up the performance by adding more CPU cores and increasing 
> parallelism.
> For a 2 layer topology Spout ---shuffle grouping--> bolt, when message size 
> is small (around 100 bytes), we can find in the below picture that neither 
> the CPU nor the network is saturated. When message size is 100 bytes, only 
> 40% of CPU is used, only 18% of network is used, although we have a high 
> parallelism (overall we have 144 executors)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to