Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73667341
  
    Many thanks for your review, Sean.  I addressed your comments; see the new commits in https://github.com/miguno/storm/commits/0.10.0-SNAPSHOT-STORM-392-miguno-merge.
    
    One final question:  How should we address the following TODO in `send(Iterator<TaskMessage> msgs)`?  I'd appreciate additional eyeballs on this. :-)  It's a scenario that we may or may not have overlooked in the original code.  Note how we are NOT checking for a `WRITABLE` channel while flushing (full) message batches (cf. the `while` loop), but we do check for a `WRITABLE` channel when handling any left-over messages (cf. after the `while` loop).
    
    ```java
        /**
         * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
         */
        @Override
        public synchronized void send(Iterator<TaskMessage> msgs) {
    
            // ...some code removed to shorten this code snippet...
    
            Channel channel = channelRef.get();
            if (!connectionEstablished(channel)) {
                // Closing the channel and reconnecting should be done before handling the messages.
                closeChannelAndReconnect(channel);
                handleMessagesWhenConnectionIsUnavailable(msgs);
                return;
            }
    
            // Collect messages into batches (to optimize network throughput), then flush them.
            while (msgs.hasNext()) {
                TaskMessage message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatch(messageBatchSize);
                }
    
                messageBatch.add(message);
                // TODO: What shall we do if the channel is not writable?
                if (messageBatch.isFull()) {
                    MessageBatch toBeFlushed = messageBatch;
                    flushMessages(channel, toBeFlushed);
                    messageBatch = null;
                }
            }
    
            // Handle any remaining messages in case the "last" batch was not full.
            if (containsMessages(messageBatch)) {
                if (connectionEstablished(channel) && channel.isWritable()) {
                    // We can write to the channel, so we flush the remaining messages immediately to minimize latency.
                    pauseBackgroundFlushing();
                    MessageBatch toBeFlushed = messageBatch;
                    messageBatch = null;
                    flushMessages(channel, toBeFlushed);
                }
                else {
                    // We cannot write to the channel, which means Netty's internal write buffer is full.
                    // In this case, we buffer the remaining messages and wait for the next messages to arrive.
                    //
                    // Background:
                    // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K).
                    // This represents the amount of data waiting to be flushed to operating system buffers.  If the
                    // outstanding data exceeds this value, then the channel is set to non-writable.  When this happens, an
                    // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to writable again once the data
                    // has been flushed to the system buffers.
                    //
                    // See http://stackoverflow.com/questions/14049260
                    resumeBackgroundFlushing();
                    nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
                }
            }
    ```
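    
    For reference, one way the TODO could be resolved is to apply the same `isWritable()` check to full batches inside the `while` loop and park saturated batches for the background flusher, mirroring the left-over handling after the loop.  The sketch below only illustrates that idea and is not the actual patch: the `pendingBatches` queue and the simplified `Channel`/`MessageBatch` types are stand-ins introduced for the example, while names such as `messageBatchSize` come from the snippet above.
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * Sketch only: park full batches in a pending queue whenever the channel is
     * not writable, instead of writing them immediately.  Channel and MessageBatch
     * are simplified stand-ins for the Netty and Storm classes.
     */
    class NonWritableChannelSketch {
    
        /** Minimal stand-in for a Netty channel. */
        interface Channel {
            boolean isWritable();
            void write(Object msg);
        }
    
        /** Minimal stand-in for Storm's MessageBatch. */
        static class MessageBatch {
            private final int capacity;
            private final List<Object> msgs = new ArrayList<Object>();
            MessageBatch(int capacity) { this.capacity = capacity; }
            void add(Object msg) { msgs.add(msg); }
            boolean isFull() { return msgs.size() >= capacity; }
        }
    
        private final int messageBatchSize = 256;
        private MessageBatch messageBatch;
        // Full batches that could not be flushed because the channel was saturated.
        private final ConcurrentLinkedQueue<MessageBatch> pendingBatches =
                new ConcurrentLinkedQueue<MessageBatch>();
    
        public synchronized void send(Channel channel, Iterator<Object> msgs) {
            while (msgs.hasNext()) {
                Object message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatch(messageBatchSize);
                }
                messageBatch.add(message);
                if (messageBatch.isFull()) {
                    MessageBatch toBeFlushed = messageBatch;
                    messageBatch = null;
                    if (channel.isWritable()) {
                        // Same behavior as today: write the full batch right away.
                        channel.write(toBeFlushed);
                    }
                    else {
                        // Write buffer is above the high water mark: park the batch
                        // and let a background flusher (not shown) drain pendingBatches
                        // once the channel becomes writable again.
                        pendingBatches.add(toBeFlushed);
                    }
                }
            }
        }
    }
    ```
    
    The trade-off is memory pressure: parking batches keeps them in the worker's heap rather than in Netty's write buffer, so a background flusher would need to cap or drain `pendingBatches` aggressively.  The current code instead writes full batches regardless of writability and lets Netty buffer them, which is exactly the behavior the TODO questions.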
    


