[ 
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13991589#comment-13991589
 ] 

ASF GitHub Bot commented on STORM-297:
--------------------------------------

Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12362613
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +181,103 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does 
not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
     
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws 
InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting 
currently
    -        if (only_if_waiting && !wait_for_requests)  return;
    -
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  
let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                messageBatch = null;
    +            }
             }
     
    -        //we are busily delivering messages, and will check queue upon 
response.
    -        //When send() is called by senders, we should not thus call 
tryDeliverMessages().
    -        wait_for_requests = false;
    -
    -        //write request into socket channel
    -        ChannelFuture future = channel.write(requests);
    -        future.addListener(new ChannelFutureListener() {
    -            public void operationComplete(ChannelFuture future)
    -                    throws Exception {
    -                if (!future.isSuccess()) {
    -                    LOG.info("failed to send "+requests.size()+" requests 
to "+remote_addr, future.getCause());
    -                    reconnect();
    -                } else {
    -                    LOG.debug("{} request(s) sent", requests.size());
    -
    -                    //Now that our requests have been sent, channel could 
be closed if needed
    -                    if (being_closed.get())
    -                        close_n_release();
    -                }
    +        if (null != messageBatch && !messageBatch.isEmpty()) {
    +            if (channel.isWritable()) {
    --- End diff --
    
    When the channel is NOT writable, it means the internal Netty buffer is full. 
In this case, we can try to buffer up more incoming messages.


> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
>                 Key: STORM-297
>                 URL: https://issues.apache.org/jira/browse/STORM-297
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>              Labels: Performance, netty
>             Fix For: 0.9.2-incubating
>
>         Attachments: Storm_performance_fix.pdf, storm_performance_fix.patch
>
>
> We cannot scale up performance by adding more CPU cores and increasing 
> parallelism.
> For a 2-layer topology (Spout ---shuffle grouping--> Bolt) with a small 
> message size (around 100 bytes), the picture below shows that neither 
> the CPU nor the network is saturated: only 40% of the CPU and only 18% of 
> the network are used, even though parallelism is high (144 executors overall).



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to