[ 
https://issues.apache.org/jira/browse/STORM-4060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Abreu closed STORM-4060.
----------------------------

> Netty client will wait up to 10 minutes to send messages to unreachable 
> worker on close()
> -----------------------------------------------------------------------------------------
>
>                 Key: STORM-4060
>                 URL: https://issues.apache.org/jira/browse/STORM-4060
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-client, storm-core
>    Affects Versions: 1.1.3, 1.2.4, 2.6.2
>            Reporter: Rui Abreu
>            Assignee: Rui Abreu
>            Priority: Major
>             Fix For: 2.6.3
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
>  
> Client.close() blocks in waitForPendingMessagesToBeSent() until all pending 
> messages are sent or PENDING_MESSAGES_FLUSH_TIMEOUT_MS elapses. Because that 
> timeout is hardcoded to 10 minutes, messages buffered for an unreachable 
> worker can stay in the sending worker for up to 10 minutes before they are 
> given up and reprocessed.
> This increases latency in the topology.
> The proposal is to expose PENDING_MESSAGES_FLUSH_TIMEOUT_MS as a property 
> that the user can configure.
> Code that leads to this situation:
> {code:java}
> public class Client extends ConnectionWithStatus implements ISaslClient {
>     private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; 
> {code}
> {code:java}
> @Override
> public void close() {
>     if (!closing) {
>         LOG.info("closing Netty Client {}", dstAddressPrefixedName);
>         // Set closing to true to prevent any further reconnection attempts.
>         closing = true;
>         waitForPendingMessagesToBeSent();
>         closeChannel();
>         // stop tracking metrics for this client
>         if (this.metricRegistry != null) {
>             this.metricRegistry.deregister(this.metrics);
>         }
>     }
> } {code}
> {code:java}
> private void waitForPendingMessagesToBeSent() {
>     LOG.info("waiting up to {} ms to send {} pending messages to {}",
>              PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
>     long totalPendingMsgs = pendingMessages.get();
>     long startMs = System.currentTimeMillis();
>     while (pendingMessages.get() != 0) {
>         try {
>             long deltaMs = System.currentTimeMillis() - startMs;
>             if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
>                 LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
>                     + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
>                 break;
>             }
>             Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
>         } catch (InterruptedException e) {
>             break;
>         }
>     }
> } {code}
>  
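
The proposal quoted above (making the flush timeout configurable) could look roughly like the following minimal sketch. It is not the actual fix for STORM-4060: the class name FlushTimeoutSketch, the config key, and both helper methods are hypothetical and chosen only for illustration. The default mirrors the hardcoded 600000 ms from the issue, and the wait loop follows the same poll-and-check pattern as waitForPendingMessagesToBeSent().

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class FlushTimeoutSketch {
    // Hypothetical config key, used only in this sketch; not a confirmed Storm setting.
    static final String FLUSH_TIMEOUT_KEY = "example.netty.pending.flush.timeout.ms";
    // Default mirrors the hardcoded value quoted in the issue (10 minutes).
    static final long DEFAULT_FLUSH_TIMEOUT_MS = 600_000L;

    /** Reads the timeout from a config map, falling back to the default. */
    static long resolveTimeoutMs(Map<String, ?> conf) {
        Object value = conf.get(FLUSH_TIMEOUT_KEY);
        if (value instanceof Number) {
            long ms = ((Number) value).longValue();
            if (ms >= 0) {
                return ms;
            }
        }
        return DEFAULT_FLUSH_TIMEOUT_MS;
    }

    /**
     * Polls until the pending counter reaches zero or the timeout elapses.
     * Returns true if everything was flushed, false on timeout or interrupt.
     */
    static boolean waitForPending(AtomicLong pending, long timeoutMs, long pollIntervalMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (pending.get() != 0) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false; // timed out with messages still pending
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A 50 ms timeout instead of 10 minutes; the counter never drains here,
        // which simulates an unreachable destination worker.
        long timeoutMs = resolveTimeoutMs(Map.of(FLUSH_TIMEOUT_KEY, 50));
        AtomicLong pending = new AtomicLong(3);
        boolean flushed = waitForPending(pending, timeoutMs, 10);
        System.out.println("timeoutMs=" + timeoutMs + ", flushed=" + flushed);
    }
}
```

With a short configured timeout, close() would give up on an unreachable worker quickly, at the cost of dropping pending messages sooner when the peer is merely slow. Keeping the 10-minute value as the default in this sketch preserves the existing behavior for users who do not set the property.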



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
