[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14322547#comment-14322547 ]

ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user miguno commented on a diff in the pull request:

    https://github.com/apache/storm/pull/429#discussion_r24738502
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -42,344 +42,577 @@
     import org.slf4j.LoggerFactory;
     
     import backtype.storm.Config;
    +import backtype.storm.messaging.ConnectionWithStatus;
     import backtype.storm.metric.api.IStatefulObject;
    -import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
     import backtype.storm.utils.Utils;
     
    -public class Client implements IConnection, IStatefulObject{
    +/**
    + * A Netty client for sending task messages to a remote destination (Netty server).
    + *
    + * Implementation details:
    + *
    + * - Sending messages, i.e. writing to the channel, is performed asynchronously.
    + * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
    + *   batch size is configurable.
    + * - Connecting and reconnecting are performed asynchronously.
    + *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
    + *       the remote destination is currently unavailable.
    + * - A flusher thread runs in the background.  At fixed intervals it checks for any pending messages
    + *   (i.e. messages buffered in memory) and flushes them to the remote destination iff background flushing is currently
    + *   enabled.
    + */
    +public class Client extends ConnectionWithStatus implements IStatefulObject {
    +
         private static final Logger LOG = LoggerFactory.getLogger(Client.class);
         private static final String PREFIX = "Netty-Client-";
    -    private final int max_retries;
    -    private final int base_sleep_ms;
    -    private final int max_sleep_ms;
    +    private static final long NO_DELAY_MS = 0L;
    +    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
    +    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    +    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
    +    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
    +
         private final StormBoundedExponentialBackoffRetry retryPolicy;
    -    private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    private InetSocketAddress remote_addr;
    -    
    -    private AtomicInteger totalReconnects;
    -    private AtomicInteger messagesSent;
    -    private AtomicInteger messagesLostReconnect;
    -    private final Random random = new Random();
    -    private final ChannelFactory factory;
    -    private final int buffer_size;
    -    private boolean closing;
    -
    -    private int messageBatchSize;
    -    
    -    private AtomicLong pendings;
    -    
    -    Map storm_conf;
    +    private final InetSocketAddress dstAddress;
    +    protected final String dstAddressPrefixedName;
    +
    +    /**
    +     * The channel used for all write operations from this client to the remote destination.
    +     */
    +    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
    +
    +
    +    /**
    +     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
    +     */
    +    private final int maxReconnectionAttempts;
    +
    +    /**
    +     * Total number of connection attempts.
    +     */
    +    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
    +
    +    /**
    +     * Number of connection attempts since the last disconnect.
    +     */
    +    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
    +
    +    /**
    +     * Number of messages successfully sent to the remote destination.
    +     */
    +    private final AtomicInteger messagesSent = new AtomicInteger(0);
    +
    +    /**
    +     * Number of messages that could not be sent to the remote destination.
    +     */
    +    private final AtomicInteger messagesLost = new AtomicInteger(0);
    +
    +    /**
    +     * Number of messages buffered in memory.
    +     */
    +    private final AtomicLong pendingMessages = new AtomicLong(0);
    +
    +    /**
    +     * This flag is set to true if and only if a client instance is being closed.
    +     */
    +    private volatile boolean closing = false;
    +
    +    /**
    +     * When set to true, the background flusher thread will flush any pending messages on its next run.
    +     */
    +    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
    +
    +    /**
    +     * The absolute time (in ms) when the next background flush should be performed.
    +     *
    +     * Note: The flush operation will only be performed if backgroundFlushingEnabled is true, too.
    +     */
    +    private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS);
    +
    +    /**
    +     * The time interval (in ms) at which the background flusher thread will be run to check for any pending messages
    +     * to be flushed.
    +     */
    +    private final int flushCheckIntervalMs;
    +
    +    /**
    +     * How many messages should be batched together before sending them to the remote destination.
    +     *
    +     * Messages are batched to optimize network throughput at the expense of latency.
    +     */
    +    private final int messageBatchSize;
     
         private MessageBatch messageBatch = null;
    -    private AtomicLong flushCheckTimer;
    -    private int flushCheckInterval;
    -    private ScheduledExecutorService scheduler;
    +    private final ListeningScheduledExecutorService scheduler;
    +    protected final Map stormConf;
     
         @SuppressWarnings("rawtypes")
    -    Client(Map storm_conf, ChannelFactory factory, 
    -            ScheduledExecutorService scheduler, String host, int port) {
    -           this.storm_conf = storm_conf;
    -        this.factory = factory;
    -        this.scheduler = scheduler;
    -        channelRef = new AtomicReference<Channel>(null);
    +    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
             closing = false;
    -        pendings = new AtomicLong(0);
    -        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
    -        totalReconnects = new AtomicInteger(0);
    -        messagesSent = new AtomicInteger(0);
    -        messagesLostReconnect = new AtomicInteger(0);
    -
    -        // Configure
    -        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
    -        max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
    -        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
    -        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
    -        retryPolicy = new StormBoundedExponentialBackoffRetry(base_sleep_ms, max_sleep_ms, max_retries);
    -
    -        this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
    -
    -        flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
    -
    -        LOG.info("New Netty Client, connect to " + host + ", " + port
    -                + ", config: " + ", buffer_size: " + buffer_size);
    -
    -        bootstrap = new ClientBootstrap(factory);
    +        this.stormConf = stormConf;
    +        this.scheduler = MoreExecutors.listeningDecorator(scheduler);
    +        int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
    +        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
    +        messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
    +        flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
    +
    +        maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
    +        int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
    +        int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
    +        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
    +
    +        // Initiate connection to remote destination
    +        bootstrap = createClientBootstrap(factory, bufferSize);
    +        dstAddress = new InetSocketAddress(host, port);
    +        dstAddressPrefixedName = prefixedName(dstAddress);
    +        connect(NO_DELAY_MS);
    +
    +        // Launch background flushing thread
    +        pauseBackgroundFlushing();
    +        long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
    +        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
    +            TimeUnit.MILLISECONDS);
    +    }
    +
    +    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
    +        ClientBootstrap bootstrap = new ClientBootstrap(factory);
             bootstrap.setOption("tcpNoDelay", true);
    -        bootstrap.setOption("sendBufferSize", buffer_size);
    +        bootstrap.setOption("sendBufferSize", bufferSize);
             bootstrap.setOption("keepAlive", true);
    -
    -        // Set up the pipeline factory.
             bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
    +        return bootstrap;
    +    }
     
    -        // Start the connection attempt.
    -        remote_addr = new InetSocketAddress(host, port);
    -        
    -        // setup the connection asyncly now
    -        scheduler.execute(new Runnable() {
    -            @Override
    -            public void run() {   
    -                connect();
    -            }
    -        });
    -        
    -        Runnable flusher = new Runnable() {
    +    private String prefixedName(InetSocketAddress dstAddress) {
    +        if (null != dstAddress) {
    +            return PREFIX + dstAddress.toString();
    +        }
    +        return "";
    +    }
    +
    +    private Runnable createBackgroundFlusher() {
    +        return new Runnable() {
                 @Override
                 public void run() {
    -
    -                if(!closing) {
    -                    long flushCheckTime = flushCheckTimer.get();
    -                    long now = System.currentTimeMillis();
    -                    if (now > flushCheckTime) {
    -                        Channel channel = channelRef.get();
    -                        if (null != channel && channel.isWritable()) {
    -                            flush(channel);
    -                        }
    -                    }
    +                if (!closing && backgroundFlushingEnabled.get() && nowMillis() > nextBackgroundFlushTimeMs.get()) {
    +                    LOG.debug("flushing {} pending messages to {} in background", messageBatch.size(),
    +                        dstAddressPrefixedName);
    +                    flushPendingMessages();
                     }
    -                
                 }
             };
    -        
    -        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
    -        scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
    +    }
    +
    +    private void pauseBackgroundFlushing() {
    +        backgroundFlushingEnabled.set(false);
    +    }
    +
    +    private void resumeBackgroundFlushing() {
    +        backgroundFlushingEnabled.set(true);
    +    }
    +
    +    private synchronized void flushPendingMessages() {
    +        Channel channel = channelRef.get();
    +        if (containsMessages(messageBatch)) {
    +            if (connectionEstablished(channel)) {
    +                if (channel.isWritable()) {
    +                    pauseBackgroundFlushing();
    +                    MessageBatch toBeFlushed = messageBatch;
    +                    flushMessages(channel, toBeFlushed);
    +                    messageBatch = null;
    +                }
    +                else if (closing) {
    +                    // Ensure background flushing is enabled so that we definitely have a chance to re-try the flush
    +                    // operation in case the client is being gracefully closed (where we have a brief time window where
    +                    // the client will wait for pending messages to be sent).
    +                    resumeBackgroundFlushing();
    +                }
    +            }
    +            else {
    +                closeChannelAndReconnect(channel);
    +            }
    +        }
    +    }
    +
    +    private long nowMillis() {
    +        return System.currentTimeMillis();
         }
     
         /**
          * We will retry connection with exponential back-off policy
          */
    -    private synchronized void connect() {
    +    private synchronized void connect(long delayMs) {
             try {
    +            if (closing) {
    +                return;
    +            }
     
    -            Channel channel = channelRef.get();
    -            if (channel != null && channel.isConnected()) {
    +            if (connectionEstablished(channelRef.get())) {
                     return;
                 }
     
    -            int tried = 0;
    -            //setting channel to null to make sure we throw an exception when reconnection fails
    -            channel = null;
    -            while (tried <= max_retries) {
    -
    -                LOG.info("Reconnect started for {}... [{}]", name(), 
tried);
    -                LOG.debug("connection started...");
    -
    -                totalReconnects.getAndIncrement();
    -                ChannelFuture future = bootstrap.connect(remote_addr);
    -                future.awaitUninterruptibly();
    -                Channel current = future.getChannel();
    -                if (!future.isSuccess()) {
    -                    if (null != current) {
    -                        current.close();
    +            connectionAttempts.getAndIncrement();
    +            if (reconnectingAllowed()) {
    +                totalConnectionAttempts.getAndIncrement();
    +                LOG.info("connection attempt {} to {} scheduled to run in 
{} ms", connectionAttempts.get(),
    +                    dstAddressPrefixedName, delayMs);
    +                ListenableFuture<Channel> channelFuture = 
scheduler.schedule(
    +                    new Connector(dstAddress, connectionAttempts.get()), 
delayMs, TimeUnit.MILLISECONDS);
    +                Futures.addCallback(channelFuture, new 
FutureCallback<Channel>() {
    +                    @Override public void onSuccess(Channel result) {
    +                        if (connectionEstablished(result)) {
    +                            setChannel(result);
    +                            LOG.info("connection established to {}", 
dstAddressPrefixedName);
    +                            connectionAttempts.set(0);
    +                        }
    +                        else {
    +                            reconnectAgain(new RuntimeException("Returned 
channel was actually not established"));
    +                        }
    +                    }
    +
    +                    @Override public void onFailure(Throwable t) {
    +                        reconnectAgain(t);
                         }
    -                } else {
    -                    channel = current;
    -                    break;
    -                }
    -                Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
    -                tried++;  
    +
    +                    private void reconnectAgain(Throwable t) {
    +                        String baseMsg = String.format("connection attempt 
%s to %s failed", connectionAttempts,
    +                            dstAddressPrefixedName);
    +                        String failureMsg = (t == null)? baseMsg : baseMsg 
+ ": " + t.toString();
    +                        LOG.error(failureMsg);
    +                        long nextDelayMs = 
retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
    +                        connect(nextDelayMs);
    +                    }
    +                });
                 }
    -            if (null != channel) {
    -                LOG.info("connection established to a remote host " + 
name() + ", " + channel.toString());
    -                channelRef.set(channel);
    -            } else {
    +            else {
                     close();
    -                throw new RuntimeException("Remote address is not 
reachable. We will close this client " + name());
    +                throw new RuntimeException("Giving up to connect to " + 
dstAddressPrefixedName + " after " +
    +                    connectionAttempts + " failed attempts");
                 }
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException("connection failed " + name(), e);
             }
    +        catch (Exception e) {
    +            throw new RuntimeException("Failed to connect to " + 
dstAddressPrefixedName, e);
    +        }
    +    }
    +
    +    private void setChannel(Channel channel) {
    +        channelRef.set(channel);
    +    }
    +
    +    private boolean reconnectingAllowed() {
    +        return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
    +    }
    +
    +    private boolean connectionEstablished(Channel channel) {
    +        // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
    +        // established iff the channel is connected.  That is, a TCP-based channel must be in the CONNECTED state before
    +        // anything can be read or written to the channel.
    +        //
    +        // See:
    +        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
    +        // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
    +        return channel != null && channel.isConnected();
         }
     
         /**
    -     * Enqueue task messages to be sent to server
    +     * Note: Storm will check via this method whether a worker can be activated safely during the initial startup of a
    +     * topology.  The worker will only be activated once all of its connections are ready.
          */
    -    synchronized public void send(Iterator<TaskMessage> msgs) {
    +    @Override
    +    public Status status() {
    +        if (closing) {
    +            return Status.Closed;
    +        }
    +        else if (!connectionEstablished(channelRef.get())) {
    +            return Status.Connecting;
    +        }
    +        else {
    +            return Status.Ready;
    +        }
    +    }
    +
    +    /**
    +     * Receiving messages is not supported by a client.
    +     *
    +     * @throws java.lang.UnsupportedOperationException whenever this method is being called.
    +     */
    +    @Override
    +    public Iterator<TaskMessage> recv(int flags, int clientId) {
    +        throw new UnsupportedOperationException("Client connection should 
not receive any messages");
    +    }
     
    -        // throw exception if the client is being closed
    +    @Override
    +    public void send(int taskId, byte[] payload) {
    +        TaskMessage msg = new TaskMessage(taskId, payload);
    +        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
    +        wrapper.add(msg);
    +        send(wrapper.iterator());
    +    }
    +
    +    /**
    +     * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
    +     */
    +    @Override
    +    public synchronized void send(Iterator<TaskMessage> msgs) {
             if (closing) {
    -            throw new RuntimeException("Client is being closed, and does 
not take requests any more");
    +            int numMessages = iteratorSize(msgs);
    +            LOG.warn("discarding {} messages because the Netty client to 
{} is being closed", numMessages,
    +                dstAddressPrefixedName);
    +            return;
             }
    -        
    -        if (null == msgs || !msgs.hasNext()) {
    +
    +        if (!hasMessages(msgs)) {
                 return;
             }
     
             Channel channel = channelRef.get();
    -        if (null == channel) {
    -            connect();
    -            channel = channelRef.get();
    +        if (!connectionEstablished(channel)) {
    +            // Closing the channel and reconnecting should be done before handling the messages.
    +            closeChannelAndReconnect(channel);
    +            handleMessagesWhenConnectionIsUnavailable(msgs);
    +            return;
             }
     
    +        // Collect messages into batches (to optimize network throughput), then flush them.
             while (msgs.hasNext()) {
    -            if (!channel.isConnected()) {
    -                connect();
    -                channel = channelRef.get();
    -            }
                 TaskMessage message = msgs.next();
    -            if (null == messageBatch) {
    +            if (messageBatch == null) {
                     messageBatch = new MessageBatch(messageBatchSize);
                 }
     
                 messageBatch.add(message);
                 if (messageBatch.isFull()) {
                     MessageBatch toBeFlushed = messageBatch;
    -                flushRequest(channel, toBeFlushed);
    +                flushMessages(channel, toBeFlushed);
                     messageBatch = null;
                 }
             }
     
    -        if (null != messageBatch && !messageBatch.isEmpty()) {
    -            if (channel.isWritable()) {
    -                flushCheckTimer.set(Long.MAX_VALUE);
    -                
    -                // Flush as fast as we can to reduce the latency
    +        // Handle any remaining messages in case the "last" batch was not full.
    +        if (containsMessages(messageBatch)) {
    +            if (connectionEstablished(channel) && channel.isWritable()) {
    +                // We can write to the channel, so we flush the remaining messages immediately to minimize latency.
    +                pauseBackgroundFlushing();
                     MessageBatch toBeFlushed = messageBatch;
                     messageBatch = null;
    -                flushRequest(channel, toBeFlushed);
    -                
    -            } else {
    -                // when channel is NOT writable, it means the internal netty buffer is full.
    -                // In this case, we can try to buffer up more incoming messages.
    -                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
    +                flushMessages(channel, toBeFlushed);
    +            }
    +            else {
    +                // We cannot write to the channel, which means Netty's internal write buffer is full.
    +                // In this case, we buffer the remaining messages and wait for the next messages to arrive.
    +                //
    +                // Background:
    +                // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K).
    +                // This represents the amount of data waiting to be flushed to operating system buffers.  If the
    +                // outstanding data exceeds this value then the channel is set to non-writable.  When this happens, an
    +                // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to writable again once the data
    +                // has been flushed to the system buffers.
    +                //
    +                // See http://stackoverflow.com/questions/14049260
    +                resumeBackgroundFlushing();
    +                nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
                 }
             }
     
         }
     
    -    public String name() {
    -        if (null != remote_addr) {
    -            return PREFIX + remote_addr.toString();
    -        }
    -        return "";
    +    private boolean hasMessages(Iterator<TaskMessage> msgs) {
    +        return msgs != null && msgs.hasNext();
         }
     
    -    private synchronized void flush(Channel channel) {
    -        if (!closing) {
    -            if (null != messageBatch && !messageBatch.isEmpty()) {
    -                MessageBatch toBeFlushed = messageBatch;
    -                flushCheckTimer.set(Long.MAX_VALUE);
    -                flushRequest(channel, toBeFlushed);
    -                messageBatch = null;
    +    /**
    +     * We will drop pending messages and let at-least-once message replay kick in.
    +     *
    +     * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
    +     * especially for topologies that disable message acking because we don't know whether the connection recovery will
    +     * succeed or not, and how long the recovery will take.
    +     */
    +    private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
    +        LOG.error("connection to {} is unavailable", 
dstAddressPrefixedName);
    +        dropPendingMessages(msgs);
    +    }
    +
    +    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
    +        // We consume the iterator by traversing and thus "emptying" it.
    +        int msgCount = iteratorSize(msgs);
    --- End diff --
    
    Fixed.  Good catch, Bobby!
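
As an aside for readers of this thread: the hunk above is cut off at the review marker, so the body of iteratorSize is not visible here. A minimal sketch of what such a helper might look like, consistent with its call sites and the "We consume the iterator" comment in the diff (an illustrative assumption, not the patch's actual code):

    private int iteratorSize(Iterator<TaskMessage> msgs) {
        // Hypothetical sketch; the real implementation is not shown in the
        // diff above.  Count the messages by draining ("emptying") the
        // iterator, matching how dropPendingMessages appears to use it.
        int size = 0;
        if (msgs != null) {
            while (msgs.hasNext()) {
                size++;
                msgs.next();
            }
        }
        return size;
    }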


> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are 
> blocking. My biggest concern around the blocking is in the case of a worker 
> crashing. If a single worker crashes this can block the entire topology from 
> executing until that worker comes back up. In some cases I can see that being 
> something that you would want. In other cases I can see speed being the 
> primary concern and some users would like to get partial data fast, rather 
> than accurate data later.
> Could we make it configurable on a follow-up JIRA where we can have a max 
> limit to the buffering that is allowed, before we block, or throw data away 
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were 
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages up to that limit, and 
> block once the limit is reached (see the sketch below)?
> 4. Should we neither block nor buffer too much, but instead drop the 
> messages and rely on Storm's built-in failover mechanism? 
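
To make option 3 above concrete: a bounded buffer that blocks the sender once a configured limit is reached can be sketched with a standard java.util.concurrent BlockingQueue. This is only an illustration of that proposed strategy (the class and names below are hypothetical, not from the patch, which implements option 4 by dropping messages and relying on replay):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Hypothetical illustration of option 3 (not part of the patch): buffer
    // messages up to a configurable limit, then block the sender until the
    // connection recovers and drains the queue.
    public class BoundedMessageBuffer<T> {
        private final BlockingQueue<T> buffer;

        public BoundedMessageBuffer(int maxBufferedMessages) {
            this.buffer = new ArrayBlockingQueue<T>(maxBufferedMessages);
        }

        // Blocks the calling (sender) thread once the limit is reached.
        public void enqueue(T msg) throws InterruptedException {
            buffer.put(msg);
        }

        // Called by the connection once it becomes available again.
        public T dequeue() throws InterruptedException {
            return buffer.take();
        }
    }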



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
