[
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002264#comment-14002264
]
ASF GitHub Bot commented on STORM-297:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/103#discussion_r12812919
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -128,133 +181,105 @@ private long getSleepTimeMs()
}
/**
- * Enqueue a task message to be sent to server
+ * Enqueue task messages to be sent to server
*/
- public void send(int task, byte[] message) {
- //throw exception if the client is being closed
- if (being_closed.get()) {
+ synchronized public void send(Iterator<TaskMessage> msgs) {
+
+ // throw exception if the client is being closed
+ if (closing) {
throw new RuntimeException("Client is being closed, and does
not take requests any more");
}
-
- try {
- message_queue.put(new TaskMessage(task, message));
-
- //resume delivery if it is waiting for requests
- tryDeliverMessages(true);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ if (null == msgs || !msgs.hasNext()) {
+ return;
}
- }
- /**
- * Retrieve messages from queue, and delivery to server if any
- */
- synchronized void tryDeliverMessages(boolean only_if_waiting) throws
InterruptedException {
- //just skip if delivery only if waiting, and we are not waiting
currently
- if (only_if_waiting && !wait_for_requests) return;
-
- //make sure that channel was not closed
Channel channel = channelRef.get();
- if (channel == null) return;
- if (!channel.isOpen()) {
- LOG.info("Channel to {} is no longer open.",remote_addr);
- //The channel is not open yet. Reconnect?
- reconnect();
- return;
+ if (null == channel) {
+ connect();
+ channel = channelRef.get();
}
- final MessageBatch requests = tryTakeMessages();
- if (requests==null) {
- wait_for_requests = true;
- return;
- }
+ while (msgs.hasNext()) {
+ TaskMessage message = msgs.next();
+ if (null == messageBatch) {
+ messageBatch = new MessageBatch(messageBatchSize);
+ }
- //if channel is being closed and we have no outstanding messages,
let's close the channel
- if (requests.isEmpty() && being_closed.get()) {
- close_n_release();
- return;
+ messageBatch.add(message);
+ if (messageBatch.isFull()) {
+ MessageBatch toBeFlushed = messageBatch;
+ flushRequest(channel, toBeFlushed, blocking);
+ messageBatch = null;
+ }
}
- //we are busily delivering messages, and will check queue upon
response.
- //When send() is called by senders, we should not thus call
tryDeliverMessages().
- wait_for_requests = false;
-
- //write request into socket channel
- ChannelFuture future = channel.write(requests);
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future)
- throws Exception {
- if (!future.isSuccess()) {
- LOG.info("failed to send "+requests.size()+" requests
to "+remote_addr, future.getCause());
- reconnect();
- } else {
- LOG.debug("{} request(s) sent", requests.size());
-
- //Now that our requests have been sent, channel could
be closed if needed
- if (being_closed.get())
- close_n_release();
- }
+ if (null != messageBatch && !messageBatch.isEmpty()) {
+ if (channel.isWritable()) {
+ flushCheckTimer.set(Long.MAX_VALUE);
+
+ // Flush as fast as we can to reduce the latency
+ MessageBatch toBeFlushed = messageBatch;
+ messageBatch = null;
+ flushRequest(channel, toBeFlushed, blocking);
+
+ } else {
+ // when channel is NOT writable, it means the internal
netty buffer is full.
+ // In this case, we can try to buffer up more incoming
messages.
+ flushCheckTimer.set(System.currentTimeMillis() +
flushCheckInterval);
}
- });
+ }
+
}
- /**
- * Take all enqueued messages from queue
- * @return batch of messages
- * @throws InterruptedException
- *
- * synchronized ... ensure that messages are delivered in the same
order
- * as they are added into queue
- */
- private MessageBatch tryTakeMessages() throws InterruptedException {
- //1st message
- Object msg = message_queue.poll();
- if (msg == null) return null;
-
- MessageBatch batch = new MessageBatch(buffer_size);
- //we will discard any message after CLOSE
- if (msg == ControlMessage.CLOSE_MESSAGE) {
- LOG.info("Connection to {} is being closed", remote_addr);
- being_closed.set(true);
- return batch;
+ public String name() {
+ if (null != remote_addr) {
+ return PREFIX + remote_addr.toString();
}
+ return "";
+ }
- batch.add((TaskMessage)msg);
- while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
- //Is it a CLOSE message?
- if (msg == ControlMessage.CLOSE_MESSAGE) {
- message_queue.take();
- LOG.info("Connection to {} is being closed", remote_addr);
- being_closed.set(true);
- break;
+ private synchronized void flush() {
+ if (!closing) {
+ if (null != messageBatch && !messageBatch.isEmpty()) {
+ MessageBatch toBeFlushed = messageBatch;
+ Channel channel = channelRef.get();
+ if (channel != null) {
+ flushCheckTimer.set(Long.MAX_VALUE);
+ flushRequest(channel, toBeFlushed, true);
+ }
+ messageBatch = null;
--- End diff --
If Channel is null, do we really want to drop the messageBatch?
> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
> Key: STORM-297
> URL: https://issues.apache.org/jira/browse/STORM-297
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Labels: Performance, netty
> Fix For: 0.9.2-incubating
>
> Attachments: Storm_performance_fix.pdf,
> storm_Netty_receiver_diagram.png, storm_performance_fix.patch
>
>
> We cannot scale up the performance by adding more CPU cores and increasing
> parallelism.
> For a 2 layer topology Spout ---shuffle grouping--> bolt, when message size
> is small (around 100 bytes), we can find in the below picture that neither
> the CPU nor the network is saturated. When message size is 100 bytes, only
> 40% of CPU is used, only 18% of network is used, although we have a high
> parallelism (overall we have 144 executors)
--
This message was sent by Atlassian JIRA
(v6.2#6252)