GitHub user miguno commented on a diff in the pull request:

    https://github.com/apache/storm/pull/429#discussion_r24742354
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -42,344 +42,577 @@
     import org.slf4j.LoggerFactory;
     
     import backtype.storm.Config;
    +import backtype.storm.messaging.ConnectionWithStatus;
     import backtype.storm.metric.api.IStatefulObject;
    -import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
     import backtype.storm.utils.Utils;
     
    -public class Client implements IConnection, IStatefulObject{
    +/**
    + * A Netty client for sending task messages to a remote destination (Netty server).
    + *
    + * Implementation details:
    + *
    + * - Sending messages, i.e. writing to the channel, is performed asynchronously.
    + * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
    + *   batch size is configurable.
    + * - Connecting and reconnecting are performed asynchronously.
    + *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
    + *       the remote destination is currently unavailable.
    + * - A background flusher thread is run in the background.  It will, at fixed intervals, check for any pending messages
    + *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
    + *   enabled.
    + */
    +public class Client extends ConnectionWithStatus implements IStatefulObject {
    +
         private static final Logger LOG = LoggerFactory.getLogger(Client.class);
         private static final String PREFIX = "Netty-Client-";
    -    private final int max_retries;
    -    private final int base_sleep_ms;
    -    private final int max_sleep_ms;
    +    private static final long NO_DELAY_MS = 0L;
    +    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
    +    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    +    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
    +    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
    +
         private final StormBoundedExponentialBackoffRetry retryPolicy;
    -    private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    private InetSocketAddress remote_addr;
    -    
    -    private AtomicInteger totalReconnects;
    -    private AtomicInteger messagesSent;
    -    private AtomicInteger messagesLostReconnect;
    -    private final Random random = new Random();
    -    private final ChannelFactory factory;
    -    private final int buffer_size;
    -    private boolean closing;
    -
    -    private int messageBatchSize;
    -    
    -    private AtomicLong pendings;
    -    
    -    Map storm_conf;
    +    private final InetSocketAddress dstAddress;
    +    protected final String dstAddressPrefixedName;
    +
    +    /**
    +     * The channel used for all write operations from this client to the remote destination.
    +     */
    +    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
    +
    +
    +    /**
    +     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
    +     */
    +    private final int maxReconnectionAttempts;
    --- End diff --
    
    I see your point, Bobby, and I can also confirm the behavior you described.
    
    Right now the behavior is as follows.  If the maximum number of 
reconnection attempts is reached, we set the `closing` flag.  One effect of 
this is that any messages passed to subsequent `send()` calls are discarded.
    
    Example log message:
    
    ```
    2015-02-16 10:06:29 b.s.m.n.Client [ERROR] discarding 1 messages because the Netty client to Netty-Client-supervisor4/10.0.0.104:6701 is being closed
    ```
    
    A second effect is that the Netty client will never attempt to reconnect 
again (by design).  But this has a negative effect, too, and I think this is 
what Bobby is alluding to.  If a Netty client's `closing` flag is true (because 
max reconnects was reached), then a Storm task using this Netty client will 
never again be able to send a message to the client's remote destination.  Such 
a reconnect is only possible if the Storm task itself dies, because we would 
then also start a new Netty client.  As Bobby points out, the current code will 
not cause the parent Storm task (or even the Netty client) to die.  Instead, 
the client stays in the `closing` state forever, and the parent Storm task 
continues to call the client's `send()` method for new messages, which in turn 
drops every such message.
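    
    To make the stuck state concrete, here is a minimal sketch in Java.  It is 
not Storm's actual `Client` code: the class `StuckClientSketch`, its methods, 
and the simplified attempt threshold are assumptions made for illustration, and 
it models only the `closing` flag, not the connection state.
    
    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical stand-in for the Netty client; NOT Storm's Client class.
    class StuckClientSketch {
        private final int maxReconnectionAttempts;
        private final AtomicInteger connectionAttempts = new AtomicInteger(0);
        private volatile boolean closing = false;
        private int discarded = 0;

        StuckClientSketch(int maxReconnectionAttempts) {
            this.maxReconnectionAttempts = maxReconnectionAttempts;
        }

        // Simulates one failed reconnection attempt (simplified threshold).
        void reconnectFailed() {
            if (connectionAttempts.incrementAndGet() > maxReconnectionAttempts) {
                closing = true; // never reset, so no further reconnects happen
            }
        }

        // Returns true if the message was accepted, false if it was discarded.
        boolean send(String message) {
            if (closing) {
                discarded++;
                return false;
            }
            return true; // real code would enqueue or write the message here
        }

        boolean isClosing() { return closing; }

        int discardedCount() { return discarded; }

        public static void main(String[] args) {
            StuckClientSketch client = new StuckClientSketch(2);
            for (int i = 0; i < 3; i++) {
                client.reconnectFailed();
            }
            // The task keeps calling send(), but every message is dropped.
            client.send("a");
            client.send("b");
            System.out.println("closing=" + client.isClosing()
                    + ", discarded=" + client.discardedCount());
        }
    }
    ```
    
    Nothing in the sketch ever clears `closing`, which mirrors the problem: the 
only way out is to replace the client object itself.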
    
    Off the cuff, I'd say there are at least three approaches for addressing this:
    
    1. Let the Netty client die if max retries is reached, so that the Storm 
task has the chance to re-create a client and thus break out of the client's 
discard-messages-forever state.
    2. Let the Storm task that uses the Netty client die if one of its 
(possibly many) Netty clients dies, so that by restarting the task we'll also 
get a new Netty client.
    3. Remove the max retries semantics as well as the corresponding setting 
from Storm's configuration.  Here, a Netty client will continue to reconnect to 
a remote destination forever.  The possible negative impact of these reconnects 
(e.g. number of TCP connection attempts in a cluster) is kept in check by our 
exponential backoff policy for such connection retries.
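    
    To illustrate why option (3) need not flood the cluster with connection 
attempts, here is a minimal sketch of a capped exponential backoff.  It is not 
the implementation of `StormBoundedExponentialBackoffRetry`; the class, method, 
and parameter names are hypothetical, and plain doubling without jitter is an 
assumption.
    
    ```java
    // Hypothetical helper; NOT Storm's StormBoundedExponentialBackoffRetry.
    class BoundedBackoffSketch {
        // Delay before retry number `attempt` (0-based): baseSleepMs doubled
        // once per earlier attempt, but never larger than maxSleepMs.
        static long sleepTimeMs(int attempt, long baseSleepMs, long maxSleepMs) {
            long delay = baseSleepMs;
            for (int i = 0; i < attempt; i++) {
                // Stop early once another doubling would exceed the cap;
                // this also keeps the multiplication from overflowing.
                if (delay > maxSleepMs / 2) {
                    return maxSleepMs;
                }
                delay *= 2;
            }
            return Math.min(delay, maxSleepMs);
        }

        public static void main(String[] args) {
            // Assumed example values: 100 ms base delay, 1000 ms cap.
            for (int attempt = 0; attempt <= 6; attempt++) {
                System.out.println("attempt " + attempt + ": "
                        + sleepTimeMs(attempt, 100L, 1000L) + " ms");
            }
        }
    }
    ```
    
    With a cap in place, an unlimited number of retries converges to at most 
one connection attempt per `maxSleepMs` per client.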
    
    My personal comments on these three approaches:
    
    - I do not like (1) because I feel it introduces potentially confusing 
semantics: We would keep a max retries setting, but it would no longer be a 
hard limit.  Instead it becomes a "max retries until we recreate a Netty 
client", and recreating the client would also reset any exponential backoff 
strategy of the "previous" Netty client instance (cf. 
`StormBoundedExponentialBackoffRetry`).  If we do want such resets (but I don't 
think we do at this point), then a cleaner approach would be to implement such 
resetting inside the retry policy (again, cf. 
`StormBoundedExponentialBackoffRetry`).
    - I do not like (2) because a single "bad" Netty client would be able to 
take down a Storm task, which among other things would also impact any other, 
working Netty clients of the Storm task.
    - Option (3) seems a reasonable approach, although it breaks backwards 
compatibility with regard to Storm's configuration (because we'd now ignore 
`storm.messaging.netty.max_retries`).
    
    Personally, I am fine with either filing a separate JIRA or, if consensus 
is reached quickly here, addressing this directly in this pull request.
    
    If we want to address this directly in the pull request, we only need to 
change a single line in `Client.java` (apart from follow-up changes such as 
updating `conf/defaults.yaml` to remove `storm.messaging.netty.max_retries`):
    
    ```java
        private boolean reconnectingAllowed() {
            // BEFORE:
            // return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
            return !closing;
        }
    ```
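    
    The effect of that one-line change can be sketched as follows.  The two 
predicates are copied from the snippet above; the surrounding class 
`ReconnectPolicySketch` and its helper methods are hypothetical scaffolding, 
not part of `Client.java`.
    
    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical scaffolding around the two predicates; NOT Client.java.
    class ReconnectPolicySketch {
        private final AtomicInteger connectionAttempts = new AtomicInteger(0);
        private final int maxReconnectionAttempts;
        private volatile boolean closing = false;

        ReconnectPolicySketch(int maxReconnectionAttempts) {
            this.maxReconnectionAttempts = maxReconnectionAttempts;
        }

        // BEFORE: reconnecting stops once the attempt counter passes the limit.
        boolean reconnectingAllowedBefore() {
            return !closing
                    && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
        }

        // AFTER: reconnecting stops only when the client is explicitly closed.
        boolean reconnectingAllowedAfter() {
            return !closing;
        }

        void recordAttempt() { connectionAttempts.incrementAndGet(); }

        void close() { closing = true; }

        public static void main(String[] args) {
            ReconnectPolicySketch policy = new ReconnectPolicySketch(2);
            for (int i = 0; i < 10; i++) {
                policy.recordAttempt();
            }
            System.out.println("before: " + policy.reconnectingAllowedBefore());
            System.out.println("after:  " + policy.reconnectingAllowedAfter());
        }
    }
    ```
    
    In this sketch `closing` is set only by an explicit `close()` call, which 
is the behavior option (3) aims for.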


