[
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13991589#comment-13991589
]
ASF GitHub Bot commented on STORM-297:
--------------------------------------
Github user clockfly commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/103#discussion_r12362613
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -128,133 +181,103 @@ private long getSleepTimeMs()
}
/**
- * Enqueue a task message to be sent to server
+ * Enqueue task messages to be sent to server
*/
- public void send(int task, byte[] message) {
- //throw exception if the client is being closed
- if (being_closed.get()) {
+ synchronized public void send(Iterator<TaskMessage> msgs) {
+
+ // throw exception if the client is being closed
+ if (closing) {
throw new RuntimeException("Client is being closed, and does
not take requests any more");
}
-
- try {
- message_queue.put(new TaskMessage(task, message));
-
- //resume delivery if it is waiting for requests
- tryDeliverMessages(true);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ if (null == msgs || !msgs.hasNext()) {
+ return;
}
- }
- /**
- * Retrieve messages from queue, and delivery to server if any
- */
- synchronized void tryDeliverMessages(boolean only_if_waiting) throws
InterruptedException {
- //just skip if delivery only if waiting, and we are not waiting
currently
- if (only_if_waiting && !wait_for_requests) return;
-
- //make sure that channel was not closed
Channel channel = channelRef.get();
- if (channel == null) return;
- if (!channel.isOpen()) {
- LOG.info("Channel to {} is no longer open.",remote_addr);
- //The channel is not open yet. Reconnect?
- reconnect();
- return;
+ if (null == channel) {
+ connect();
+ channel = channelRef.get();
}
- final MessageBatch requests = tryTakeMessages();
- if (requests==null) {
- wait_for_requests = true;
- return;
- }
+ while (msgs.hasNext()) {
+ TaskMessage message = msgs.next();
+ if (null == messageBatch) {
+ messageBatch = new MessageBatch(messageBatchSize);
+ }
- //if channel is being closed and we have no outstanding messages,
let's close the channel
- if (requests.isEmpty() && being_closed.get()) {
- close_n_release();
- return;
+ messageBatch.add(message);
+ if (messageBatch.isFull()) {
+ MessageBatch toBeFlushed = messageBatch;
+ flushRequest(channel, toBeFlushed, blocking);
+ messageBatch = null;
+ }
}
- //we are busily delivering messages, and will check queue upon
response.
- //When send() is called by senders, we should not thus call
tryDeliverMessages().
- wait_for_requests = false;
-
- //write request into socket channel
- ChannelFuture future = channel.write(requests);
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future)
- throws Exception {
- if (!future.isSuccess()) {
- LOG.info("failed to send "+requests.size()+" requests
to "+remote_addr, future.getCause());
- reconnect();
- } else {
- LOG.debug("{} request(s) sent", requests.size());
-
- //Now that our requests have been sent, channel could
be closed if needed
- if (being_closed.get())
- close_n_release();
- }
+ if (null != messageBatch && !messageBatch.isEmpty()) {
+ if (channel.isWritable()) {
--- End diff --
When the channel is NOT writable, Netty's internal outbound buffer is full.
In that case, we can keep buffering incoming messages in the batch instead of flushing them immediately.
> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
> Key: STORM-297
> URL: https://issues.apache.org/jira/browse/STORM-297
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Labels: Performance, netty
> Fix For: 0.9.2-incubating
>
> Attachments: Storm_performance_fix.pdf, storm_performance_fix.patch
>
>
> We cannot scale up performance by adding more CPU cores and increasing
> parallelism.
> For a two-layer topology (Spout --shuffle grouping--> Bolt) with small
> messages (around 100 bytes), the picture below shows that neither the CPU
> nor the network is saturated: with 100-byte messages, only 40% of the CPU
> and only 18% of the network bandwidth are used, even though parallelism is
> high (144 executors in total).
--
This message was sent by Atlassian JIRA
(v6.2#6252)