[ 
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002773#comment-14002773
 ] 

ASF GitHub Bot commented on STORM-297:
--------------------------------------

Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827328
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -72,34 +106,109 @@
             Channel channel = bootstrap.bind(new InetSocketAddress(port));
             allChannels.add(channel);
         }
    +    
    +    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) 
{
    +      ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
    +      
    +      for (int i = 0; i < msgs.size(); i++) {
    +        TaskMessage message = msgs.get(i);
    +        int task = message.task();
    +        
    +        if (task == -1) {
    +          closing = true;
    +          return null;
    +        }
    +        
    +        Integer queueId = getMessageQueueId(task);
    +        
    +        if (null == messageGroups[queueId]) {
    +          messageGroups[queueId] = new ArrayList<TaskMessage>();
    +        }
    +        messageGroups[queueId].add(message);
    +      }
    +      return messageGroups;
    +    }
    +    
    +    private Integer getMessageQueueId(int task) {
    +      // try to construct the map from taskId -> queueId in round robin 
manner.
    +      
    +      Integer queueId = taskToQueueId.get(task);
    +      if (null == queueId) {
    +        synchronized(taskToQueueId) {
    +          //assgin task to queue in round-robin manner
    +          if (null == taskToQueueId.get(task)) {
    +            queueId = roundRobinQueueId++;
    +            
    +            taskToQueueId.put(task, queueId);
    +            if (roundRobinQueueId == queueCount) {
    +              roundRobinQueueId = 0;
    +            }
    +          }
    +        }
    +      }
    +      return queueId;
    +    }
     
         /**
          * enqueue a received message 
          * @param message
          * @throws InterruptedException
          */
    -    protected void enqueue(TaskMessage message) throws 
InterruptedException {
    -        message_queue.put(message);
    -        LOG.debug("message received with task: {}, payload size: {}", 
message.task(), message.message().length);
    -    }
    +    protected void enqueue(List<TaskMessage> msgs) throws 
InterruptedException {
    +      
    +      if (null == msgs || msgs.size() == 0 || closing) {
    +        return;
    +      }
    +      
    +      ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
    +      
    +      if (null == messageGroups || closing) {
    +        return;
    +      }
    +      
    +      for (int receiverId = 0; receiverId < messageGroups.length; 
receiverId++) {
    +        ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
    +        if (null != msgGroup) {
    +          message_queue[receiverId].put(msgGroup);
    +        }
    +      }
    +   }
         
         /**
          * fetch a message from message queue synchronously (flags != 1) or 
asynchronously (flags==1)
          */
    -    public TaskMessage recv(int flags)  {
    -        if ((flags & 0x01) == 0x01) { 
    +    public Iterator<TaskMessage> recv(int flags)  {
    --- End diff --
    
    fixed


> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
>                 Key: STORM-297
>                 URL: https://issues.apache.org/jira/browse/STORM-297
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>              Labels: Performance, netty
>             Fix For: 0.9.2-incubating
>
>         Attachments: Storm_performance_fix.pdf, 
> storm_Netty_receiver_diagram.png, storm_performance_fix.patch
>
>
> Performance does not scale up when we add more CPU cores and increase 
> parallelism.
> For a 2-layer topology (Spout ---shuffle grouping--> bolt) with small 
> messages (around 100 bytes), the picture below shows that neither the CPU 
> nor the network is saturated: at a message size of 100 bytes, only 40% of 
> the CPU and only 18% of the network are used, even though parallelism is 
> high (144 executors overall).



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to