Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73847544
  
    I think if the channel is not `WRITABLE` when we are trying to flush a full
message batch (or more than one, since the flush happens inside the `while`
loop), then we'll drop those messages until the channel becomes `WRITABLE`
again. If acking is enabled, replaying may kick in if the spout/source supports
it; in all other cases the messages will be lost forever.
    
    ```java
    // Collect messages into batches (to optimize network throughput), then flush them.
    while (msgs.hasNext()) {
        TaskMessage message = msgs.next();
        if (messageBatch == null) {
            messageBatch = new MessageBatch(messageBatchSize);
        }

        messageBatch.add(message);
        // TODO: What shall we do if the channel is not writable?
        if (messageBatch.isFull()) {
            MessageBatch toBeFlushed = messageBatch;
            flushMessages(channel, toBeFlushed);
            messageBatch = null;
        }
    }
    ```
    The code after the `while` loop will keep any remaining messages -- up to a 
full batch size -- in memory via the `messageBatch` field in case the channel 
is not `WRITABLE`:
    
    ```java
    // Handle any remaining messages in case the "last" batch was not full.
    if (containsMessages(messageBatch)) {
        if (connectionEstablished(channel) && channel.isWritable()) {
            // ...snip...
        } else {
            // If not WRITABLE, we keep the remaining messages in memory
            // and try background flushing.
            resumeBackgroundFlushing();
            nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
        }
    }
    ```
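    The retry scheduling in the `else` branch above can be sketched as a
self-contained toy. Only the field and method names mirror the quoted snippet;
the class itself and the interval value are illustrative assumptions, not
Storm's actual code:

    ```java
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch: the partial batch stays referenced by the
    // messageBatch field, and the next background flush is scheduled
    // flushCheckIntervalMs into the future.
    public class BackgroundFlushSketch {
        public static final long flushCheckIntervalMs = 10; // illustrative value
        public static final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(0);

        static long nowMillis() { return System.currentTimeMillis(); }

        static void resumeBackgroundFlushing() {
            // In the real client this re-enables a periodic flusher task;
            // a no-op stands in for it here.
        }

        public static void scheduleRetry() {
            resumeBackgroundFlushing();
            nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
        }

        public static void main(String[] args) {
            scheduleRetry();
            System.out.println("next background flush at: "
                    + nextBackgroundFlushTimeMs.get());
        }
    }
    ```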
    
    To illustrate the current behavior (note: this behavior is the same before
and after the changes in this pull request; the PR does not modify this part of
the logic), take the following example.
    
    * Message batch size: `10`
    * `send()` receives a list of `35` messages.
    * The channel becomes unwritable at the time we are processing message 13.
    
    In this case the code would try to process and split/batch the messages as 
follows:
    
    ```
    10 --> 10 --> 10 --> 5
    \             /      |
     \           /       |
      handled in       handled in if-else
      while loop       after the while loop
    ```
    
    Here, the first batch of `10` messages would be successfully written to the 
channel because the latter is still `WRITABLE`.  The channel would (given our 
assumption) then become unwritable while we're batching up messages 11-20, and 
subsequently we would not be able to write messages 11-20 and messages 21-30 to 
the channel.  Messages 31-35 would be kept in memory (via the `messageBatch` 
field), and we'd begin background flushing to write those 5 messages.
    
    So in this scenario:
    - Messages 1-10 would be written successfully.
    - Messages 11-20 and 21-30 would be dropped.  Acking configuration and 
spout/source behavior would determine whether this dropping would cause message 
loss or replaying.
    - Messages 31-35 would be buffered in memory (i.e. in the `messageBatch`
field, not in Netty's internal write buffer), and background flushing may or
may not eventually succeed in writing them.
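
    The scenario above can be replayed with a small self-contained simulation.
This is a toy model of the described behavior, not Storm's actual code; the
class and method names are invented for illustration:

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Toy simulation: batch size 10, 35 messages, channel becomes
    // unwritable from message 13 onward.
    public class BatchDropSimulation {

        // Returns {written, dropped, buffered} message counts.
        public static int[] simulate() {
            final int batchSize = 10;
            final int unwritableFrom = 13; // channel unwritable from msg 13 on
            int written = 0;
            int dropped = 0;
            List<Integer> batch = new ArrayList<>();

            for (int msg = 1; msg <= 35; msg++) {
                batch.add(msg);
                if (batch.size() == batchSize) {
                    // Mirrors the full-batch flush inside the while loop:
                    // the batch is written only if the channel is still
                    // writable, otherwise it is dropped.
                    if (msg < unwritableFrom) {
                        written += batch.size();
                    } else {
                        dropped += batch.size();
                    }
                    batch.clear();
                }
            }
            // The partial "last" batch stays in memory (the messageBatch
            // field) awaiting background flushing.
            return new int[] { written, dropped, batch.size() };
        }

        public static void main(String[] args) {
            int[] r = simulate();
            System.out.println("written=" + r[0] + " dropped=" + r[1]
                    + " buffered=" + r[2]);
        }
    }
    ```

    Running it yields 10 written, 20 dropped, and 5 buffered messages,
matching the breakdown above.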
    
    Is that a correct summary?

