[ 
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019611#comment-14019611
 ] 

ASF GitHub Bot commented on STORM-297:
--------------------------------------

Github user Gvain commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13477759
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    @clockfly 
    
    Thanks for the reminder.
    For an unacked topology, the OOM problem did exist before your patch, and I filed it at https://issues.apache.org/jira/browse/STORM-339
    
    After your patch, the OOM problem still seems to exist, as you said and filed in JIRA at https://issues.apache.org/jira/browse/STORM-329. We are stuck on this because we run an unacked topology under heavy throughput, so could we raise this issue's priority and fix it soon?
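
For readers following the diff above, here is a minimal sketch of the batch-and-flush loop that the new `send(Iterator<TaskMessage>)` appears to implement. It is not Storm's `Client`: `BatchingSender`, the `String` messages, the count-based batch limit, and the `flushed` list are all hypothetical stand-ins, and resetting the batch after a flush is an assumption because the excerpt ends before that point.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the batching loop in the diff; not Storm's Client. */
class BatchingSender {
    private final int batchSize;   // assumed: limit counted in messages
    private List<String> batch;    // current partial batch, or null if none

    /** Records every flushed batch; stands in for writing to the channel. */
    final List<List<String>> flushed = new ArrayList<>();

    BatchingSender(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Adds messages to the current batch and flushes each time it fills. */
    synchronized void send(Iterator<String> msgs) {
        if (msgs == null || !msgs.hasNext()) {
            return;
        }
        while (msgs.hasNext()) {
            if (batch == null) {
                batch = new ArrayList<>(batchSize);
            }
            batch.add(msgs.next());
            if (batch.size() >= batchSize) {
                flushed.add(batch); // stand-in for flushRequest(channel, toBeFlushed)
                batch = null;       // assumed reset; not shown in the excerpt
            }
        }
    }

    /** Number of messages waiting in the partial batch. */
    synchronized int pending() {
        return batch == null ? 0 : batch.size();
    }

    public static void main(String[] args) {
        BatchingSender sender = new BatchingSender(2);
        sender.send(Arrays.asList("a", "b", "c").iterator());
        System.out.println(sender.flushed.size() + " flushed, " + sender.pending() + " pending");
    }
}
```

In this sketch a partial batch stays in memory until later messages fill it, so a real client would also need some way to flush leftovers.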


> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
>                 Key: STORM-297
>                 URL: https://issues.apache.org/jira/browse/STORM-297
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>              Labels: Performance, netty
>             Fix For: 0.9.2-incubating
>
>         Attachments: Storm_performance_fix.pdf, 
> storm_Netty_receiver_diagram.png, storm_conf.txt, 
> storm_performance_fix.patch, worker_throughput_without_storm-297.png
>
>
> Performance does not scale up when we add more CPU cores and increase 
> parallelism.
> For a 2-layer topology (Spout ---shuffle grouping--> bolt) with small 
> messages (around 100 bytes), the picture below shows that neither the CPU 
> nor the network is saturated: only 40% of the CPU and 18% of the network 
> are used, even though parallelism is high (144 executors overall).



--
This message was sent by Atlassian JIRA
(v6.2#6252)