[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14313870#comment-14313870 ]

ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73667341
  
    Many thanks for your review, Sean.  I addressed your comments; see the new commits in https://github.com/miguno/storm/commits/0.10.0-SNAPSHOT-STORM-392-miguno-merge.
    
    One final question:  How should we address the following TODO in `send(Iterator<TaskMessage> msgs)`?  I'd appreciate additional eyeballs on this. :-)  It's a scenario that we may or may not have overlooked in the original code.  Note how we are NOT checking for a `WRITABLE` channel while flushing (full) message batches (cf. the `while` loop), but we do check for a `WRITABLE` channel when handling any left-over messages (cf. after the `while` loop).
    
    ```java
        /**
         * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
         */
        @Override
        public synchronized void send(Iterator<TaskMessage> msgs) {

            // ...some code removed to shorten this code snippet...

            Channel channel = channelRef.get();
            if (!connectionEstablished(channel)) {
                // Closing the channel and reconnecting should be done before handling the messages.
                closeChannelAndReconnect(channel);
                handleMessagesWhenConnectionIsUnavailable(msgs);
                return;
            }

            // Collect messages into batches (to optimize network throughput), then flush them.
            while (msgs.hasNext()) {
                TaskMessage message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatch(messageBatchSize);
                }

                messageBatch.add(message);
                // TODO: What shall we do if the channel is not writable?
                if (messageBatch.isFull()) {
                    MessageBatch toBeFlushed = messageBatch;
                    flushMessages(channel, toBeFlushed);
                    messageBatch = null;
                }
            }

            // Handle any remaining messages in case the "last" batch was not full.
            if (containsMessages(messageBatch)) {
                if (connectionEstablished(channel) && channel.isWritable()) {
                    // We can write to the channel, so we flush the remaining messages immediately to minimize latency.
                    pauseBackgroundFlushing();
                    MessageBatch toBeFlushed = messageBatch;
                    messageBatch = null;
                    flushMessages(channel, toBeFlushed);
                }
                else {
                    // We cannot write to the channel, which means Netty's internal write buffer is full.
                    // In this case, we buffer the remaining messages and wait for the next messages to arrive.
                    //
                    // Background:
                    // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K).
                    // This represents the amount of data waiting to be flushed to operating system buffers.  If the
                    // outstanding data exceeds this value, then the channel is set to non-writable.  When this happens, an
                    // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to writable again once the data
                    // has been flushed to the system buffers.
                    //
                    // See http://stackoverflow.com/questions/14049260
                    resumeBackgroundFlushing();
                    nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
                }
            }
        }
    ```
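
    To make the question concrete, here is one possible direction, sketched purely for discussion: check `channel.isWritable()` in the `while` loop as well, and fall back to background flushing when the channel is saturated, mirroring the left-over handling after the loop.  The sketch reuses the fields and helpers shown above (`flushMessages`, `resumeBackgroundFlushing`, `nextBackgroundFlushTimeMs`, `flushCheckIntervalMs`, `nowMillis`) and assumes that a `MessageBatch` may temporarily grow past `messageBatchSize`, which may not hold in the current implementation; this is an untested sketch, not a proposed patch.

    ```java
            // Discussion sketch only (untested): same loop as above, but a full batch is
            // flushed only while the channel is writable.  Assumes MessageBatch.add() can
            // keep accepting messages past messageBatchSize while the channel is saturated.
            while (msgs.hasNext()) {
                TaskMessage message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatch(messageBatchSize);
                }

                messageBatch.add(message);
                if (messageBatch.isFull()) {
                    if (channel.isWritable()) {
                        // The channel can accept more data, so flush the full batch right away.
                        MessageBatch toBeFlushed = messageBatch;
                        flushMessages(channel, toBeFlushed);
                        messageBatch = null;
                    }
                    else {
                        // Netty's write buffer hit its high water mark.  Rather than piling
                        // more data onto the channel, keep the batch in user space and let
                        // the background flusher retry once the channel is writable again.
                        resumeBackgroundFlushing();
                        nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
                    }
                }
            }
    ```

    The open question then becomes how large such a user-space buffer may grow before we block or drop messages, which is exactly the trade-off STORM-329 is about.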
    



> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are 
> blocking. My biggest concern around the blocking is in the case of a worker 
> crashing. If a single worker crashes, this can block the entire topology from 
> executing until that worker comes back up. In some cases I can see that being 
> something that you would want. In other cases I can see speed being the 
> primary concern and some users would like to get partial data fast, rather 
> than accurate data later.
> Could we make it configurable in a follow-up JIRA, where we can have a max 
> limit on the buffering that is allowed before we block or throw data away 
> (which is what ZeroMQ does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were 
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages up to that limit, and 
> then block once the limit is reached?
> 4. Should we neither block nor buffer too much, but instead drop the messages 
> and rely on Storm's built-in failover mechanism?



