[
https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14322547#comment-14322547
]
ASF GitHub Bot commented on STORM-329:
--------------------------------------
Github user miguno commented on a diff in the pull request:
https://github.com/apache/storm/pull/429#discussion_r24738502
--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -42,344 +42,577 @@
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
-public class Client implements IConnection, IStatefulObject{
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message
+ * batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ * - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ * the remote destination is currently unavailable.
+ * - A background flusher thread will, at fixed intervals, check for any pending messages
+ * (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ * enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
- private final int max_retries;
- private final int base_sleep_ms;
- private final int max_sleep_ms;
+ private static final long NO_DELAY_MS = 0L;
+ private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
+ private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+ private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+ private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
private final StormBoundedExponentialBackoffRetry retryPolicy;
- private AtomicReference<Channel> channelRef;
private final ClientBootstrap bootstrap;
- private InetSocketAddress remote_addr;
-
- private AtomicInteger totalReconnects;
- private AtomicInteger messagesSent;
- private AtomicInteger messagesLostReconnect;
- private final Random random = new Random();
- private final ChannelFactory factory;
- private final int buffer_size;
- private boolean closing;
-
- private int messageBatchSize;
-
- private AtomicLong pendings;
-
- Map storm_conf;
+ private final InetSocketAddress dstAddress;
+ protected final String dstAddressPrefixedName;
+
+ /**
+ * The channel used for all write operations from this client to the remote destination.
+ */
+ private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+
+ /**
+ * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+ */
+ private final int maxReconnectionAttempts;
+
+ /**
+ * Total number of connection attempts.
+ */
+ private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+ /**
+ * Number of connection attempts since the last disconnect.
+ */
+ private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+ /**
+ * Number of messages successfully sent to the remote destination.
+ */
+ private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+ /**
+ * Number of messages that could not be sent to the remote destination.
+ */
+ private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+ /**
+ * Number of messages buffered in memory.
+ */
+ private final AtomicLong pendingMessages = new AtomicLong(0);
+
+ /**
+ * This flag is set to true if and only if a client instance is being closed.
+ */
+ private volatile boolean closing = false;
+
+ /**
+ * When set to true, the background flusher thread will flush any pending messages on its next run.
+ */
+ private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
+
+ /**
+ * The absolute time (in ms) when the next background flush should be performed.
+ *
+ * Note: The flush operation will only be performed if backgroundFlushingEnabled is also true.
+ */
+ private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS);
+
+ /**
+ * The time interval (in ms) at which the background flusher thread will be run to check for any pending messages
+ * to be flushed.
+ */
+ private final int flushCheckIntervalMs;
+
+ /**
+ * How many messages should be batched together before sending them to the remote destination.
+ *
+ * Messages are batched to optimize network throughput at the expense of latency.
+ */
+ private final int messageBatchSize;
private MessageBatch messageBatch = null;
- private AtomicLong flushCheckTimer;
- private int flushCheckInterval;
- private ScheduledExecutorService scheduler;
+ private final ListeningScheduledExecutorService scheduler;
+ protected final Map stormConf;
@SuppressWarnings("rawtypes")
- Client(Map storm_conf, ChannelFactory factory,
- ScheduledExecutorService scheduler, String host, int port) {
- this.storm_conf = storm_conf;
- this.factory = factory;
- this.scheduler = scheduler;
- channelRef = new AtomicReference<Channel>(null);
+ Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
closing = false;
- pendings = new AtomicLong(0);
- flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
- totalReconnects = new AtomicInteger(0);
- messagesSent = new AtomicInteger(0);
- messagesLostReconnect = new AtomicInteger(0);
-
- // Configure
- buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
- max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
- base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
- max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
- retryPolicy = new StormBoundedExponentialBackoffRetry(base_sleep_ms, max_sleep_ms, max_retries);
-
- this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
-
- flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
-
- LOG.info("New Netty Client, connect to " + host + ", " + port
- + ", config: " + ", buffer_size: " + buffer_size);
-
- bootstrap = new ClientBootstrap(factory);
+ this.stormConf = stormConf;
+ this.scheduler = MoreExecutors.listeningDecorator(scheduler);
+ int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
+ messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+ flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
+
+ maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+ int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+ int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+ retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
+
+ // Initiate connection to remote destination
+ bootstrap = createClientBootstrap(factory, bufferSize);
+ dstAddress = new InetSocketAddress(host, port);
+ dstAddressPrefixedName = prefixedName(dstAddress);
+ connect(NO_DELAY_MS);
+
+ // Launch background flushing thread
+ pauseBackgroundFlushing();
+ long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
+ scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
+ ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("sendBufferSize", buffer_size);
+ bootstrap.setOption("sendBufferSize", bufferSize);
bootstrap.setOption("keepAlive", true);
-
- // Set up the pipeline factory.
bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+ return bootstrap;
+ }
- // Start the connection attempt.
- remote_addr = new InetSocketAddress(host, port);
-
- // setup the connection asyncly now
- scheduler.execute(new Runnable() {
- @Override
- public void run() {
- connect();
- }
- });
-
- Runnable flusher = new Runnable() {
+ private String prefixedName(InetSocketAddress dstAddress) {
+ if (null != dstAddress) {
+ return PREFIX + dstAddress.toString();
+ }
+ return "";
+ }
+
+ private Runnable createBackgroundFlusher() {
+ return new Runnable() {
@Override
public void run() {
-
- if(!closing) {
- long flushCheckTime = flushCheckTimer.get();
- long now = System.currentTimeMillis();
- if (now > flushCheckTime) {
- Channel channel = channelRef.get();
- if (null != channel && channel.isWritable()) {
- flush(channel);
- }
- }
+ if (!closing && backgroundFlushingEnabled.get() && nowMillis() > nextBackgroundFlushTimeMs.get()) {
+ LOG.debug("flushing {} pending messages to {} in background", messageBatch.size(),
+ dstAddressPrefixedName);
+ flushPendingMessages();
}
-
}
};
-
- long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
- scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
+ }
+
+ private void pauseBackgroundFlushing() {
+ backgroundFlushingEnabled.set(false);
+ }
+
+ private void resumeBackgroundFlushing() {
+ backgroundFlushingEnabled.set(true);
+ }
+
+ private synchronized void flushPendingMessages() {
+ Channel channel = channelRef.get();
+ if (containsMessages(messageBatch)) {
+ if (connectionEstablished(channel)) {
+ if (channel.isWritable()) {
+ pauseBackgroundFlushing();
+ MessageBatch toBeFlushed = messageBatch;
+ flushMessages(channel, toBeFlushed);
+ messageBatch = null;
+ }
+ else if (closing) {
+ // Ensure background flushing is enabled so that we definitely have a chance to re-try the flush
+ // operation in case the client is being gracefully closed (where we have a brief time window where
+ // the client will wait for pending messages to be sent).
+ resumeBackgroundFlushing();
+ }
+ }
+ else {
+ closeChannelAndReconnect(channel);
+ }
+ }
+ }
+
+ private long nowMillis() {
+ return System.currentTimeMillis();
}
/**
* We will retry connection with exponential back-off policy
*/
- private synchronized void connect() {
+ private synchronized void connect(long delayMs) {
try {
+ if (closing) {
+ return;
+ }
- Channel channel = channelRef.get();
- if (channel != null && channel.isConnected()) {
+ if (connectionEstablished(channelRef.get())) {
return;
}
- int tried = 0;
- // setting channel to null to make sure we throw an exception when reconnection fails
- channel = null;
- while (tried <= max_retries) {
-
- LOG.info("Reconnect started for {}... [{}]", name(),
tried);
- LOG.debug("connection started...");
-
- totalReconnects.getAndIncrement();
- ChannelFuture future = bootstrap.connect(remote_addr);
- future.awaitUninterruptibly();
- Channel current = future.getChannel();
- if (!future.isSuccess()) {
- if (null != current) {
- current.close();
+ connectionAttempts.getAndIncrement();
+ if (reconnectingAllowed()) {
+ totalConnectionAttempts.getAndIncrement();
+ LOG.info("connection attempt {} to {} scheduled to run in
{} ms", connectionAttempts.get(),
+ dstAddressPrefixedName, delayMs);
+ ListenableFuture<Channel> channelFuture =
scheduler.schedule(
+ new Connector(dstAddress, connectionAttempts.get()),
delayMs, TimeUnit.MILLISECONDS);
+ Futures.addCallback(channelFuture, new FutureCallback<Channel>() {
+ @Override public void onSuccess(Channel result) {
+ if (connectionEstablished(result)) {
+ setChannel(result);
+ LOG.info("connection established to {}",
dstAddressPrefixedName);
+ connectionAttempts.set(0);
+ }
+ else {
+ reconnectAgain(new RuntimeException("Returned
channel was actually not established"));
+ }
+ }
+
+ @Override public void onFailure(Throwable t) {
+ reconnectAgain(t);
}
- } else {
- channel = current;
- break;
- }
- Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
- tried++;
+
+ private void reconnectAgain(Throwable t) {
+ String baseMsg = String.format("connection attempt
%s to %s failed", connectionAttempts,
+ dstAddressPrefixedName);
+ String failureMsg = (t == null)? baseMsg : baseMsg
+ ": " + t.toString();
+ LOG.error(failureMsg);
+ long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+ connect(nextDelayMs);
+ }
+ });
}
- if (null != channel) {
- LOG.info("connection established to a remote host " +
name() + ", " + channel.toString());
- channelRef.set(channel);
- } else {
+ else {
close();
- throw new RuntimeException("Remote address is not
reachable. We will close this client " + name());
+ throw new RuntimeException("Giving up to connect to " +
dstAddressPrefixedName + " after " +
+ connectionAttempts + " failed attempts");
}
- } catch (InterruptedException e) {
- throw new RuntimeException("connection failed " + name(), e);
}
+ catch (Exception e) {
+ throw new RuntimeException("Failed to connect to " +
dstAddressPrefixedName, e);
+ }
+ }
+
+ private void setChannel(Channel channel) {
+ channelRef.set(channel);
+ }
+
+ private boolean reconnectingAllowed() {
+ return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
+ }
+
+ private boolean connectionEstablished(Channel channel) {
+ // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
+ // established iff the channel is connected. That is, a TCP-based channel must be in the CONNECTED state before
+ // anything can be read or written to the channel.
+ //
+ // See:
+ // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
+ // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
+ return channel != null && channel.isConnected();
}
/**
- * Enqueue task messages to be sent to server
+ * Note: Storm will check via this method whether a worker can be activated safely during the initial startup of a
+ * topology. The worker will only be activated once all of its connections are ready.
*/
- synchronized public void send(Iterator<TaskMessage> msgs) {
+ @Override
+ public Status status() {
+ if (closing) {
+ return Status.Closed;
+ }
+ else if (!connectionEstablished(channelRef.get())) {
+ return Status.Connecting;
+ }
+ else {
+ return Status.Ready;
+ }
+ }
+
+ /**
+ * Receiving messages is not supported by a client.
+ *
+ * @throws java.lang.UnsupportedOperationException whenever this method is being called.
+ */
+ @Override
+ public Iterator<TaskMessage> recv(int flags, int clientId) {
+ throw new UnsupportedOperationException("Client connection should
not receive any messages");
+ }
- // throw exception if the client is being closed
+ @Override
+ public void send(int taskId, byte[] payload) {
+ TaskMessage msg = new TaskMessage(taskId, payload);
+ List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+ wrapper.add(msg);
+ send(wrapper.iterator());
+ }
+
+ /**
+ * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
+ */
+ @Override
+ public synchronized void send(Iterator<TaskMessage> msgs) {
if (closing) {
- throw new RuntimeException("Client is being closed, and does
not take requests any more");
+ int numMessages = iteratorSize(msgs);
+ LOG.warn("discarding {} messages because the Netty client to
{} is being closed", numMessages,
+ dstAddressPrefixedName);
+ return;
}
-
- if (null == msgs || !msgs.hasNext()) {
+
+ if (!hasMessages(msgs)) {
return;
}
Channel channel = channelRef.get();
- if (null == channel) {
- connect();
- channel = channelRef.get();
+ if (!connectionEstablished(channel)) {
+ // Closing the channel and reconnecting should be done before handling the messages.
+ closeChannelAndReconnect(channel);
+ handleMessagesWhenConnectionIsUnavailable(msgs);
+ return;
}
+ // Collect messages into batches (to optimize network throughput), then flush them.
while (msgs.hasNext()) {
- if (!channel.isConnected()) {
- connect();
- channel = channelRef.get();
- }
TaskMessage message = msgs.next();
- if (null == messageBatch) {
+ if (messageBatch == null) {
messageBatch = new MessageBatch(messageBatchSize);
}
messageBatch.add(message);
if (messageBatch.isFull()) {
MessageBatch toBeFlushed = messageBatch;
- flushRequest(channel, toBeFlushed);
+ flushMessages(channel, toBeFlushed);
messageBatch = null;
}
}
- if (null != messageBatch && !messageBatch.isEmpty()) {
- if (channel.isWritable()) {
- flushCheckTimer.set(Long.MAX_VALUE);
-
- // Flush as fast as we can to reduce the latency
+ // Handle any remaining messages in case the "last" batch was not
full.
+ if (containsMessages(messageBatch)) {
+ if (connectionEstablished(channel) && channel.isWritable()) {
+ // We can write to the channel, so we flush the remaining messages immediately to minimize latency.
+ pauseBackgroundFlushing();
MessageBatch toBeFlushed = messageBatch;
messageBatch = null;
- flushRequest(channel, toBeFlushed);
-
- } else {
- // when channel is NOT writable, it means the internal netty buffer is full.
- // In this case, we can try to buffer up more incoming messages.
- flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
+ flushMessages(channel, toBeFlushed);
+ }
+ else {
+ // We cannot write to the channel, which means Netty's internal write buffer is full.
+ // In this case, we buffer the remaining messages and wait for the next messages to arrive.
+ //
+ // Background:
+ // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K).
+ // This represents the amount of data waiting to be flushed to operating system buffers. If the
+ // outstanding data exceeds this value then the channel is set to non-writable. When this happens, an
+ // INTEREST_CHANGED channel event is triggered. Netty sets the channel to writable again once the data
+ // has been flushed to the system buffers.
+ //
+ // See http://stackoverflow.com/questions/14049260
+ resumeBackgroundFlushing();
+ nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
}
}
}
- public String name() {
- if (null != remote_addr) {
- return PREFIX + remote_addr.toString();
- }
- return "";
+ private boolean hasMessages(Iterator<TaskMessage> msgs) {
+ return msgs != null && msgs.hasNext();
}
- private synchronized void flush(Channel channel) {
- if (!closing) {
- if (null != messageBatch && !messageBatch.isEmpty()) {
- MessageBatch toBeFlushed = messageBatch;
- flushCheckTimer.set(Long.MAX_VALUE);
- flushRequest(channel, toBeFlushed);
- messageBatch = null;
+ /**
+ * We will drop pending messages and let at-least-once message replay kick in.
+ *
+ * Another option would be to buffer the messages in memory. But this option has the risk of causing OOM errors,
+ * especially for topologies that disable message acking because we don't know whether the connection recovery will
+ * succeed or not, and how long the recovery will take.
+ */
+ private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
+ LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+ dropPendingMessages(msgs);
+ }
+
+ private void dropPendingMessages(Iterator<TaskMessage> msgs) {
+ // We consume the iterator by traversing and thus "emptying" it.
+ int msgCount = iteratorSize(msgs);
--- End diff --
Fixed. Good catch, Bobby!
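For readers following along: the diff hunk is truncated right after `int msgCount = iteratorSize(msgs);`, so the bodies of the two helpers discussed here are not shown. Below is a minimal sketch of how they could fit together as methods on Client, inferred from the surrounding code (the `messagesLost` counter and `dstAddressPrefixedName` are fields from the diff); this is an assumption, not the exact implementation in the PR:

    // Sketch only: bodies inferred from the surrounding diff, not the exact PR code.
    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
        // Consuming the iterator both "empties" it and tells us how many messages were dropped.
        int msgCount = iteratorSize(msgs);
        messagesLost.getAndAdd(msgCount);
        LOG.error("dropped {} pending message(s) destined for {}", msgCount, dstAddressPrefixedName);
    }

    private int iteratorSize(Iterator<TaskMessage> msgs) {
        int size = 0;
        if (msgs != null) {
            while (msgs.hasNext()) {
                msgs.next();
                size++;
            }
        }
        return size;
    }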
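Similarly, the new connect(long delayMs) path schedules each retry with a delay obtained from StormBoundedExponentialBackoffRetry. As a rough illustration of what bounded exponential backoff means here, consider this generic sketch (illustrative only; Storm's actual implementation has its own bounds and may add jitter):

    // Generic bounded exponential backoff, for illustration only.
    static long sleepTimeMs(int retryCount, long baseSleepMs, long maxSleepMs) {
        // The delay doubles with each attempt and is capped at maxSleepMs.
        long delay = baseSleepMs * (1L << Math.min(retryCount, 30));
        return Math.min(delay, maxSleepMs);
    }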
> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
> Key: STORM-329
> URL: https://issues.apache.org/jira/browse/STORM-329
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.9.2-incubating
> Reporter: Sean Zhong
> Priority: Minor
> Labels: Netty
> Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought
> up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986]
> during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are
> blocking. My biggest concern around the blocking is in the case of a worker
> crashing. If a single worker crashes this can block the entire topology from
> executing until that worker comes back up. In some cases I can see that being
> something that you would want. In other cases I can see speed being the
> primary concern and some users would like to get partial data fast, rather
> than accurate data later.
> Could we make it configurable on a follow up JIRA where we can have a max
> limit to the buffering that is allowed, before we block, or throw data away
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were
> supposed to be delivered to that worker?
> 1. Should we buffer all message infinitely?
> 2. Should we block the message sending until the connection is resumed?
> 3. Should we configure a buffer limit, buffer messages up to that limit, and
> then block once the limit is reached? (A sketch of this option follows below.)
> 4. Should we neither block nor buffer too much, but instead drop the
> messages and rely on Storm's built-in failover mechanism?
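To make option 3 from the quoted description concrete, a bounded buffer with block-on-full semantics could look roughly like the sketch below. This is purely illustrative; the class and parameter names are hypothetical and not part of this patch, which (per the diff above) implements option 4 by dropping messages and relying on replay:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative sketch of option 3: buffer up to a configurable limit, then block.
    // Class and parameter names are hypothetical, not part of the patch.
    class BoundedMessageBuffer {
        private final BlockingQueue<TaskMessage> buffer;

        BoundedMessageBuffer(int maxBufferedMessages) {
            this.buffer = new LinkedBlockingQueue<TaskMessage>(maxBufferedMessages);
        }

        // put() blocks the caller once maxBufferedMessages are pending, which is
        // exactly the back-pressure behavior described in option 3.
        void enqueue(TaskMessage msg) throws InterruptedException {
            buffer.put(msg);
        }
    }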
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)