[ 
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14018831#comment-14018831
 ] 

ASF GitHub Bot commented on STORM-297:
--------------------------------------

Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r13443448
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -128,133 +185,114 @@ private long getSleepTimeMs()
         }
     
         /**
    -     * Enqueue a task message to be sent to server
    +     * Enqueue task messages to be sent to server
          */
    -    public void send(int task, byte[] message) {
    -        //throw exception if the client is being closed
    -        if (being_closed.get()) {
    +    synchronized public void send(Iterator<TaskMessage> msgs) {
    +
    +        // throw exception if the client is being closed
    +        if (closing) {
                 throw new RuntimeException("Client is being closed, and does not take requests any more");
             }
    -
    -        try {
    -            message_queue.put(new TaskMessage(task, message));
    -
    -            //resume delivery if it is waiting for requests
    -            tryDeliverMessages(true);
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException(e);
    +        
    +        if (null == msgs || !msgs.hasNext()) {
    +            return;
             }
    -    }
    -
    -    /**
    -     * Retrieve messages from queue, and delivery to server if any
    -     */
    -    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
    -        //just skip if delivery only if waiting, and we are not waiting currently
    -        if (only_if_waiting && !wait_for_requests)  return;
     
    -        //make sure that channel was not closed
             Channel channel = channelRef.get();
    -        if (channel == null)  return;
    -        if (!channel.isOpen()) {
    -            LOG.info("Channel to {} is no longer open.",remote_addr);
    -            //The channel is not open yet. Reconnect?
    -            reconnect();
    -            return;
    +        if (null == channel) {
    +            connect();
    +            channel = channelRef.get();
             }
     
    -        final MessageBatch requests = tryTakeMessages();
    -        if (requests==null) {
    -            wait_for_requests = true;
    -            return;
    -        }
    +        while (msgs.hasNext()) {
    +            TaskMessage message = msgs.next();
    +            if (null == messageBatch) {
    +                messageBatch = new MessageBatch(messageBatchSize);
    +            }
     
    -        //if channel is being closed and we have no outstanding messages,  let's close the channel
    -        if (requests.isEmpty() && being_closed.get()) {
    -            close_n_release();
    -            return;
    +            messageBatch.add(message);
    +            if (messageBatch.isFull()) {
    +                MessageBatch toBeFlushed = messageBatch;
    +                flushRequest(channel, toBeFlushed);
    --- End diff --
    
    @Gvain,
    
    For an unacked topology, we can either block, drop, or store pending
    messages. Before this patch, Storm always stored them, so an OOM was
    possible, as you described. This patch goes one step further: if the
    downstream machine is down, it blocks the upstream from sending, which
    avoids the OOM.
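
The three options above (block, drop, store) can be sketched with plain JDK
queues. This is only an illustration under my own assumptions, not the Netty
client code from this patch; the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only; none of these names come from Storm.
class PendingPolicies {

    // "Store": an unbounded buffer never refuses a message, so memory use
    // grows without limit while the consumer is slower than the producer.
    static void store(Queue<byte[]> unbounded, byte[] msg) {
        unbounded.add(msg);
    }

    // "Drop": a bounded buffer; returns false and discards the message
    // when the buffer is full.
    static boolean drop(BlockingQueue<byte[]> bounded, byte[] msg) {
        return bounded.offer(msg);
    }

    // "Block": a bounded buffer; the caller waits until space frees up.
    static void block(BlockingQueue<byte[]> bounded, byte[] msg)
            throws InterruptedException {
        bounded.put(msg);
    }

    public static void main(String[] args) throws InterruptedException {
        byte[] msg = new byte[100];

        Queue<byte[]> unbounded = new ArrayDeque<>();
        for (int i = 0; i < 1000; i++) {
            store(unbounded, msg);
        }
        System.out.println("store: pending = " + unbounded.size());

        BlockingQueue<byte[]> bounded = new ArrayBlockingQueue<>(2);
        int dropped = 0;
        for (int i = 0; i < 5; i++) {
            if (!drop(bounded, msg)) {
                dropped++;
            }
        }
        System.out.println("drop: pending = " + bounded.size()
                + ", dropped = " + dropped);

        // The queue is full now, so block() waits until the consumer
        // thread below removes one element.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);
                bounded.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        block(bounded, msg);
        consumer.join();
        System.out.println("block: pending = " + bounded.size());
    }
}
```

Only the bounded variants put a ceiling on memory; "store" trades that
ceiling for never losing or delaying a message on the sending side.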
    
    But in the case you mentioned, where the downstream is slow but not down,
    messages can still pile up on either the Netty client side or the server
    side. If the network is OK, they pend on the server side; if the network
    is overwhelmed, they pend on the Netty client side.
    
    For example, if the spout generates very large messages faster than the
    available bandwidth can carry them, the spout worker JVM will eventually
    hit OOM, and nothing can stop that.
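
As a rough worked example of that case: with an unbounded send buffer, the
pending bytes grow at the produce rate minus the drain rate. The sketch below
uses made-up numbers (heap size, rates) and a hypothetical helper name; it is
not a measurement from this issue.

```java
// Hypothetical illustration; the figures in main() are invented.
class PendingGrowth {

    // Seconds until an unbounded send buffer has grown to heapBytes,
    // or -1 if the network drains at least as fast as the spout produces.
    static double secondsUntilFull(long heapBytes,
                                   long produceBytesPerSec,
                                   long drainBytesPerSec) {
        long growthPerSec = produceBytesPerSec - drainBytesPerSec;
        if (growthPerSec <= 0) {
            return -1;
        }
        return (double) heapBytes / growthPerSec;
    }

    public static void main(String[] args) {
        long heap = 1_000_000_000L;   // assumed 1 GB available for pending messages
        long produce = 200_000_000L;  // assumed spout output, bytes per second
        long drain = 125_000_000L;    // assumed 1 Gbit/s link, bytes per second
        System.out.println("seconds until buffer is full: "
                + secondsUntilFull(heap, produce, drain));
    }
}
```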
    
    Bobby's comment describes a similar situation:
    https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986
    
    We should let users configure the client to drop messages in this case,
    thus avoiding the OOM. After discussion with Bobby, this is filed as a
    follow-up JIRA: https://issues.apache.org/jira/browse/STORM-329


> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
>                 Key: STORM-297
>                 URL: https://issues.apache.org/jira/browse/STORM-297
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>              Labels: Performance, netty
>             Fix For: 0.9.2-incubating
>
>         Attachments: Storm_performance_fix.pdf, 
> storm_Netty_receiver_diagram.png, storm_conf.txt, 
> storm_performance_fix.patch, worker_throughput_without_storm-297.png
>
>
> We cannot scale up the performance by adding more CPU cores and increasing 
> parallelism.
> For a two-layer topology, Spout ---shuffle grouping--> bolt, with small 
> messages (around 100 bytes), the picture below shows that neither the CPU 
> nor the network is saturated: only 40% of the CPU and 18% of the network 
> are used, although parallelism is high (144 executors overall).



--
This message was sent by Atlassian JIRA
(v6.2#6252)
