Github user miguno commented on a diff in the pull request:
https://github.com/apache/storm/pull/429#discussion_r24742354
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -42,344 +42,577 @@
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
-public class Client implements IConnection, IStatefulObject{
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message
+ *   batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ *       the remote destination is currently unavailable.
+ * - A background flusher thread is run in the background. It will, at fixed intervals, check for any pending messages
+ *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ *   enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
- private final int max_retries;
- private final int base_sleep_ms;
- private final int max_sleep_ms;
+ private static final long NO_DELAY_MS = 0L;
+ private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
+ private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+ private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+ private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
private final StormBoundedExponentialBackoffRetry retryPolicy;
- private AtomicReference<Channel> channelRef;
private final ClientBootstrap bootstrap;
- private InetSocketAddress remote_addr;
-
- private AtomicInteger totalReconnects;
- private AtomicInteger messagesSent;
- private AtomicInteger messagesLostReconnect;
- private final Random random = new Random();
- private final ChannelFactory factory;
- private final int buffer_size;
- private boolean closing;
-
- private int messageBatchSize;
-
- private AtomicLong pendings;
-
- Map storm_conf;
+ private final InetSocketAddress dstAddress;
+ protected final String dstAddressPrefixedName;
+
+ /**
+ * The channel used for all write operations from this client to the remote destination.
+ */
+ private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+
+ /**
+ * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+ */
+ private final int maxReconnectionAttempts;
--- End diff --
I see your point, Bobby, and I can also confirm the behavior you described.
Right now the behavior is as follows: if the maximum number of reconnection attempts is reached, we set the `closing` flag. One effect of this is that any subsequent `send()` of messages will result in those messages being discarded.
Example log message:
```
2015-02-16 10:06:29 b.s.m.n.Client [ERROR] discarding 1 messages because the Netty client to Netty-Client-supervisor4/10.0.0.104:6701 is being closed
```
A second effect is that the Netty client will never attempt to reconnect again (by design). But this has a negative side effect, too, and I think this is what Bobby is alluding to: if a Netty client's `closing` is true (because max reconnects was reached), then a Storm task using this Netty client will never again be able to send a message to the client's remote destination. Only if the Storm task itself were to die would such a reconnect become possible, because we would then also start a new Netty client. The current code will not, as Bobby points out, cause the parent Storm task (or even the Netty client) to die -- instead the client will stay in the `closing` state forever, and the parent Storm task will keep calling the client's `send()` method for new messages, which in turn will forever drop those messages.
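To make this failure mode concrete, here is a simplified, self-contained sketch of the behavior described above. It is not the actual `Client.java` code -- the class, field, and method names below are only illustrative:
```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch only -- NOT the actual Client.java. It illustrates how a
// one-way "closing" flag turns send() into a permanent message sink.
class NettyClientSketch {

    // Set once the maximum number of reconnection attempts has been reached
    // (or when close() is called); it is never reset afterwards.
    private volatile boolean closing = false;

    private final AtomicInteger messagesLost = new AtomicInteger(0);

    // In the real client the iterator carries TaskMessage instances; Object is
    // used here only to keep the sketch self-contained.
    public void send(Iterator<Object> msgs) {
        if (closing) {
            int dropped = 0;
            while (msgs.hasNext()) {
                msgs.next();
                dropped++;
            }
            messagesLost.addAndGet(dropped);
            System.err.println("discarding " + dropped + " messages because the client is being closed");
            return;
        }
        // ...otherwise batch the messages and write them to the channel asynchronously...
    }

    // Called when the reconnect loop gives up: from now on send() drops
    // everything and no further reconnect is ever attempted.
    void onMaxReconnectionAttemptsReached() {
        closing = true;
    }
}
```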
Off the top of my head, I'd say there are at least three approaches for addressing this:
1. Let the Netty client die if max retries is reached, so that the Storm
task has the chance to re-create a client and thus break out of the client's
discard-messages-forever state.
2. Let the Storm task that uses the Netty client die if (one of its
possibly many) Netty clients dies, so that by restarting the task we'll also
get a new Netty client.
3. Remove the max retries semantics as well as the corresponding setting from Storm's configuration. Here, a Netty client will keep reconnecting to a remote destination forever. The possible negative impact of these reconnects (e.g. the number of TCP connection attempts in a cluster) is kept in check by our exponential backoff policy for such connection retries (see the sketch right after this list).
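For approach (3), here is a minimal, self-contained illustration of the bounded exponential backoff idea. It is not the actual `StormBoundedExponentialBackoffRetry` implementation (which, among other things, also adds jitter); it only shows why "reconnect forever" stays cheap, because the sleep time grows exponentially until it is capped:
```java
// Illustrative only -- not StormBoundedExponentialBackoffRetry itself.
public class BoundedBackoffSketch {

    private final long baseSleepMs;
    private final long maxSleepMs;

    public BoundedBackoffSketch(long baseSleepMs, long maxSleepMs) {
        this.baseSleepMs = baseSleepMs;
        this.maxSleepMs = maxSleepMs;
    }

    // Sleep time before the given (1-based) reconnection attempt:
    // baseSleepMs * 2^(attempt - 1), capped at maxSleepMs.
    public long sleepTimeMs(int attempt) {
        double uncapped = baseSleepMs * Math.pow(2, attempt - 1);
        return (long) Math.min(uncapped, maxSleepMs);
    }

    public static void main(String[] args) {
        // Illustrative values in the spirit of storm.messaging.netty.min_wait_ms / max_wait_ms.
        BoundedBackoffSketch backoff = new BoundedBackoffSketch(100L, 1000L);
        for (int attempt = 1; attempt <= 10; attempt++) {
            System.out.println("attempt " + attempt + " -> sleep " + backoff.sleepTimeMs(attempt) + " ms");
        }
        // Once the cap is hit, an endlessly reconnecting client settles at roughly one
        // connection attempt per maxSleepMs instead of hammering the remote host.
    }
}
```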
My personal comments on these three approaches:
- I do not like (1) because I feel it introduces potentially confusing semantics: we would keep the max retries setting, but it would no longer be a hard limit. It would rather become a "max retries until we re-create a Netty client", and it would also reset the exponential backoff state of the "previous" Netty client instance (cf. `StormBoundedExponentialBackoffRetry`). If we do want such resets (and I don't think we do at this point), then a cleaner approach would be to implement the resetting inside the retry policy itself (again, cf. `StormBoundedExponentialBackoffRetry`; see the sketch right after this list).
- I do not like (2) because a single "bad" Netty client would be able to take down a whole Storm task, which among other things would also impact any other, still-working Netty clients of that task.
- Option (3) seems a reasonable approach, although it breaks backwards
compatibility with regard to Storm's configuration (because we'd now ignore
`storm.messaging.netty.max_retries`).
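For completeness, here is what "resetting inside the retry policy" could look like in the abstract. This is purely hypothetical -- neither this interface nor a `reset()` method exist in Storm today -- and it only marks where such a reset would live, i.e. inside the retry policy rather than via re-creating the Netty client:
```java
// Hypothetical sketch only; Storm's StormBoundedExponentialBackoffRetry has no reset() today.
public interface ResettableRetryPolicy {

    /** Sleep time in milliseconds before the given reconnection attempt. */
    long getSleepTimeMs(int retryCount);

    /** Start backing off from scratch, e.g. after a successful reconnect. */
    void reset();
}
```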
Personally, I am fine with either filing a separate JIRA or, if consensus is reached quickly here, addressing this directly in this pull request.
If we want to address this directly in the pull request, we only need to
change a single line in `Client.java` (apart from follow-up changes such as
updating `conf/defaults.yaml` to remove `storm.messaging.netty.max_retries`):
```java
private boolean reconnectingAllowed() {
    // BEFORE:
    // return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
    return !closing;
}
```