[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326039#comment-14326039 ]
ASF GitHub Bot commented on STORM-329:
--------------------------------------
Github user ptgoetz commented on a diff in the pull request:
https://github.com/apache/storm/pull/429#discussion_r24909769
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -42,344 +42,577 @@
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
-public class Client implements IConnection, IStatefulObject{
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message
+ * batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ * - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ * the remote destination is currently unavailable.
+ * - A background flusher thread is run in the background. It will, at fixed intervals, check for any pending messages
+ * (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ * enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
- private final int max_retries;
- private final int base_sleep_ms;
- private final int max_sleep_ms;
+ private static final long NO_DELAY_MS = 0L;
+ private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
+ private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+ private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+ private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
private final StormBoundedExponentialBackoffRetry retryPolicy;
- private AtomicReference<Channel> channelRef;
private final ClientBootstrap bootstrap;
- private InetSocketAddress remote_addr;
-
- private AtomicInteger totalReconnects;
- private AtomicInteger messagesSent;
- private AtomicInteger messagesLostReconnect;
- private final Random random = new Random();
- private final ChannelFactory factory;
- private final int buffer_size;
- private boolean closing;
-
- private int messageBatchSize;
-
- private AtomicLong pendings;
-
- Map storm_conf;
+ private final InetSocketAddress dstAddress;
+ protected final String dstAddressPrefixedName;
+
+ /**
+ * The channel used for all write operations from this client to the remote destination.
+ */
+ private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+
+ /**
+ * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+ */
+ private final int maxReconnectionAttempts;
--- End diff --
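The class Javadoc in the diff combines two mechanisms: messages are grouped into batches of a configurable size, and a background thread flushes whatever is still pending at a fixed interval, but only while background flushing is enabled. The following is a minimal, hypothetical Java sketch of that pattern, not the actual `Client` code: the `BatchingSender` class, the `Consumer` standing in for the Netty channel, and the synchronous write are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of "batching + background flusher"; not Storm's Client.
class BatchingSender {
    private final int batchSize;
    private final Consumer<List<String>> channel; // stand-in for a Netty channel write
    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(true);
    private final ScheduledExecutorService flusher =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "background-flusher");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
    private List<String> pending = new ArrayList<>(); // guarded by "this"

    BatchingSender(int batchSize, long flushIntervalMs, Consumer<List<String>> channel) {
        this.batchSize = batchSize;
        this.channel = channel;
        // Periodically flush pending messages, even if the batch is not full yet.
        flusher.scheduleAtFixedRate(this::flushIfEnabled,
                flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void send(String message) {
        pending.add(message);
        if (pending.size() >= batchSize) {
            flush(); // a full batch is written immediately
        }
    }

    void setBackgroundFlushingEnabled(boolean enabled) {
        backgroundFlushingEnabled.set(enabled);
    }

    private synchronized void flushIfEnabled() {
        if (backgroundFlushingEnabled.get()) {
            flush();
        }
    }

    // Caller must hold the monitor on "this". This sketch writes synchronously;
    // the real client writes to the channel asynchronously.
    private void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = pending;
        pending = new ArrayList<>();
        channel.accept(batch);
    }

    void close() {
        flusher.shutdownNow();
        synchronized (this) {
            flush(); // push out anything still buffered
        }
    }
}
```

Disabling the background flusher (for example while a connection is being re-established) leaves partial batches in memory until it is re-enabled or a full batch forces a write.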
I'm in favor of option 3 as well. I'm not that concerned about
`storm.messaging.netty.max_retries` being ignored. We could probably just log a
warning that that configuration option is deprecated and will be ignored if the
value is set.
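A check along the lines suggested above could log a deprecation warning whenever the old key is present. This is a minimal sketch under stated assumptions: the helper class and method names are hypothetical, it assumes the new client ignores `storm.messaging.netty.max_retries`, and it uses `java.util.logging` only to stay self-contained (Storm itself logs through SLF4J).

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper; Storm's real code logs via SLF4J.
class DeprecatedNettyConfigCheck {
    private static final Logger LOG =
            Logger.getLogger(DeprecatedNettyConfigCheck.class.getName());

    static final String MAX_RETRIES_KEY = "storm.messaging.netty.max_retries";

    /**
     * Logs a warning and returns true if the deprecated key is set;
     * returns false (and stays silent) otherwise.
     */
    static boolean warnIfMaxRetriesSet(Map<?, ?> stormConf) {
        Object value = stormConf.get(MAX_RETRIES_KEY);
        if (value == null) {
            return false;
        }
        LOG.warning(MAX_RETRIES_KEY + " is deprecated and will be ignored (configured value: "
                + value + ")");
        return true;
    }
}
```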
> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
> Key: STORM-329
> URL: https://issues.apache.org/jira/browse/STORM-329
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.9.2-incubating
> Reporter: Sean Zhong
> Priority: Minor
> Labels: Netty
> Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] during the work on STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are
> blocking. My biggest concern around the blocking is in the case of a worker
> crashing. If a single worker crashes this can block the entire topology from
> executing until that worker comes back up. In some cases I can see that being
> something that you would want. In other cases I can see speed being the
> primary concern and some users would like to get partial data fast, rather
> than accurate data later.
> Could we make it configurable on a follow up JIRA where we can have a max
> limit to the buffering that is allowed, before we block, or throw data away
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were
> supposed to be delivered to it?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffering messages first and blocking
> once the limit is reached?
> 4. Should we neither block nor buffer too much, but instead drop the
> messages and rely on Storm's built-in failover mechanism?
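Options 3 and 4 differ only in what happens once a bounded buffer is full. A minimal sketch, assuming a hypothetical `PendingMessageBuffer` class backed by `java.util.concurrent.ArrayBlockingQueue`: `put()` blocks the sender until space frees up (option 3), while `offer()` returns `false` immediately so the message can be dropped (option 4). The class, the `OverflowPolicy` enum, and the drop counter are illustrative assumptions, not part of Storm.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical bounded buffer for messages bound to an unreachable worker.
class PendingMessageBuffer<T> {
    enum OverflowPolicy { BLOCK, DROP }

    private final BlockingQueue<T> queue;
    private final OverflowPolicy policy;
    private final AtomicLong dropped = new AtomicLong();

    PendingMessageBuffer(int limit, OverflowPolicy policy) {
        this.queue = new ArrayBlockingQueue<>(limit); // limit must be >= 1
        this.policy = policy;
    }

    /** Returns true if the message was buffered, false if it was dropped. */
    boolean enqueue(T message) throws InterruptedException {
        if (policy == OverflowPolicy.BLOCK) {
            queue.put(message); // option 3: wait until the drain side frees space
            return true;
        }
        if (queue.offer(message)) { // option 4: never wait
            return true;
        }
        dropped.incrementAndGet();
        return false;
    }

    /** Drain side, e.g. once the connection is back; null if empty. */
    T poll() { return queue.poll(); }

    int size() { return queue.size(); }

    long droppedCount() { return dropped.get(); }
}
```

With the DROP policy, reliability for acked topologies would come from Storm's message timeout: a tuple that is never acked is eventually failed and can be replayed by the spout.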
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)