[
https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315781#comment-14315781
]
ASF GitHub Bot commented on STORM-329:
--------------------------------------
Github user miguno commented on the pull request:
https://github.com/apache/storm/pull/268#issuecomment-73847544
I think if the channel is not `WRITABLE` while we are trying to flush a full
message batch (or more than one, since the flush happens inside the `while`
loop), then we will keep dropping those batches for as long as the channel is
not `WRITABLE`. If acking is enabled, replaying may kick in provided the
spout/source supports it; in all other cases the messages are lost forever.
```java
// Collect messages into batches (to optimize network throughput), then flush them.
while (msgs.hasNext()) {
    TaskMessage message = msgs.next();
    if (messageBatch == null) {
        messageBatch = new MessageBatch(messageBatchSize);
    }
    messageBatch.add(message);
    // TODO: What shall we do if the channel is not writable?
    if (messageBatch.isFull()) {
        MessageBatch toBeFlushed = messageBatch;
        flushMessages(channel, toBeFlushed);
        messageBatch = null;
    }
}
```
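For context, here is a minimal, hypothetical sketch of the drop path I'm describing. The `NettyishChannel` and `Batch` types (and the class name `DropIllustration`) are stand-ins I made up for illustration, not Storm's actual `Channel`/`MessageBatch` code; the point is only that a batch which hits a non-writable channel at flush time is never written:
```java
import java.util.List;

// Stand-in for the (Netty) channel: only the two operations that matter here.
interface NettyishChannel {
    boolean isWritable();
    void write(Object batch);  // fire-and-forget write of a whole batch
}

// Stand-in for MessageBatch.
class Batch {
    private final List<Object> messages;
    Batch(List<Object> messages) { this.messages = messages; }
    int size() { return messages.size(); }
}

class DropIllustration {
    // If the channel is not WRITABLE at flush time, the batch is simply not
    // written: the messages are gone from the sender's point of view, and only
    // acking plus a replay-capable spout/source can bring them back.
    static void flushMessages(NettyishChannel channel, Batch batch) {
        if (batch == null || batch.size() == 0) {
            return;
        }
        if (channel.isWritable()) {
            channel.write(batch);
        }
        // else: no buffering and no retry here -- the batch is dropped.
    }
}
```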
The code after the `while` loop will keep any remaining messages -- at most a
partially filled batch, since full batches are flushed inside the loop -- in
memory via the `messageBatch` field in case the channel is not `WRITABLE`:
```java
// Handle any remaining messages in case the "last" batch was not full.
if (containsMessages(messageBatch)) {
    if (connectionEstablished(channel) && channel.isWritable()) {
        // ...snip...
    }
    else { // <<< if not WRITABLE, then we keep the remaining messages in memory and try background flushing
        resumeBackgroundFlushing();
        nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
    }
}
```
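The background flushing itself could look roughly like the following. This is only a sketch under my reading of the code -- `pendingBatch`, `flushCheckIntervalMs`, and the executor setup are illustrative, and it reuses the hypothetical `NettyishChannel`/`Batch` stand-ins from the sketch above:
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class BackgroundFlusherSketch {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    // The non-full batch that could not be written because the channel was not WRITABLE.
    private final AtomicReference<Batch> pendingBatch = new AtomicReference<Batch>();
    private final long flushCheckIntervalMs = 10;

    // Periodically retry the pending batch until the channel is writable again.
    void resumeBackgroundFlushing(final NettyishChannel channel) {
        scheduler.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                Batch batch = pendingBatch.get();
                if (batch != null && channel.isWritable()) {
                    channel.write(batch);
                    pendingBatch.compareAndSet(batch, null);
                }
            }
        }, flushCheckIntervalMs, flushCheckIntervalMs, TimeUnit.MILLISECONDS);
    }
}
```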
To illustrate the current behavior (note: this behavior is the same before
and after the changes in this pull request -- the PR does not modify this part
of the logic), take the following example:
* Message batch size: `10`
* `send()` receives a list of `35` messages.
* The channel becomes unwritable at the time we are processing message 13.
In this case the code would try to process and split/batch the messages as
follows:
```
10 --> 10 --> 10 --> 5
 \      |      /     |
  \     |     /      |
    handled in       handled in
    while loop       if-else after while
```
Here, the first batch of `10` messages would be successfully written to the
channel because the latter is still `WRITABLE`. The channel would (given our
assumption) then become unwritable while we're batching up messages 11-20, and
subsequently we would not be able to write messages 11-20 and messages 21-30 to
the channel. Messages 31-35 would be kept in memory (via the `messageBatch`
field), and we'd begin background flushing to write those 5 messages.
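As a sanity check of that walk-through, here is a tiny, self-contained simulation (plain Java, not Storm code) that prints which ranges end up written, dropped, or kept in memory under the assumptions above:
```java
import java.util.ArrayList;
import java.util.List;

public class BatchingWalkthrough {
    public static void main(String[] args) {
        final int batchSize = 10;
        final int totalMessages = 35;
        final int unwritableFromMsg = 13;  // channel is no longer WRITABLE from here on

        List<Integer> batch = new ArrayList<Integer>();
        for (int msg = 1; msg <= totalMessages; msg++) {
            batch.add(msg);
            if (batch.size() == batchSize) {
                // Flush attempt inside the while loop.
                boolean writable = msg < unwritableFromMsg;
                System.out.println("Batch " + batch.get(0) + "-" + msg
                    + (writable ? " written to channel" : " dropped (channel not WRITABLE)"));
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            // Remainder handled by the if/else after the loop: kept in memory
            // and handed to background flushing.
            System.out.println("Batch " + batch.get(0) + "-" + totalMessages
                + " kept in memory, background flushing scheduled");
        }
    }
}
```
Running it prints exactly the 1-10 / 11-20 / 21-30 / 31-35 breakdown shown in the diagram above.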
So in this scenario:
- Messages 1-10 would be written successfully.
- Messages 11-20 and 21-30 would be dropped. The acking configuration and the
spout/source behavior would determine whether these drops result in message
loss or in replays (see the spout sketch below).
- Messages 31-35 would be buffered in memory (i.e. in the `messageBatch` field
rather than in Netty's internal write buffer), and background flushing may or
may not eventually succeed in writing them.
Is that a correct summary?
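Regarding the acking caveat in the summary: whether the dropped tuples are replayed or lost comes down to whether the spout anchors its emits with a message id (and can re-read the data from its source). Below is a rough sketch against the 0.9.x `backtype.storm` API; the in-flight bookkeeping and `fetchNextRecord()` are purely illustrative, not from this PR:
```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ReplayableSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Map<Object, String> inFlight = new ConcurrentHashMap<Object, String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String payload = fetchNextRecord();  // hypothetical source read
        if (payload != null) {
            String msgId = UUID.randomUUID().toString();
            inFlight.put(msgId, payload);
            // Emitting with a message id enables ack/fail tracking and thus replay.
            collector.emit(new Values(payload), msgId);
        }
    }

    @Override
    public void ack(Object msgId) {
        inFlight.remove(msgId);  // fully processed, no replay needed
    }

    @Override
    public void fail(Object msgId) {
        // A tuple dropped by the Netty client eventually fails (e.g. via the
        // message timeout) and can be re-emitted here, provided the spout/source
        // still has the data.
        String payload = inFlight.get(msgId);
        if (payload != null) {
            collector.emit(new Values(payload), msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("payload"));
    }

    private String fetchNextRecord() {
        return null;  // placeholder; a real spout would read from its source here
    }
}
```
Without the message id (i.e. `collector.emit(new Values(payload))` alone), the drops in the 11-30 range above would be silent data loss.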
> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
> Key: STORM-329
> URL: https://issues.apache.org/jira/browse/STORM-329
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.9.2-incubating
> Reporter: Sean Zhong
> Priority: Minor
> Labels: Netty
> Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought
> up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986]
> during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are
> blocking. My biggest concern around the blocking is in the case of a worker
> crashing. If a single worker crashes this can block the entire topology from
> executing until that worker comes back up. In some cases I can see that being
> something that you would want. In other cases I can see speed being the
> primary concern and some users would like to get partial data fast, rather
> than accurate data later.
> Could we make it configurable on a follow-up JIRA where we can have a max
> limit to the buffering that is allowed, before we block, or throw data away
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages first, and block once
> the limit is reached?
> 4. Should we neither block nor buffer too much, but instead drop the messages
> and rely on Storm's built-in failover mechanism?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)