[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14322622#comment-14322622 ]

ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user miguno commented on a diff in the pull request:

    https://github.com/apache/storm/pull/429#discussion_r24742354
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -42,344 +42,577 @@
     import org.slf4j.LoggerFactory;
     
     import backtype.storm.Config;
    +import backtype.storm.messaging.ConnectionWithStatus;
     import backtype.storm.metric.api.IStatefulObject;
    -import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
     import backtype.storm.utils.Utils;
     
    -public class Client implements IConnection, IStatefulObject{
    +/**
    + * A Netty client for sending task messages to a remote destination (Netty server).
    + *
    + * Implementation details:
    + *
    + * - Sending messages, i.e. writing to the channel, is performed asynchronously.
    + * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
    + *   batch size is configurable.
    + * - Connecting and reconnecting are performed asynchronously.
    + *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
    + *       the remote destination is currently unavailable.
    + * - A background flusher thread is run in the background.  It will, at fixed intervals, check for any pending messages
    + *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
    + *   enabled.
    + */
    +public class Client extends ConnectionWithStatus implements IStatefulObject {
    +
         private static final Logger LOG = LoggerFactory.getLogger(Client.class);
         private static final String PREFIX = "Netty-Client-";
    -    private final int max_retries;
    -    private final int base_sleep_ms;
    -    private final int max_sleep_ms;
    +    private static final long NO_DELAY_MS = 0L;
    +    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
    +    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    +    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
    +    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
    +
         private final StormBoundedExponentialBackoffRetry retryPolicy;
    -    private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    private InetSocketAddress remote_addr;
    -    
    -    private AtomicInteger totalReconnects;
    -    private AtomicInteger messagesSent;
    -    private AtomicInteger messagesLostReconnect;
    -    private final Random random = new Random();
    -    private final ChannelFactory factory;
    -    private final int buffer_size;
    -    private boolean closing;
    -
    -    private int messageBatchSize;
    -    
    -    private AtomicLong pendings;
    -    
    -    Map storm_conf;
    +    private final InetSocketAddress dstAddress;
    +    protected final String dstAddressPrefixedName;
    +
    +    /**
    +     * The channel used for all write operations from this client to the remote destination.
    +     */
    +    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
    +
    +
    +    /**
    +     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
    +     */
    +    private final int maxReconnectionAttempts;
    --- End diff --
    
    I see your point, Bobby, and I can also confirm the behavior you described.

    Right now the behavior is as follows.  If the maximum number of reconnection attempts is reached, we set the flag `closing`.  One effect of this is that any subsequent `send()` call will result in its messages being discarded.
    
    Example log message:
    
    ```
    2015-02-16 10:06:29 b.s.m.n.Client [ERROR] discarding 1 messages because the Netty client to Netty-Client-supervisor4/10.0.0.104:6701 is being closed
    ```
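
    To make that concrete, the drop path looks roughly like the following sketch.  This is a simplification, not the verbatim code in this PR: the method shape and the `countMessages()` helper are mine, but `closing` and `dstAddressPrefixedName` are the fields shown in the diff above.

    ```java
        // Sketch only: once closing is true, every subsequent send() call short-circuits
        // here and the batched messages are lost.
        public void send(Iterator<TaskMessage> msgs) {
            if (closing) {
                int numMessages = countMessages(msgs);  // hypothetical helper that drains and counts the iterator
                LOG.error("discarding {} messages because the Netty client to {} is being closed",
                    numMessages, dstAddressPrefixedName);
                return;
            }
            // ... normal path: batch the messages and (eventually) flush them to the channel ...
        }
    ```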
    
    A second effect is that the Netty client will never attempt to reconnect again (by design).  But this also has a downside, and I think this is what Bobby is alluding to.  If a Netty client's `closing` is true (because max reconnects was reached), then a Storm task using this Netty client will never again be able to send a message to that client's remote destination.  A reconnect only becomes possible if the Storm task itself dies, because we would then also start a new Netty client.  As Bobby points out, the current code will not cause the parent Storm task (or even the Netty client) to die.  Instead the client stays in the `closing` state forever, the parent Storm task keeps calling the client's `send()` method for new messages, and the client keeps dropping those messages.
    
    Off the top of my head, I'd say there are at least three approaches to addressing this:
    
    1. Let the Netty client die if max retries is reached, so that the Storm 
task has the chance to re-create a client and thus break out of the client's 
discard-messages-forever state.
    2. Let the Storm task that uses the Netty client die if one of its (possibly many) Netty clients dies, so that by restarting the task we'll also get a new Netty client.
    3. Remove the max retries semantics as well as the corresponding setting from Storm's configuration.  Here, a Netty client will continue to reconnect to a remote destination forever.  The possible negative impact of these reconnects (e.g. the number of TCP connection attempts in a cluster) is kept in check by our exponential backoff policy for such connection retries (see the backoff sketch after this list).
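
    To make the backoff point in (3) a bit more concrete, here is a generic illustration of how capped exponential backoff keeps the attempt rate in check even if we never stop retrying.  It deliberately does not use `StormBoundedExponentialBackoffRetry` itself (whose jitter math I won't reproduce here), and the base/max values are just example numbers.

    ```java
        // Generic illustration only, not Storm's actual retry policy class: with a capped
        // exponential backoff, "reconnect forever" quickly levels off at roughly one
        // connection attempt per maxSleepMs, so the cluster is not hammered with TCP connects.
        public class BackoffDemo {
            public static void main(String[] args) {
                long baseSleepMs = 100;
                long maxSleepMs = 1000;
                for (int attempt = 0; attempt < 8; attempt++) {
                    long sleepMs = Math.min(maxSleepMs, baseSleepMs * (1L << attempt));
                    System.out.println("attempt " + attempt + " -> wait " + sleepMs + " ms");
                }
                // prints: 100, 200, 400, 800, 1000, 1000, 1000, 1000 (ms)
            }
        }
    ```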
    
    My personal comments on these three approaches:
    
    - I do not like (1) because I feel it introduces potentially confusing semantics: we would keep a max retries setting, but it would no longer be a hard limit.  It would instead become a "max retries until we recreate a Netty client", and recreating the client would also reset the exponential backoff state of the "previous" Netty client instance (cf. `StormBoundedExponentialBackoffRetry`).  If we do want such resets (and I don't think we do at this point), then a cleaner approach would be to implement the resetting inside the retry policy itself (again, cf. `StormBoundedExponentialBackoffRetry`); see the sketch after this list.
    - I do not like (2) because a single "bad" Netty client could take down a Storm task, which among other things would also impact the task's other, still-working Netty clients.
    - Option (3) seems like a reasonable approach, although it breaks backwards compatibility with regard to Storm's configuration (because we'd now ignore `storm.messaging.netty.max_retries`).
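
    To illustrate what I mean by "resetting inside the retry policy" in my comment on (1): something like the following hypothetical wrapper would keep the reset logic out of the Netty client.  This is not part of this PR, and it assumes the public `getSleepTimeMs(retryCount, elapsedMs)` accessor on `StormBoundedExponentialBackoffRetry`.

    ```java
        import backtype.storm.utils.StormBoundedExponentialBackoffRetry;

        // Hypothetical sketch, not part of this PR: keep the attempt counter next to the
        // policy so that a successful reconnect can restart the backoff sequence cleanly,
        // instead of recreating the whole Netty client just to get a "fresh" retry state.
        public class ResettableRetryState {
            private final StormBoundedExponentialBackoffRetry policy;
            private int attempt = 0;

            public ResettableRetryState(StormBoundedExponentialBackoffRetry policy) {
                this.policy = policy;
            }

            /** Sleep time (ms) before the next reconnection attempt. */
            public long nextSleepMs() {
                return policy.getSleepTimeMs(attempt++, 0);
            }

            /** Call after a successful (re)connect to start the backoff sequence over. */
            public void reset() {
                attempt = 0;
            }
        }
    ```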
    
    Personally, I am fine with either filing a separate JIRA or, if consensus is reached quickly here, addressing this directly in this pull request.
    
    If we want to address this directly in the pull request, we only need to 
change a single line in `Client.java` (apart from follow-up changes such as 
updating `conf/defaults.yaml` to remove `storm.messaging.netty.max_retries`):
    
    ```java
        private boolean reconnectingAllowed() {
            // BEFORE:
            // return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
            return !closing;
        }
    ```
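
    For context, after that change the gate that decides whether another connect attempt gets scheduled would behave roughly as in this simplified sketch.  `scheduleConnectAttempt()` is a stand-in for the actual asynchronous scheduling in `Client.java`, not a real method name, and it assumes the `getSleepTimeMs(retryCount, elapsedMs)` accessor on the retry policy.

    ```java
        // Simplified sketch of the effect of the one-line change: with reconnectingAllowed()
        // reduced to "!closing", the only thing that ever stops reconnection attempts is an
        // explicit close() of the client; the retry policy merely spaces the attempts out.
        private void scheduleNextReconnect(int attempt) {
            if (!reconnectingAllowed()) {
                // Only reachable once close() has set the closing flag.
                LOG.info("not attempting to reconnect to {} because the client is being closed",
                    dstAddressPrefixedName);
                return;
            }
            long delayMs = retryPolicy.getSleepTimeMs(attempt, 0);
            scheduleConnectAttempt(attempt, delayMs);  // stand-in for the real asynchronous connect logic
        }
    ```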


> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought 
> up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] 
> during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are blocking. My biggest concern around the blocking is in the case of a worker crashing. If a single worker crashes this can block the entire topology from executing until that worker comes back up. In some cases I can see that being something that you would want. In other cases I can see speed being the primary concern and some users would like to get partial data fast, rather than accurate data later.
> Could we make it configurable on a follow-up JIRA where we can have a max limit to the buffering that is allowed, before we block, or throw data away (which is what zeromq does)?
> {quote}
> If some worker crashes suddenly, how should we handle the messages that were supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages first, and block once the limit is reached?
> 4. Should we neither block nor buffer too much, but instead drop the messages and rely on Storm's built-in failover mechanism? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
