[
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002297#comment-14002297
]
ASF GitHub Bot commented on STORM-297:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/103#discussion_r12813684
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
@@ -72,34 +106,109 @@
Channel channel = bootstrap.bind(new InetSocketAddress(port));
allChannels.add(channel);
}
+
+    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
+        ArrayList<TaskMessage>[] messageGroups = new ArrayList[queueCount];
+
+        for (int i = 0; i < msgs.size(); i++) {
+            TaskMessage message = msgs.get(i);
+            int task = message.task();
+
+            if (task == -1) {
+                closing = true;
+                return null;
+            }
+
+            Integer queueId = getMessageQueueId(task);
+
+            if (null == messageGroups[queueId]) {
+                messageGroups[queueId] = new ArrayList<TaskMessage>();
+            }
+            messageGroups[queueId].add(message);
+        }
+        return messageGroups;
+    }
+
+    private Integer getMessageQueueId(int task) {
+        // lazily build the map from taskId -> queueId, assigning queues in round-robin manner
+        Integer queueId = taskToQueueId.get(task);
+        if (null == queueId) {
+            synchronized (taskToQueueId) {
+                // re-check under the lock so a concurrent assignment is not lost
+                queueId = taskToQueueId.get(task);
+                if (null == queueId) {
+                    queueId = roundRobinQueueId++;
+                    taskToQueueId.put(task, queueId);
+                    if (roundRobinQueueId == queueCount) {
+                        roundRobinQueueId = 0;
+                    }
+                }
+            }
+        }
+        return queueId;
+    }
    /**
     * enqueue a received message
     * @param message
     * @throws InterruptedException
     */
-    protected void enqueue(TaskMessage message) throws InterruptedException {
-        message_queue.put(message);
-        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
-    }
+    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
+
+        if (null == msgs || msgs.size() == 0 || closing) {
+            return;
+        }
+
+        ArrayList<TaskMessage>[] messageGroups = groupMessages(msgs);
+
+        if (null == messageGroups || closing) {
+            return;
+        }
+
+        for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
+            ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+            if (null != msgGroup) {
+                message_queue[receiverId].put(msgGroup);
+            }
+        }
+    }
    /**
     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags == 1)
     */
-    public TaskMessage recv(int flags) {
-        if ((flags & 0x01) == 0x01) {
+    public Iterator<TaskMessage> recv(int flags) {
--- End diff --
Is there a reason we still have this API here? It seems like we don't
actually want anyone to call this code.
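For context, the round-robin task-to-queue assignment discussed in the diff above can be sketched as a small standalone class. The class name `RoundRobinGrouping`, the constructor parameter, and the `main` driver are illustrative assumptions for this sketch; the real code lives in `Server.java` and shares its state with the rest of the Netty server:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the round-robin taskId -> queueId assignment from the patch.
// The first lookup is lock-free; the synchronized block re-checks the map
// so concurrent callers agree on a single queue id per task.
public class RoundRobinGrouping {
    private final int queueCount;
    private final Map<Integer, Integer> taskToQueueId = new ConcurrentHashMap<Integer, Integer>();
    private int roundRobinQueueId = 0;

    public RoundRobinGrouping(int queueCount) {
        this.queueCount = queueCount;
    }

    public Integer getMessageQueueId(int task) {
        Integer queueId = taskToQueueId.get(task);
        if (queueId == null) {
            synchronized (taskToQueueId) {
                // re-check under the lock so a racing assignment is not lost
                queueId = taskToQueueId.get(task);
                if (queueId == null) {
                    queueId = roundRobinQueueId++;
                    taskToQueueId.put(task, queueId);
                    if (roundRobinQueueId == queueCount) {
                        roundRobinQueueId = 0; // wrap around to the first queue
                    }
                }
            }
        }
        return queueId;
    }

    public static void main(String[] args) {
        RoundRobinGrouping g = new RoundRobinGrouping(3);
        // Six previously unseen tasks are spread over queues 0, 1, 2, 0, 1, 2.
        for (int task = 10; task < 16; task++) {
            System.out.println("task " + task + " -> queue " + g.getMessageQueueId(task));
        }
        // A repeated lookup returns the cached assignment.
        System.out.println("task 10 again -> queue " + g.getMessageQueueId(10));
    }
}
```

With three queues, tasks are assigned queues 0, 1, 2, 0, 1, 2 in arrival order, and later lookups for the same task return the cached id, which is what lets `groupMessages` batch a task's messages onto a stable receiver queue.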
> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
> Key: STORM-297
> URL: https://issues.apache.org/jira/browse/STORM-297
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Labels: Performance, netty
> Fix For: 0.9.2-incubating
>
> Attachments: Storm_performance_fix.pdf,
> storm_Netty_receiver_diagram.png, storm_performance_fix.patch
>
>
> We cannot scale up performance by adding more CPU cores and increasing
> parallelism.
> For a two-layer topology (Spout --shuffle grouping--> bolt) with small
> messages (around 100 bytes), the attached picture shows that neither the
> CPU nor the network is saturated: only 40% of the CPU and 18% of the
> network are used, even with high parallelism (144 executors overall).
--
This message was sent by Atlassian JIRA
(v6.2#6252)