[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315781#comment-14315781 ]

ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73847544
  
    I think if the channel is not `WRITABLE` while we are trying to flush a full
message batch (or more than one, since the flush happens inside the `while` loop),
then we will drop those messages until the channel becomes `WRITABLE` again. If
acking is enabled, replaying may kick in provided the spout/source supports it; in
all other cases the messages will be lost forever.
    
    ```java
            // Collect messages into batches (to optimize network throughput),
            // then flush them.
            while (msgs.hasNext()) {
                TaskMessage message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatch(messageBatchSize);
                }
    
                messageBatch.add(message);
                // TODO: What shall we do if the channel is not writable?
                if (messageBatch.isFull()) {
                    MessageBatch toBeFlushed = messageBatch;
                    flushMessages(channel, toBeFlushed);
                    messageBatch = null;
                }
            }
    ```
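    
    For reference, here is a minimal sketch of what a drop-on-unwritable
`flushMessages()` could look like, purely to illustrate the behavior described
above (this is not the actual code in the PR; the `LOG.warn()` call and the
exact checks are my assumptions):
    
    ```java
        // Hypothetical sketch, not Storm's actual implementation: if the channel
        // cannot accept writes, the batch is simply dropped and recovery (if any)
        // is left to acking/replay at the spout.
        private void flushMessages(Channel channel, MessageBatch batch) {
            if (!containsMessages(batch)) {
                return;
            }
            if (connectionEstablished(channel) && channel.isWritable()) {
                channel.write(batch); // hand the batch off to Netty
            } else {
                LOG.warn("dropping message batch because channel {} is not writable", channel);
            }
        }
    ```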
    The code after the `while` loop will keep any remaining messages -- up to a 
full batch size -- in memory via the `messageBatch` field in case the channel 
is not `WRITABLE`:
    
    ```java
            // Handle any remaining messages in case the "last" batch was not full.
            if (containsMessages(messageBatch)) {
                if (connectionEstablished(channel) && channel.isWritable()) {
                    // ...snip...
                }
                else { // <<< if not WRITABLE, then we keep the remaining messages
                       //     in memory and try background flushing
                    resumeBackgroundFlushing();
                    nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
                }
            }
    ```
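    
    The background flusher itself is not shown in the excerpt; conceptually it
amounts to a periodic check along the following lines (a sketch under my
assumptions, not the actual PR code -- the method name `backgroundFlushTick()`
is made up, and only the fields and helpers quoted above are taken from the
excerpts):
    
    ```java
        // Hypothetical sketch of the periodic background flush check.
        private void backgroundFlushTick(Channel channel) {
            if (nowMillis() < nextBackgroundFlushTimeMs.get()) {
                return; // not yet time to retry
            }
            if (containsMessages(messageBatch)
                    && connectionEstablished(channel) && channel.isWritable()) {
                // The channel has become writable again: flush the buffered batch.
                MessageBatch toBeFlushed = messageBatch;
                messageBatch = null;
                flushMessages(channel, toBeFlushed);
            } else {
                // Still not writable: schedule the next retry.
                nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
            }
        }
    ```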
    
    To illustrate the current behavior (note: this behavior is the same before and
after the changes in this pull request -- the PR does not modify this part of the
logic), take the following example.
    
    * Message batch size: `10`
    * `send()` receives a list of `35` messages.
    * The channel becomes unwritable at the time we are processing message 13.
    
    In this case the code would try to process and split/batch the messages as 
follows:
    
    ```
    10 --> 10 --> 10 --> 5
     \            /      |
      \          /       |
       handled in        handled in
       while loop        if/else after while
    ```
    
    Here, the first batch of `10` messages would be successfully written to the 
channel because the latter is still `WRITABLE`.  The channel would (given our 
assumption) then become unwritable while we're batching up messages 11-20, and 
subsequently we would not be able to write messages 11-20 and messages 21-30 to 
the channel.  Messages 31-35 would be kept in memory (via the `messageBatch` 
field), and we'd begin background flushing to write those 5 messages.
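    
    The split above can also be reproduced with a tiny standalone simulation
(hypothetical code, not part of Storm or this PR; it only mimics the batching
arithmetic):
    
    ```java
    // Simulates batching 35 messages with batch size 10, where the channel
    // becomes unwritable while message 13 is being processed.
    public class BatchSplitDemo {
        public static void main(String[] args) {
            final int batchSize = 10;
            final int totalMessages = 35;
            final int unwritableFrom = 13; // channel stops being writable here
    
            int inBatch = 0; // messages in the current (partial) batch
            for (int msg = 1; msg <= totalMessages; msg++) {
                inBatch++;
                if (inBatch == batchSize) {
                    boolean writable = msg < unwritableFrom;
                    System.out.printf("messages %d-%d: %s in the while loop%n",
                        msg - batchSize + 1, msg, writable ? "flushed" : "dropped");
                    inBatch = 0;
                }
            }
            if (inBatch > 0) {
                System.out.printf("messages %d-%d: kept in memory, background flushing%n",
                    totalMessages - inBatch + 1, totalMessages);
            }
        }
    }
    ```
    
    Its output matches the diagram: messages 1-10 flushed, 11-30 dropped, 31-35 buffered.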
    
    So in this scenario:
    - Messages 1-10 would be written successfully.
    - Messages 11-20 and 21-30 would be dropped.  Acking configuration and 
spout/source behavior would determine whether this dropping would cause message 
loss or replaying.
    - Messages 31-35 would be buffered in memory (i.e. in our own `messageBatch` 
field, not in Netty's internal write buffer), and background flushing may or may 
not eventually succeed in writing them.
    
    Is that a correct summary?


> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought 
> up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] 
> during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are 
> blocking. My biggest concern around the blocking is in the case of a worker 
> crashing. If a single worker crashes this can block the entire topology from 
> executing until that worker comes back up. In some cases I can see that being 
> something that you would want. In other cases I can see speed being the 
> primary concern and some users would like to get partial data fast, rather 
> than accurate data later.
> Could we make it configurable on a follow up JIRA where we can have a max 
> limit to the buffering that is allowed, before we block, or throw data away 
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were 
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages first, and block once 
> the limit is reached?
> 4. Should we neither block nor buffer too much, but instead drop the messages 
> and rely on Storm's built-in failover mechanism? 



