[ https://issues.apache.org/jira/browse/STORM-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14019669#comment-14019669 ]
ASF GitHub Bot commented on STORM-297:
--------------------------------------
Github user clockfly commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/103#discussion_r13480579
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -128,133 +185,114 @@ private long getSleepTimeMs()
}
/**
- * Enqueue a task message to be sent to server
+ * Enqueue task messages to be sent to server
*/
- public void send(int task, byte[] message) {
- //throw exception if the client is being closed
- if (being_closed.get()) {
+ synchronized public void send(Iterator<TaskMessage> msgs) {
+
+ // throw exception if the client is being closed
+ if (closing) {
throw new RuntimeException("Client is being closed, and does not take requests any more");
}
-
- try {
- message_queue.put(new TaskMessage(task, message));
-
- //resume delivery if it is waiting for requests
- tryDeliverMessages(true);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ if (null == msgs || !msgs.hasNext()) {
+ return;
}
- }
-
- /**
- * Retrieve messages from queue, and delivery to server if any
- */
- synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
- //just skip if delivery only if waiting, and we are not waiting currently
- if (only_if_waiting && !wait_for_requests) return;
- //make sure that channel was not closed
Channel channel = channelRef.get();
- if (channel == null) return;
- if (!channel.isOpen()) {
- LOG.info("Channel to {} is no longer open.",remote_addr);
- //The channel is not open yet. Reconnect?
- reconnect();
- return;
+ if (null == channel) {
+ connect();
+ channel = channelRef.get();
}
- final MessageBatch requests = tryTakeMessages();
- if (requests==null) {
- wait_for_requests = true;
- return;
- }
+ while (msgs.hasNext()) {
+ TaskMessage message = msgs.next();
+ if (null == messageBatch) {
+ messageBatch = new MessageBatch(messageBatchSize);
+ }
- //if channel is being closed and we have no outstanding messages, let's close the channel
- if (requests.isEmpty() && being_closed.get()) {
- close_n_release();
- return;
+ messageBatch.add(message);
+ if (messageBatch.isFull()) {
+ MessageBatch toBeFlushed = messageBatch;
+ flushRequest(channel, toBeFlushed);
--- End diff --
@Gvain,
An unacked topology with no flow control is dangerous!
There are two separate concepts here: flow control and error control. You don't want the acker because of its performance penalty, which means you give up error control. But you still need flow control.
The catch is that Storm's current flow-control mechanism, "max.spout.pending", depends on the error-control channel, the acker, and that is the dilemma. Maybe we should separate "max.spout.pending" from the acker so that it serves only flow control.
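To make the dilemma concrete: from the spout's point of view, max.spout.pending acts as a window over the tuples the acker is tracking. The spout is not asked for new tuples while the window is full, and only an ack or fail frees a slot, so tuples that are never tracked never fill the window and the gate never engages. The class below is a hypothetical model of that gate; PendingWindow and its methods are illustrative names, not Storm APIs.

```java
/**
 * Hypothetical model of a max.spout.pending style gate: the spout may emit
 * a tracked tuple only while fewer than maxPending tracked tuples are
 * awaiting an ack or fail. This is not Storm's executor code.
 */
public class PendingWindow {
    private final int maxPending;
    private int pending; // tracked tuples emitted but not yet acked or failed

    public PendingWindow(int maxPending) {
        if (maxPending <= 0) throw new IllegalArgumentException("maxPending must be positive");
        this.maxPending = maxPending;
    }

    /** True if the spout may be asked for another tuple now. */
    public synchronized boolean canEmit() {
        return pending < maxPending;
    }

    /** Records a tuple emitted with a message id, i.e. one the acker tracks. */
    public synchronized void onTrackedEmit() {
        if (!canEmit()) throw new IllegalStateException("window is full");
        pending++;
    }

    /** An ack or fail from the acker frees one slot in the window. */
    public synchronized void onAckOrFail() {
        if (pending == 0) throw new IllegalStateException("no pending tuples");
        pending--;
    }

    public synchronized int pending() {
        return pending;
    }

    public static void main(String[] args) {
        PendingWindow window = new PendingWindow(2);
        window.onTrackedEmit();
        window.onTrackedEmit();
        System.out.println(window.canEmit()); // prints false: full until an ack arrives
        window.onAckOrFail();
        System.out.println(window.canEmit()); // prints true
    }
}
```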
One workaround: when you emit a tuple with SpoutOutputCollector, attach a message id only to a sampled fraction of tuples, for example 1%:
    collector.emit(tuple, messageId)
Only 1% of tuples carry a messageId; the other 99% are emitted without one.
This way you only have 1% of the acker traffic, which reduces the acker's performance penalty while still giving you some basic flow control.
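The sampling workaround can be sketched as a small wrapper that attaches a message id to one tuple out of every `period` (period = 100 gives the 1% above). Emitter is a hypothetical interface mirroring the two SpoutOutputCollector overloads, emit(tuple, messageId) and emit(tuple), so the sketch compiles without Storm on the classpath; SampledEmitter is likewise an illustrative name. A deterministic counter is used instead of random sampling so the rate is exact.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of the sampling workaround: attach a message id to one tuple out of
 * every `period`, so only those tuples are tracked by the acker and counted
 * against max.spout.pending. Emitter is a hypothetical stand-in for
 * SpoutOutputCollector's emit(tuple, messageId) and emit(tuple) overloads.
 */
public class SampledEmitter {
    interface Emitter {
        void emit(List<Object> tuple, Object messageId); // tracked by the acker
        void emit(List<Object> tuple);                   // untracked
    }

    private final Emitter collector;
    private final int period;
    private long seq; // counts every tuple emitted through this wrapper

    SampledEmitter(Emitter collector, int period) {
        if (period <= 0) throw new IllegalArgumentException("period must be positive");
        this.collector = collector;
        this.period = period;
    }

    /** Emits the tuple, attaching a message id only to every period-th tuple. */
    void emit(List<Object> tuple) {
        long id = seq++;
        if (id % period == 0) {
            collector.emit(tuple, id); // sampled: acked, counts toward pending
        } else {
            collector.emit(tuple);     // not sampled: fire-and-forget
        }
    }

    public static void main(String[] args) {
        final List<Object> trackedIds = new ArrayList<>();
        final int[] untracked = {0};
        Emitter counting = new Emitter() {
            public void emit(List<Object> tuple, Object messageId) { trackedIds.add(messageId); }
            public void emit(List<Object> tuple) { untracked[0]++; }
        };
        SampledEmitter emitter = new SampledEmitter(counting, 100);
        for (int i = 0; i < 1000; i++) {
            emitter.emit(Arrays.<Object>asList("word", i));
        }
        System.out.println(trackedIds.size() + " tracked, " + untracked[0] + " untracked"); // prints 10 tracked, 990 untracked
    }
}
```

Because only sampled tuples occupy pending slots, a max.spout.pending of P allows roughly P × period tuples in flight (assuming tuples complete in about the order they were emitted), so the bound is coarser. Only the sampled tuples are ever replayed on failure.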
Hope this helps!
> Storm Performance cannot be scaled up by adding more CPU cores
> --------------------------------------------------------------
>
> Key: STORM-297
> URL: https://issues.apache.org/jira/browse/STORM-297
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Labels: Performance, netty
> Fix For: 0.9.2-incubating
>
> Attachments: Storm_performance_fix.pdf,
> storm_Netty_receiver_diagram.png, storm_conf.txt,
> storm_performance_fix.patch, worker_throughput_without_storm-297.png
>
>
> We cannot scale up performance by adding more CPU cores and increasing
> parallelism.
> For a two-layer topology, Spout --shuffle grouping--> Bolt, with small
> messages (around 100 bytes), the picture below shows that neither the CPU
> nor the network is saturated: with 100-byte messages, only 40% of the CPU
> and only 18% of the network are used, even though parallelism is high
> (144 executors overall).
--
This message was sent by Atlassian JIRA
(v6.2#6252)