[
https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019611#comment-14019611
]
ASF GitHub Bot commented on STORM-297:
--------------------------------------
Github user Gvain commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/103#discussion_r13477759
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -128,133 +185,114 @@ private long getSleepTimeMs()
}
/**
- * Enqueue a task message to be sent to server
+ * Enqueue task messages to be sent to server
*/
- public void send(int task, byte[] message) {
- //throw exception if the client is being closed
- if (being_closed.get()) {
+ synchronized public void send(Iterator<TaskMessage> msgs) {
+
+ // throw exception if the client is being closed
+ if (closing) {
throw new RuntimeException("Client is being closed, and does
not take requests any more");
}
-
- try {
- message_queue.put(new TaskMessage(task, message));
-
- //resume delivery if it is waiting for requests
- tryDeliverMessages(true);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ if (null == msgs || !msgs.hasNext()) {
+ return;
}
- }
-
- /**
- * Retrieve messages from queue, and delivery to server if any
- */
- synchronized void tryDeliverMessages(boolean only_if_waiting) throws
InterruptedException {
- //just skip if delivery only if waiting, and we are not waiting
currently
- if (only_if_waiting && !wait_for_requests) return;
- //make sure that channel was not closed
Channel channel = channelRef.get();
- if (channel == null) return;
- if (!channel.isOpen()) {
- LOG.info("Channel to {} is no longer open.",remote_addr);
- //The channel is not open yet. Reconnect?
- reconnect();
- return;
+ if (null == channel) {
+ connect();
+ channel = channelRef.get();
}
- final MessageBatch requests = tryTakeMessages();
- if (requests==null) {
- wait_for_requests = true;
- return;
- }
+ while (msgs.hasNext()) {
+ TaskMessage message = msgs.next();
+ if (null == messageBatch) {
+ messageBatch = new MessageBatch(messageBatchSize);
+ }
- //if channel is being closed and we have no outstanding messages,
let's close the channel
- if (requests.isEmpty() && being_closed.get()) {
- close_n_release();
- return;
+ messageBatch.add(message);
+ if (messageBatch.isFull()) {
+ MessageBatch toBeFlushed = messageBatch;
+ flushRequest(channel, toBeFlushed);
--- End diff --
@clockfly
Thanks for your remind.
For an unacked topology. Before your patch, the OOM problem do exists, and
I filed it at https://issues.apache.org/jira/browse/STORM-339
After your patch, this OOM problem seems still exists just as what you said
and filed up at jira https://issues.apache.org/jira/browse/STORM-329. Now, we
are stuck on this as we are using unacked topology under heavy throughput, so
shall we make this issue's priority higher and fix it in time ?
> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
> Key: STORM-297
> URL: https://issues.apache.org/jira/browse/STORM-297
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Labels: Performance, netty
> Fix For: 0.9.2-incubating
>
> Attachments: Storm_performance_fix.pdf,
> storm_Netty_receiver_diagram.png, storm_conf.txt,
> storm_performance_fix.patch, worker_throughput_without_storm-297.png
>
>
> We cannot scale up the performance by adding more CPU cores and increasing
> parallelism.
> For a 2 layer topology Spout ---shuffle grouping--> bolt, when message size
> is small (around 100 bytes), we can find in the below picture that neither
> the CPU nor the network is saturated. When message size is 100 bytes, only
> 40% of CPU is used, only 18% of network is used, although we have a high
> parallelism (overall we have 144 executors)
--
This message was sent by Atlassian JIRA
(v6.2#6252)