Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/103#discussion_r12813684
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
@@ -72,34 +106,109 @@
Channel channel = bootstrap.bind(new InetSocketAddress(port));
allChannels.add(channel);
}
+
+ private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs)
{
+ ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
+
+ for (int i = 0; i < msgs.size(); i++) {
+ TaskMessage message = msgs.get(i);
+ int task = message.task();
+
+ if (task == -1) {
+ closing = true;
+ return null;
+ }
+
+ Integer queueId = getMessageQueueId(task);
+
+ if (null == messageGroups[queueId]) {
+ messageGroups[queueId] = new ArrayList<TaskMessage>();
+ }
+ messageGroups[queueId].add(message);
+ }
+ return messageGroups;
+ }
+
+ private Integer getMessageQueueId(int task) {
+ // try to construct the map from taskId -> queueId in round robin
manner.
+
+ Integer queueId = taskToQueueId.get(task);
+ if (null == queueId) {
+ synchronized(taskToQueueId) {
+ //assgin task to queue in round-robin manner
+ if (null == taskToQueueId.get(task)) {
+ queueId = roundRobinQueueId++;
+
+ taskToQueueId.put(task, queueId);
+ if (roundRobinQueueId == queueCount) {
+ roundRobinQueueId = 0;
+ }
+ }
+ }
+ }
+ return queueId;
+ }
/**
* enqueue a received message
* @param message
* @throws InterruptedException
*/
- protected void enqueue(TaskMessage message) throws
InterruptedException {
- message_queue.put(message);
- LOG.debug("message received with task: {}, payload size: {}",
message.task(), message.message().length);
- }
+ protected void enqueue(List<TaskMessage> msgs) throws
InterruptedException {
+
+ if (null == msgs || msgs.size() == 0 || closing) {
+ return;
+ }
+
+ ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
+
+ if (null == messageGroups || closing) {
+ return;
+ }
+
+ for (int receiverId = 0; receiverId < messageGroups.length;
receiverId++) {
+ ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+ if (null != msgGroup) {
+ message_queue[receiverId].put(msgGroup);
+ }
+ }
+ }
/**
* fetch a message from message queue synchronously (flags != 1) or
asynchronously (flags==1)
*/
- public TaskMessage recv(int flags) {
- if ((flags & 0x01) == 0x01) {
+ public Iterator<TaskMessage> recv(int flags) {
--- End diff --
Is there a reason we still have this API here? it seems like we don't
actually want anyone to call this code.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---