[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324338#comment-14324338 ]

ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/429#discussion_r24824540
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -42,344 +42,577 @@
     import org.slf4j.LoggerFactory;
     
     import backtype.storm.Config;
    +import backtype.storm.messaging.ConnectionWithStatus;
     import backtype.storm.metric.api.IStatefulObject;
    -import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
     import backtype.storm.utils.Utils;
     
    -public class Client implements IConnection, IStatefulObject{
    +/**
    + * A Netty client for sending task messages to a remote destination (Netty server).
    + *
    + * Implementation details:
    + *
    + * - Sending messages, i.e. writing to the channel, is performed asynchronously.
    + * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
    + *   batch size is configurable.
    + * - Connecting and reconnecting are performed asynchronously.
    + *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
    + *       the remote destination is currently unavailable.
    + * - A flusher thread runs in the background.  It will, at fixed intervals, check for any pending messages
    + *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
    + *   enabled.
    + */
    +public class Client extends ConnectionWithStatus implements IStatefulObject {
    +
         private static final Logger LOG = LoggerFactory.getLogger(Client.class);
         private static final String PREFIX = "Netty-Client-";
    -    private final int max_retries;
    -    private final int base_sleep_ms;
    -    private final int max_sleep_ms;
    +    private static final long NO_DELAY_MS = 0L;
    +    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
    +    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    +    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
    +    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
    +
         private final StormBoundedExponentialBackoffRetry retryPolicy;
    -    private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    private InetSocketAddress remote_addr;
    -    
    -    private AtomicInteger totalReconnects;
    -    private AtomicInteger messagesSent;
    -    private AtomicInteger messagesLostReconnect;
    -    private final Random random = new Random();
    -    private final ChannelFactory factory;
    -    private final int buffer_size;
    -    private boolean closing;
    -
    -    private int messageBatchSize;
    -    
    -    private AtomicLong pendings;
    -    
    -    Map storm_conf;
    +    private final InetSocketAddress dstAddress;
    +    protected final String dstAddressPrefixedName;
    +
    +    /**
    +     * The channel used for all write operations from this client to the remote destination.
    +     */
    +    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
    +
    +
    +    /**
    +     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
    +     */
    +    private final int maxReconnectionAttempts;
    --- End diff --
    
    I personally prefer option 3, no maximum number of reconnection attempts.
    Having the client decide that it is done before nimbus does feels like it
    is asking for trouble.
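
For reference, "retry forever with capped exponential backoff" is a small
amount of code.  The sketch below only illustrates that option and is not the
code under review; connectOnce() and the backoff constants are hypothetical
stand-ins for the Netty bootstrap logic and the
StormBoundedExponentialBackoffRetry configuration.

    import java.util.concurrent.TimeUnit;

    // Sketch: reconnect forever, capping the backoff delay instead of the
    // number of attempts, so only nimbus decides when a worker is gone.
    public final class EndlessReconnect {

        private static final long BASE_SLEEP_MS = 100L;   // first backoff
        private static final long MAX_SLEEP_MS = 1000L;   // backoff ceiling

        public void connectWithRetries() throws InterruptedException {
            long attempt = 0;
            while (!connectOnce()) {
                long sleepMs = Math.min(MAX_SLEEP_MS, BASE_SLEEP_MS << Math.min(attempt, 30L));
                TimeUnit.MILLISECONDS.sleep(sleepMs);
                attempt++;
            }
        }

        private boolean connectOnce() {
            // Hypothetical placeholder for an asynchronous Netty connect
            // attempt; returns true once the channel is established.
            return false;
        }
    }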


> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought
> up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986]
> during the work on STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are
> blocking. My biggest concern around the blocking is in the case of a worker
> crashing. If a single worker crashes this can block the entire topology from
> executing until that worker comes back up. In some cases I can see that being
> something that you would want. In other cases I can see speed being the
> primary concern and some users would like to get partial data fast, rather
> than accurate data later.
> Could we make it configurable on a follow-up JIRA where we can have a max
> limit to the buffering that is allowed, before we block, or throw data away
> (which is what zeromq does)?
> {quote}
> If some worker crashes suddenly, how should we handle the messages that were
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is resumed?
> 3. Should we configure a buffer limit, buffer messages up to that limit
> first, and then block once the limit is reached?
> 4. Should we neither block nor buffer too much, but instead drop the
> messages and rely on Storm's built-in failover mechanism?
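
For illustration only, not part of the patch: option 3 maps naturally onto a
bounded blocking queue that buffers up to a limit and then blocks the sender.
The class and method names below are made up for this sketch.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Sketch of option 3: buffer up to a configured limit, then block.
    public final class BoundedSendBuffer {

        private final BlockingQueue<byte[]> pending;

        public BoundedSendBuffer(int bufferLimit) {
            this.pending = new ArrayBlockingQueue<byte[]>(bufferLimit);
        }

        // Blocks once bufferLimit messages are waiting for the connection
        // to come back, which is the behavior option 3 describes.
        public void send(byte[] message) throws InterruptedException {
            pending.put(message);
        }

        // Drained by a flusher once the connection is re-established.
        public byte[] takeNext() throws InterruptedException {
            return pending.take();
        }
    }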



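Similarly, the background flusher thread described in the new Client Javadoc
in the diff above is, at its core, a periodic task. A minimal sketch of that
pattern follows; the names are hypothetical and this is not the patch's
actual implementation.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch of a periodic background flusher: at fixed intervals, flush
    // pending messages iff background flushing is currently enabled.
    public final class BackgroundFlusher {

        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private volatile boolean flushingEnabled = true;

        public void start(final Runnable flushPendingMessages, long intervalMs) {
            scheduler.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    if (flushingEnabled) {
                        flushPendingMessages.run();
                    }
                }
            }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        }

        public void stop() {
            flushingEnabled = false;
            scheduler.shutdown();
        }
    }
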
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
