[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14313870#comment-14313870 ]
ASF GitHub Bot commented on STORM-329:
--------------------------------------
Github user miguno commented on the pull request:
https://github.com/apache/storm/pull/268#issuecomment-73667341
Many thanks for your review, Sean. I addressed your comments; see the new
commits in
https://github.com/miguno/storm/commits/0.10.0-SNAPSHOT-STORM-392-miguno-merge.

One final question: how should we address the following TODO in
`send(Iterator<TaskMessage> msgs)`? I'd appreciate additional eyeballs on
this. :-) It's a scenario that we may have overlooked in the original code.
Note that we do NOT check whether the channel is writable while flushing
(full) message batches (cf. the `while` loop), but we do perform that check
when handling any left-over messages (cf. after the `while` loop).
```java
/**
 * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
 */
@Override
public synchronized void send(Iterator<TaskMessage> msgs) {
    // ...some code removed to shorten this code snippet...

    Channel channel = channelRef.get();
    if (!connectionEstablished(channel)) {
        // Closing the channel and reconnecting should be done before handling the messages.
        closeChannelAndReconnect(channel);
        handleMessagesWhenConnectionIsUnavailable(msgs);
        return;
    }

    // Collect messages into batches (to optimize network throughput), then flush them.
    while (msgs.hasNext()) {
        TaskMessage message = msgs.next();
        if (messageBatch == null) {
            messageBatch = new MessageBatch(messageBatchSize);
        }

        messageBatch.add(message);
        // TODO: What shall we do if the channel is not writable?
        if (messageBatch.isFull()) {
            MessageBatch toBeFlushed = messageBatch;
            flushMessages(channel, toBeFlushed);
            messageBatch = null;
        }
    }

    // Handle any remaining messages in case the "last" batch was not full.
    if (containsMessages(messageBatch)) {
        if (connectionEstablished(channel) && channel.isWritable()) {
            // We can write to the channel, so we flush the remaining messages immediately
            // to minimize latency.
            pauseBackgroundFlushing();
            MessageBatch toBeFlushed = messageBatch;
            messageBatch = null;
            flushMessages(channel, toBeFlushed);
        }
        else {
            // We cannot write to the channel, which means Netty's internal write buffer is full.
            // In this case, we buffer the remaining messages and wait for the next messages to arrive.
            //
            // Background:
            // Netty 3.x maintains an internal write buffer with a high water mark for each channel
            // (default: 64K). This represents the amount of data waiting to be flushed to operating
            // system buffers. If the outstanding data exceeds this value, then the channel is set to
            // non-writable. When this happens, an INTEREST_CHANGED channel event is triggered.
            // Netty sets the channel to writable again once the data has been flushed to the system
            // buffers.
            //
            // See http://stackoverflow.com/questions/14049260
            resumeBackgroundFlushing();
            nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
        }
    }
}
```
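One option I could imagine (an untested sketch, not part of the commits above) would be to apply the same writability check inside the batching `while` loop and, when the channel is not writable, hand a full batch over to the background flusher instead of writing it. The sketch below reuses the fields and helpers from the snippet above; `enqueueForBackgroundFlush` is a hypothetical helper that does not exist in the current code.

```java
// Sketch only -- same fields/helpers as the snippet above (messageBatch, flushMessages,
// resumeBackgroundFlushing, nextBackgroundFlushTimeMs, flushCheckIntervalMs, nowMillis).
// enqueueForBackgroundFlush(...) is hypothetical and would need to be added.
while (msgs.hasNext()) {
    TaskMessage message = msgs.next();
    if (messageBatch == null) {
        messageBatch = new MessageBatch(messageBatchSize);
    }
    messageBatch.add(message);

    if (messageBatch.isFull()) {
        MessageBatch toBeFlushed = messageBatch;
        messageBatch = null;
        if (channel.isWritable()) {
            // The channel can accept more data, so flush the full batch right away.
            flushMessages(channel, toBeFlushed);
        }
        else {
            // Netty's write buffer is above its high water mark; writing now would only
            // pile more data onto the channel. Park the batch and let the background
            // flusher retry once the channel becomes writable again.
            enqueueForBackgroundFlush(toBeFlushed); // hypothetical helper
            resumeBackgroundFlushing();
            nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
        }
    }
}
```

Whether parking full batches like this or simply letting the flush block until Netty drains its buffer is the better trade-off probably depends on how much memory we are willing to spend on buffering.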
> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
> Key: STORM-329
> URL: https://issues.apache.org/jira/browse/STORM-329
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.9.2-incubating
> Reporter: Sean Zhong
> Priority: Minor
> Labels: Netty
> Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986]
> during the work on STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are
> blocking. My biggest concern around the blocking is in the case of a worker
> crashing. If a single worker crashes, this can block the entire topology from
> executing until that worker comes back up. In some cases I can see that being
> something that you would want. In other cases I can see speed being the
> primary concern and some users would like to get partial data fast, rather
> than accurate data later.
> Could we make it configurable on a follow-up JIRA where we can have a max
> limit to the buffering that is allowed, before we block, or throw data away
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages up to that limit, and
> then block?
> 4. Should we neither block nor buffer too much, but instead drop the messages
> and rely on Storm's built-in failover mechanism?
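For illustration only (hypothetical, not part of this patch or the quoted discussion), option 3 roughly amounts to a bounded buffer that accepts messages up to a configurable limit and then blocks the sender; the class and names below are made up for this sketch:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import backtype.storm.messaging.TaskMessage;

// Hypothetical illustration of option 3: buffer pending messages up to a configurable
// limit while the connection is down, then block the sender until space frees up.
public class BoundedPendingBuffer {
    private final BlockingQueue<TaskMessage> pending;

    public BoundedPendingBuffer(int maxBufferedMessages) {
        this.pending = new ArrayBlockingQueue<>(maxBufferedMessages);
    }

    /** Buffers a message; blocks once maxBufferedMessages are already pending. */
    public void add(TaskMessage msg) throws InterruptedException {
        pending.put(msg); // blocks when the buffer limit is reached
    }

    /** Drained by the client once the connection is re-established. */
    public TaskMessage poll() {
        return pending.poll();
    }
}
```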
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)