[ https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002273#comment-14002273 ]

ASF GitHub Bot commented on STORM-297:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12813204
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +181,105 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
     
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
    -
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages, let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                messageBatch = null;
    +            }
             }
     
    -        //we are busily delivering messages, and will check queue upon response.
    -        //When send() is called by senders, we should not thus call tryDeliverMessages().
    -        wait_for_requests = false;
    -
    -        //write request into socket channel
    -        ChannelFuture future = channel.write(requests);
    -        future.addListener(new ChannelFutureListener() {
    -            public void operationComplete(ChannelFuture future)
    -                    throws Exception {
    -                if (!future.isSuccess()) {
    -                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
    -                    reconnect();
    -                } else {
    -                    LOG.debug("{} request(s) sent", requests.size());
    -
    -                    //Now that our requests have been sent, channel could be closed if needed
    -                    if (being_closed.get())
    -                        close_n_release();
    -                }
    +        if (null != messageBatch && !messageBatch.isEmpty()) {
    +            if (channel.isWritable()) {
    +                flushCheckTimer.set(Long.MAX_VALUE);
    +                
    +                // Flush as fast as we can to reduce the latency
    +                MessageBatch toBeFlushed = messageBatch;
    +                messageBatch = null;
    +                flushRequest(channel, toBeFlushed, blocking);
    +                
    +            } else {
    +                // when channel is NOT writable, it means the internal netty buffer is full.
    +                // In this case, we can try to buffer up more incoming messages.
    +                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
                 }
    -        });
    +        }
    +
         }
     
    -    /**
    -     * Take all enqueued messages from queue
    -     * @return  batch of messages
    -     * @throws InterruptedException
    -     *
    -     * synchronized ... ensure that messages are delivered in the same order
    -     * as they are added into queue
    -     */
    -    private MessageBatch tryTakeMessages() throws InterruptedException {
    -        //1st message
    -        Object msg = message_queue.poll();
    -        if (msg == null) return null;
    -
    -        MessageBatch batch = new MessageBatch(buffer_size);
    -        //we will discard any message after CLOSE
    -        if (msg == ControlMessage.CLOSE_MESSAGE) {
    -            LOG.info("Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    -            return batch;
    +    public String name() {
    +        if (null != remote_addr) {
    +            return PREFIX + remote_addr.toString();
             }
    +        return "";
    +    }
     
    -        batch.add((TaskMessage)msg);
    -        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
    -            //Is it a CLOSE message?
    -            if (msg == ControlMessage.CLOSE_MESSAGE) {
    -                message_queue.take();
    -                LOG.info("Connection to {} is being closed", remote_addr);
    -                being_closed.set(true);
    -                break;
    +    private synchronized void flush() {
    +        if (!closing) {
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushCheckTimer.set(Long.MAX_VALUE);
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
                 }
    -
    -            //try to add this msg into batch
    -            if (!batch.tryAdd((TaskMessage) msg))
    -                break;
    -
    -            //remove this message
    -            message_queue.take();
             }
    -
    -        return batch;
         }
    -
    +    
         /**
          * gracefully close this client.
    -     *
    -     * We will send all existing requests, and then invoke close_n_release() method
    +     * 
    +     * We will send all existing requests, and then invoke close_n_release()
    +     * method
          */
    -    public void close() {
    -        //enqueue a CLOSE message so that shutdown() will be invoked
    -        try {
    -            message_queue.put(ControlMessage.CLOSE_MESSAGE);
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
    -            being_closed.set(true);
    +    public synchronized void close() {
    +        if (!closing) {
    +            closing = true;
    +            if (null != messageBatch && !messageBatch.isEmpty()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                Channel channel = channelRef.get();
    +                if (channel != null) {
    +                    flushRequest(channel, toBeFlushed, true);
    +                }
    +                messageBatch = null;
    +            }
    +        
    +            //wait for pendings to exit
    +            while(pendings.get() != 0) {
    --- End diff --
    
    Can we have a max number of iterations on this? I just feel it is more
    defensive to try for a while and then, if it does not work out, log a
    message and move on. After all, the worst thing that happens by leaving
    early is that some messages may not have been acked, which we more or
    less ignore anyway.
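
    For illustration, a bounded version of that wait might look like the
    sketch below. The iteration cap, sleep interval, helper name, and log
    wording are hypothetical and not taken from the patch; pendings,
    remote_addr, and LOG are the fields already used in Client.

        // Sketch only: bound the close()-time wait on outstanding writes.
        // MAX_WAIT_ITERATIONS and WAIT_SLEEP_MS are illustrative values.
        private static final int MAX_WAIT_ITERATIONS = 600;
        private static final long WAIT_SLEEP_MS = 100;

        private void waitForPendingWritesBounded() {
            int iterations = 0;
            // pendings is the counter of in-flight requests to the server
            while (pendings.get() != 0 && iterations < MAX_WAIT_ITERATIONS) {
                try {
                    Thread.sleep(WAIT_SLEEP_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                iterations++;
            }
            if (pendings.get() != 0) {
                // Leaving early only risks some unacked messages, which
                // are more or less ignored anyway.
                LOG.warn("Giving up waiting for {} pending request(s) to {}",
                        pendings.get(), remote_addr);
            }
        }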


> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
>                 Key: STORM-297
>                 URL: https://issues.apache.org/jira/browse/STORM-297
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>              Labels: Performance, netty
>             Fix For: 0.9.2-incubating
>
>         Attachments: Storm_performance_fix.pdf, 
> storm_Netty_receiver_diagram.png, storm_performance_fix.patch
>
>
> We cannot scale up performance by adding more CPU cores and increasing
> parallelism.
> For a two-layer topology (Spout ---shuffle grouping--> Bolt) with a small
> message size (around 100 bytes), the attached picture shows that neither
> the CPU nor the network is saturated: only 40% of the CPU and 18% of the
> network are used, even though parallelism is high (144 executors overall).


