[ 
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002772#comment-14002772
 ] 

ASF GitHub Bot commented on STORM-297:
--------------------------------------

Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/103#discussion_r12827326
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -31,35 +31,69 @@
     import org.slf4j.LoggerFactory;
     
     import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
     import java.util.Map;
     import java.util.concurrent.Executors;
     import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
     
     class Server implements IConnection {
         private static final Logger LOG = LoggerFactory.getLogger(Server.class);
         @SuppressWarnings("rawtypes")
         Map storm_conf;
         int port;
    -    private LinkedBlockingQueue<TaskMessage> message_queue;
    +    
    +    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
    +    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
    +    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    +    
         volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
         final ChannelFactory factory;
         final ServerBootstrap bootstrap;
    -
    +    
    +    private int queueCount;
    +    HashMap<Integer, Integer> taskToQueueId = null;
    +    int roundRobinQueueId;
    +   
    +    boolean closing = false;
    +    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    +    
    +    
         @SuppressWarnings("rawtypes")
         Server(Map storm_conf, int port) {
             this.storm_conf = storm_conf;
             this.port = port;
    -        message_queue = new LinkedBlockingQueue<TaskMessage>();
    -
    +        
    +        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
    +        roundRobinQueueId = 0;
    +        taskToQueueId = new HashMap<Integer, Integer>();
    +    
    +        message_queue = new LinkedBlockingQueue[queueCount];
    +               for (int i = 0; i < queueCount; i++) {
    --- End diff --
    
    fixed
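
For readers following the diff, the queueing scheme described in the code comments above (one queue per receiver thread, with every message for a given task going to the same queue so that per-task order is preserved) can be sketched roughly as below. This is not the code from the pull request: the class name TaskQueueRouter, the String payload standing in for TaskMessage, the synchronized lookup, and the accessor methods are all assumptions made for illustration. Only the field names queueCount, taskToQueueId, and roundRobinQueueId are taken from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of the pull request: pins each task id to one queue.
class TaskQueueRouter {
    // One queue per receiver thread; String is a stand-in for TaskMessage.
    private final List<LinkedBlockingQueue<String>> queues = new ArrayList<>();
    private final Map<Integer, Integer> taskToQueueId = new HashMap<>();
    private int roundRobinQueueId = 0;

    TaskQueueRouter(int queueCount) {
        if (queueCount < 1) {
            throw new IllegalArgumentException("queueCount must be at least 1");
        }
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    // Returns the queue index for a task. A task seen for the first time is
    // assigned the next queue in round-robin order; afterwards the mapping is fixed.
    // Synchronization is an assumption here, in case several threads call this.
    synchronized int queueIdFor(int task) {
        Integer id = taskToQueueId.get(task);
        if (id == null) {
            id = roundRobinQueueId;
            roundRobinQueueId = (roundRobinQueueId + 1) % queues.size();
            taskToQueueId.put(task, id);
        }
        return id;
    }

    // Appends the payload to the queue owned by the task, so messages for
    // the same task keep their arrival order.
    void enqueue(int task, String payload) {
        queues.get(queueIdFor(task)).add(payload);
    }

    LinkedBlockingQueue<String> queue(int id) {
        return queues.get(id);
    }
}
```

Pinning a task to one queue is what keeps ordering intact: messages for different tasks can be drained in parallel by different receiver threads, while messages for any single task are always consumed by the same thread in FIFO order.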


> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
>                 Key: STORM-297
>                 URL: https://issues.apache.org/jira/browse/STORM-297
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>              Labels: Performance, netty
>             Fix For: 0.9.2-incubating
>
>         Attachments: Storm_performance_fix.pdf, 
> storm_Netty_receiver_diagram.png, storm_performance_fix.patch
>
>
> Performance does not scale up when we add more CPU cores and increase 
> parallelism.
> For a 2-layer topology (Spout ---shuffle grouping--> Bolt) with small 
> messages (around 100 bytes), the picture below shows that neither the CPU 
> nor the network is saturated: only 40% of the CPU and 18% of the network 
> are used, even though parallelism is high (144 executors overall).



--
This message was sent by Atlassian JIRA
(v6.2#6252)
