[
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002288#comment-14002288
]
ASF GitHub Bot commented on STORM-297:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/103#discussion_r12813477
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
@@ -31,35 +31,69 @@
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
class Server implements IConnection {
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
@SuppressWarnings("rawtypes")
Map storm_conf;
int port;
- private LinkedBlockingQueue<TaskMessage> message_queue;
+
+ // Create multiple queues for incoming messages. The size equals the number of receiver threads.
+ // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
+ private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
+
volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
final ChannelFactory factory;
final ServerBootstrap bootstrap;
-
+
+ private int queueCount;
+ HashMap<Integer, Integer> taskToQueueId = null;
+ int roundRobinQueueId;
+
+ boolean closing = false;
+ List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
+
+
@SuppressWarnings("rawtypes")
Server(Map storm_conf, int port) {
this.storm_conf = storm_conf;
this.port = port;
- message_queue = new LinkedBlockingQueue<TaskMessage>();
-
+
+ queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
+ roundRobinQueueId = 0;
+ taskToQueueId = new HashMap<Integer, Integer>();
+
+ message_queue = new LinkedBlockingQueue[queueCount];
+ for (int i = 0; i < queueCount; i++) {
--- End diff --
Indentation looks off here.
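
For context, the comments in the diff describe one queue per receiver thread, with all messages for a given task landing in the same queue so that per-task order is preserved. A minimal sketch of that idea follows. It is an illustration only, not the code in the pull request: the class name TaskQueueRouter and its methods are hypothetical, payloads are plain strings rather than TaskMessage objects, and the assumption that a task gets its queue round-robin on first sight is inferred from the roundRobinQueueId and taskToQueueId fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; not the Server.java code from the pull request.
class TaskQueueRouter {
    // One queue per receiver thread (String stands in for TaskMessage).
    private final List<LinkedBlockingQueue<String>> queues =
            new ArrayList<LinkedBlockingQueue<String>>();
    // Remembers which queue each task was assigned to.
    private final Map<Integer, Integer> taskToQueueId = new HashMap<Integer, Integer>();
    // Next queue index to hand out to a task that has not been seen yet.
    private int roundRobinQueueId = 0;

    TaskQueueRouter(int queueCount) {
        if (queueCount < 1) {
            throw new IllegalArgumentException("queueCount must be >= 1");
        }
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<String>());
        }
    }

    // Returns the queue index for a task, assigning one round-robin the
    // first time the task is seen. Later calls return the same index.
    synchronized int queueIdFor(int taskId) {
        Integer id = taskToQueueId.get(taskId);
        if (id == null) {
            id = roundRobinQueueId;
            roundRobinQueueId = (roundRobinQueueId + 1) % queues.size();
            taskToQueueId.put(taskId, id);
        }
        return id;
    }

    // Appends the payload to the task's queue; the queues are unbounded,
    // so add() does not block.
    void enqueue(int taskId, String payload) {
        queues.get(queueIdFor(taskId)).add(payload);
    }

    LinkedBlockingQueue<String> queue(int queueId) {
        return queues.get(queueId);
    }
}
```

Because a task always maps to the same queue, and each queue is FIFO, messages for one task stay in arrival order as long as a single receiver thread drains each queue.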
> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
> Key: STORM-297
> URL: https://issues.apache.org/jira/browse/STORM-297
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Labels: Performance, netty
> Fix For: 0.9.2-incubating
>
> Attachments: Storm_performance_fix.pdf,
> storm_Netty_receiver_diagram.png, storm_performance_fix.patch
>
>
> We cannot scale up the performance by adding more CPU cores and increasing
> parallelism.
> For a 2-layer topology Spout ---shuffle grouping--> bolt, when the message
> size is small (around 100 bytes), the picture below shows that neither
> the CPU nor the network is saturated. With 100-byte messages, only
> 40% of CPU and only 18% of network is used, although we have a high
> parallelism (144 executors overall).