Rui Abreu created STORM-4060:
--------------------------------

             Summary: Netty client will wait up to 10 minutes to send messages to unreachable worker
                 Key: STORM-4060
                 URL: https://issues.apache.org/jira/browse/STORM-4060
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-client, storm-core
    Affects Versions: 2.6.2, 1.2.4, 1.1.3
            Reporter: Rui Abreu
            Assignee: Rui Abreu


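Because PENDING_MESSAGES_FLUSH_TIMEOUT_MS is hardcoded to 600000 ms (10 minutes), close() on a client whose destination worker is unreachable can block for up to 10 minutes: buffered messages remain in the worker for that long before the client gives up on them and they are reprocessed.
This leads to increased latencies in the topology.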
{code:java}
public class Client extends ConnectionWithStatus implements ISaslClient {
    // 600000 ms = 10 minutes; hardcoded, not configurable
    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    // ...
}
{code}
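One possible direction, not something this issue or the Storm project specifies, is to read the timeout from configuration and fall back to the current 10-minute value. The sketch below assumes a plain Map<String, Object> configuration; FlushTimeoutConfig and the key hypothetical.netty.pending.flush.timeout.ms are hypothetical names, not existing Storm classes or settings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Storm. */
public class FlushTimeoutConfig {
    // Hypothetical config key: NOT an existing Storm setting.
    static final String FLUSH_TIMEOUT_KEY = "hypothetical.netty.pending.flush.timeout.ms";
    // Same value Client currently hardcodes: 600000 ms = 10 minutes.
    static final long DEFAULT_FLUSH_TIMEOUT_MS = 600_000L;

    /** Returns the configured timeout, or the 10-minute default if unset, non-numeric, or negative. */
    static long flushTimeoutMs(Map<String, Object> conf) {
        Object value = conf.get(FLUSH_TIMEOUT_KEY);
        if (value instanceof Number) {
            long ms = ((Number) value).longValue();
            if (ms >= 0) {
                return ms;
            }
        }
        return DEFAULT_FLUSH_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(flushTimeoutMs(conf)); // prints 600000
        conf.put(FLUSH_TIMEOUT_KEY, 5000);        // e.g. give up after 5 seconds
        System.out.println(flushTimeoutMs(conf)); // prints 5000
    }
}
```

Accepting any Number lets the value arrive as an Integer or a Long; the default keeps today's behavior for anyone who does not set the key.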
{code:java}
@Override
public void close() {
    if (!closing) {
        LOG.info("closing Netty Client {}", dstAddressPrefixedName);
        // Set closing to true to prevent any further reconnection attempts.
        closing = true;
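        // Blocks for up to PENDING_MESSAGES_FLUSH_TIMEOUT_MS (10 minutes) if the destination is unreachable.
        waitForPendingMessagesToBeSent();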
        closeChannel();

        // stop tracking metrics for this client
        if (this.metricRegistry != null) {
            this.metricRegistry.deregister(this.metrics);
        }
    }
}
{code}
{code:java}
private void waitForPendingMessagesToBeSent() {
    LOG.info("waiting up to {} ms to send {} pending messages to {}",
             PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
    long totalPendingMsgs = pendingMessages.get();
    long startMs = System.currentTimeMillis();
    while (pendingMessages.get() != 0) {
        try {
            long deltaMs = System.currentTimeMillis() - startMs;
            if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
                    + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                break;
            }
            Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
        } catch (InterruptedException e) {
            // stop waiting; note that the interrupt status is not restored
            break;
        }
    }
}
{code}
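To see how an unreachable worker turns into a stall of the full timeout, the loop above can be exercised outside Storm. This is a minimal sketch, assuming a hypothetical PendingFlushDemo class whose timeout and poll interval are constructor parameters (Storm's Client hardcodes them); nothing ever decrements the pending counter, which models a destination that never accepts messages. It deliberately differs from the quoted code in two ways: it measures elapsed time with System.nanoTime() and it restores the thread's interrupt status.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Storm's Client; only the pending-message wait is modeled. */
public class PendingFlushDemo {
    // Stand-in for Client.pendingMessages; nothing decrements it when the worker is unreachable.
    final AtomicLong pendingMessages = new AtomicLong();
    private final long flushTimeoutMs;
    private final long flushIntervalMs;

    PendingFlushDemo(long flushTimeoutMs, long flushIntervalMs) {
        this.flushTimeoutMs = flushTimeoutMs;
        this.flushIntervalMs = flushIntervalMs;
    }

    /** Polls until no messages are pending or the timeout elapses; returns the unsent count. */
    long waitForPendingMessagesToBeSent() {
        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or stretch the wait.
        long startNs = System.nanoTime();
        long timeoutNs = flushTimeoutMs * 1_000_000L;
        while (pendingMessages.get() != 0) {
            if (System.nanoTime() - startNs > timeoutNs) {
                break; // give up: the remaining messages are never sent
            }
            try {
                Thread.sleep(flushIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
        }
        return pendingMessages.get();
    }

    public static void main(String[] args) {
        // 500 ms instead of 600000 ms so the demo finishes quickly.
        PendingFlushDemo client = new PendingFlushDemo(500, 50);
        client.pendingMessages.set(3); // destination unreachable: the count never drops
        long start = System.nanoTime();
        long unsent = client.waitForPendingMessagesToBeSent();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        System.out.println("unsent=" + unsent + ", blocked for " + elapsedMs + " ms");
    }
}
```

The call returns only after the full timeout with all 3 messages still pending; with the hardcoded 600000 ms value, the same call would block for just over 10 minutes.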

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
