Rui Abreu created STORM-4060:
--------------------------------
Summary: Netty client will wait up to 10 minutes to send messages to unreachable worker
Key: STORM-4060
URL: https://issues.apache.org/jira/browse/STORM-4060
Project: Apache Storm
Issue Type: Bug
Components: storm-client, storm-core
Affects Versions: 2.6.2, 1.2.4, 1.1.3
Reporter: Rui Abreu
Assignee: Rui Abreu
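
Since PENDING_MESSAGES_FLUSH_TIMEOUT_MS is hardcoded to 600000 ms (10 minutes), closing a Netty client whose destination worker is unreachable can block for up to 10 minutes. During that time the buffered messages remain in the sending worker, and only after the timeout expires are they given up on and reprocessed.
This leads to increased latency in the topology.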
{code:java}
public class Client extends ConnectionWithStatus implements ISaslClient {
    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; // 10 minutes
    // ...
}
{code}
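One possible direction, sketched under stated assumptions: read the timeout from the topology configuration and fall back to the current 10-minute value when it is not set. The config key {{storm.messaging.netty.pending.flush.timeout.ms}} and the class {{PendingFlushTimeout}} are hypothetical, not existing Storm names; the sketch only assumes the topology configuration is available as a {{Map<String, Object>}}.
{code:java}
import java.util.Map;

// Hypothetical sketch: resolve the flush timeout from the topology config
// instead of hardcoding it. The key below is NOT an existing Storm config;
// it is assumed for illustration only.
final class PendingFlushTimeout {
    // Hypothetical config key (assumption, not part of Storm's Config class).
    static final String TIMEOUT_KEY = "storm.messaging.netty.pending.flush.timeout.ms";
    // Value currently hardcoded in Client: 600000 ms = 10 minutes.
    static final long DEFAULT_TIMEOUT_MS = 600_000L;

    private PendingFlushTimeout() {
    }

    // Returns the configured timeout, or the 10-minute default when the key
    // is absent or its value is not a non-negative number.
    static long resolve(Map<String, Object> topoConf) {
        Object value = topoConf.get(TIMEOUT_KEY);
        if (value instanceof Number) {
            long ms = ((Number) value).longValue();
            if (ms >= 0) {
                return ms;
            }
        }
        return DEFAULT_TIMEOUT_MS;
    }
}
{code}
Keeping 600000 ms as the default preserves today's behavior for topologies that do not set the key, while letting latency-sensitive topologies choose a shorter wait.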
{code:java}
@Override
public void close() {
    if (!closing) {
        LOG.info("closing Netty Client {}", dstAddressPrefixedName);
        // Set closing to true to prevent any further reconnection attempts.
        closing = true;
        waitForPendingMessagesToBeSent();
        closeChannel();
        // stop tracking metrics for this client
        if (this.metricRegistry != null) {
            this.metricRegistry.deregister(this.metrics);
        }
    }
}
{code}
{code:java}
private void waitForPendingMessagesToBeSent() {
    LOG.info("waiting up to {} ms to send {} pending messages to {}",
        PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
    long totalPendingMsgs = pendingMessages.get();
    long startMs = System.currentTimeMillis();
    while (pendingMessages.get() != 0) {
        try {
            long deltaMs = System.currentTimeMillis() - startMs;
            if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
                    + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                break;
            }
            Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
        } catch (InterruptedException e) {
            break;
        }
    }
}
{code}
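A minimal sketch of a bounded wait whose timeout is passed in rather than hardcoded, assuming the caller supplies the pending-message count as a {{LongSupplier}} standing in for {{pendingMessages}}. {{PendingFlushWaiter}} and {{await}} are hypothetical names. It differs from the loop above in two deliberate ways: it measures elapsed time with the monotonic {{System.nanoTime()}} instead of {{System.currentTimeMillis()}}, and it restores the thread's interrupt flag instead of silently swallowing the {{InterruptedException}}.
{code:java}
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a bounded wait with a caller-provided timeout.
// "pending" stands in for Client's pendingMessages counter (assumption).
final class PendingFlushWaiter {
    private PendingFlushWaiter() {
    }

    // Polls until pending reaches zero or timeoutMs elapses.
    // Returns the number of messages still pending when the wait ends.
    static long await(LongSupplier pending, long timeoutMs, long pollIntervalMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long remaining = pending.getAsLong();
        while (remaining != 0) {
            long leftNanos = deadline - System.nanoTime();
            if (leftNanos <= 0) {
                break; // timed out; the caller decides how to report unsent messages
            }
            try {
                // Sleep for one poll interval, but never past the deadline.
                TimeUnit.NANOSECONDS.sleep(
                    Math.min(leftNanos, TimeUnit.MILLISECONDS.toNanos(pollIntervalMs)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
            remaining = pending.getAsLong();
        }
        return remaining;
    }
}
{code}
Inside {{close()}}, this could be called as {{PendingFlushWaiter.await(pendingMessages::get, timeoutMs, PENDING_MESSAGES_FLUSH_INTERVAL_MS)}}, where {{timeoutMs}} is a hypothetical configured value; a non-zero result means messages were left unsent and can be logged the same way the existing error message does.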