[
https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14322622#comment-14322622
]
ASF GitHub Bot commented on STORM-329:
--------------------------------------
Github user miguno commented on a diff in the pull request:
https://github.com/apache/storm/pull/429#discussion_r24742354
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -42,344 +42,577 @@
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
-public class Client implements IConnection, IStatefulObject{
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message
+ * batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ * - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ * the remote destination is currently unavailable.
+ * - A background flusher thread is run in the background. It will, at fixed intervals, check for any pending messages
+ * (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ * enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
- private final int max_retries;
- private final int base_sleep_ms;
- private final int max_sleep_ms;
+ private static final long NO_DELAY_MS = 0L;
+ private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
+ private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+ private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+ private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
private final StormBoundedExponentialBackoffRetry retryPolicy;
- private AtomicReference<Channel> channelRef;
private final ClientBootstrap bootstrap;
- private InetSocketAddress remote_addr;
-
- private AtomicInteger totalReconnects;
- private AtomicInteger messagesSent;
- private AtomicInteger messagesLostReconnect;
- private final Random random = new Random();
- private final ChannelFactory factory;
- private final int buffer_size;
- private boolean closing;
-
- private int messageBatchSize;
-
- private AtomicLong pendings;
-
- Map storm_conf;
+ private final InetSocketAddress dstAddress;
+ protected final String dstAddressPrefixedName;
+
+ /**
+ * The channel used for all write operations from this client to the remote destination.
+ */
+ private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+
+ /**
+ * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+ */
+ private final int maxReconnectionAttempts;
--- End diff --
I see your point, Bobby, and I can also confirm the behavior you described.
Right now the behavior is as follows. If the maximum number of reconnection
attempts is reached, we enable the flag `closing`. One effect of this is that
any subsequent `send()` calls will result in the corresponding messages being
discarded.
Example log message:
```
2015-02-16 10:06:29 b.s.m.n.Client [ERROR] discarding 1 messages because the Netty client to Netty-Client-supervisor4/10.0.0.104:6701 is being closed
```
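For illustration, here is a minimal sketch of what the discard-on-`closing` path might look like. This is not the actual `Client.java` code; only `closing` and `dstAddressPrefixedName` are taken from the diff above, and everything else (class name, method signature, message type) is made up for the sketch:
```java
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: once `closing` is set, every send() call ends up in the
// early-return branch and the messages handed to us are counted and dropped.
class DiscardingClientSketch {
    private static final Logger LOG = LoggerFactory.getLogger(DiscardingClientSketch.class);

    private volatile boolean closing = false;
    private final String dstAddressPrefixedName = "Netty-Client-supervisor4/10.0.0.104:6701";

    void send(Iterator<Object> msgs) {
        if (closing) {
            int numMessages = 0;
            while (msgs.hasNext()) {
                msgs.next();
                numMessages++;
            }
            LOG.error("discarding {} messages because the Netty client to {} is being closed",
                numMessages, dstAddressPrefixedName);
            return;
        }
        // ...normal path: batch the messages and write them to the channel...
    }
}
```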
A second effect is that the Netty client will never attempt to reconnect
again (by design). But this has a negative side effect, too, and I think this
is what Bobby is alluding to. If a Netty client's `closing` is true (because
max reconnects was reached), then a Storm task using this Netty client will
never again be able to send a message to the client's remote destination.
Such a reconnect would only become possible if the Storm task itself died,
because we would then also start a new Netty client. As Bobby points out, the
current code will not cause the parent Storm task (or even the Netty client)
to die -- instead the client will stay in the `closing` state forever, and the
parent Storm task will keep calling the client's `send()` method for new
messages, which in turn will keep dropping those messages.
Off the top of my head, I'd say there are at least three approaches for addressing this:
1. Let the Netty client die if max retries is reached, so that the Storm
task has the chance to re-create a client and thus break out of the client's
discard-messages-forever state.
2. Let the Storm task that uses the Netty client die if (one of its
possibly many) Netty clients dies, so that by restarting the task we'll also
get a new Netty client.
3. Remove the max retries semantics as well as the corresponding setting
from Storm's configuration. Here, a Netty client will continue to reconnect to
a remote destination forever. The possible negative impact of these reconnects
(e.g. the number of TCP connection attempts in a cluster) is kept in check by
our exponential backoff policy for such connection retries (see the backoff
sketch right after this list).
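To make the trade-off in (3) more concrete, here is a small, self-contained sketch of how bounded exponential backoff keeps "reconnect forever" in check. It only illustrates the general technique with made-up constants; it does not reproduce Storm's actual `StormBoundedExponentialBackoffRetry`:
```java
// Illustrative sketch of bounded exponential backoff between reconnection attempts.
public class BackoffSketch {
    private final long baseSleepMs;
    private final long maxSleepMs;

    public BackoffSketch(long baseSleepMs, long maxSleepMs) {
        this.baseSleepMs = baseSleepMs;
        this.maxSleepMs = maxSleepMs;
    }

    /** Sleep time before the given (1-based) reconnection attempt, capped at maxSleepMs. */
    public long sleepTimeMs(int attempt) {
        double exponential = baseSleepMs * Math.pow(2, attempt - 1);
        return (long) Math.min(exponential, maxSleepMs);
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(100, 1000);
        for (int attempt = 1; attempt <= 8; attempt++) {
            // Prints 100, 200, 400, 800, 1000, 1000, 1000, 1000 (ms).
            System.out.println("attempt " + attempt + ": sleep " + backoff.sleepTimeMs(attempt) + " ms");
        }
    }
}
```
Even with unlimited retries, the sleep time quickly saturates at the configured maximum, so a permanently unreachable destination costs roughly one connection attempt per `maxSleepMs` rather than a reconnect storm.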
My personal comments on these three approaches:
- I do not like (1) because I feel it introduces potentially confusing
semantics: we would keep a max retries setting, but it would not really be a
hard limit anymore. It would rather become a "max retries until we recreate a
Netty client", and it would also reset any exponential backoff strategy of the
"previous" Netty client instance (cf. `StormBoundedExponentialBackoffRetry`).
If we do want such resets (but I don't think we do at this point), then a
cleaner approach would be to implement such resetting inside the retry policy
itself (again, cf. `StormBoundedExponentialBackoffRetry`; see the sketch after
this list).
- I do not like (2) because a single "bad" Netty client would be able to
take down a Storm task, which among other things would also impact any other,
working Netty clients of the Storm task.
- Option (3) seems a reasonable approach, although it breaks backwards
compatibility with regard to Storm's configuration (because we'd now ignore
`storm.messaging.netty.max_retries`).
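For completeness, the "resetting inside the retry policy" idea from the first bullet could be sketched roughly as follows. This is hypothetical code, not existing Storm code: `RetryPolicySketch` and `ResettableRetryPolicy` are stand-ins, and a real version would wrap or extend `StormBoundedExponentialBackoffRetry` instead:
```java
// Hypothetical sketch: restart the backoff schedule inside the policy itself,
// e.g. after a successful (re)connect, instead of recreating the Netty client.
interface RetryPolicySketch {
    long sleepTimeMs(int attempt);
}

class ResettableRetryPolicy implements RetryPolicySketch {
    private final RetryPolicySketch delegate;
    private volatile int attemptOffset = 0;

    ResettableRetryPolicy(RetryPolicySketch delegate) {
        this.delegate = delegate;
    }

    /** Restart the backoff schedule from the given attempt number. */
    void reset(int currentAttempt) {
        attemptOffset = currentAttempt;
    }

    @Override
    public long sleepTimeMs(int attempt) {
        // Backoff is computed relative to the last reset, so the schedule
        // starts over without discarding the client or its channel.
        return delegate.sleepTimeMs(Math.max(1, attempt - attemptOffset));
    }
}
```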
Personally, I am fine with either a separate JIRA or, if consensus is
reached quickly here, addressing this directly in this pull request.
If we want to address this directly in the pull request, we only need to
change a single line in `Client.java` (apart from follow-up changes such as
updating `conf/defaults.yaml` to remove `storm.messaging.netty.max_retries`):
```java
private boolean reconnectingAllowed() {
    // BEFORE:
    // return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
    return !closing;
}
```
> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
> Key: STORM-329
> URL: https://issues.apache.org/jira/browse/STORM-329
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.9.2-incubating
> Reporter: Sean Zhong
> Priority: Minor
> Labels: Netty
> Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought
> up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986]
> during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are
> blocking. My biggest concern around the blocking is in the case of a worker
> crashing. If a single worker crashes this can block the entire topology from
> executing until that worker comes back up. In some cases I can see that being
> something that you would want. In other cases I can see speed being the
> primary concern and some users would like to get partial data fast, rather
> than accurate data later.
> Could we make it configurable on a follow up JIRA where we can have a max
> limit to the buffering that is allowed, before we block, or throw data away
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages first, and block once
> the limit is reached?
> 4. Should we neither block nor buffer too much, but instead drop the
> messages and rely on Storm's built-in failover mechanism?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)