[
https://issues.apache.org/jira/browse/STORM-4060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Abreu closed STORM-4060.
----------------------------
> Netty client will wait up to 10 minutes to send messages to unreachable
> worker on close()
> -----------------------------------------------------------------------------------------
>
> Key: STORM-4060
> URL: https://issues.apache.org/jira/browse/STORM-4060
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-client, storm-core
> Affects Versions: 1.1.3, 1.2.4, 2.6.2
> Reporter: Rui Abreu
> Assignee: Rui Abreu
> Priority: Major
> Fix For: 2.6.3
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
>
> PENDING_MESSAGES_FLUSH_TIMEOUT_MS is hardcoded to 10 minutes (600000 ms), so
> when the destination worker is unreachable, close() can block for up to 10
> minutes while buffered messages stay in the worker before they are given up
> on and reprocessed.
> This increases latency in the topology.
> The proposal is to expose PENDING_MESSAGES_FLUSH_TIMEOUT_MS as a property
> that the user can configure.
> Code that leads to this situation:
> {code:java}
> public class Client extends ConnectionWithStatus implements ISaslClient {
>     private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
> {code}
> {code:java}
> @Override
> public void close() {
>     if (!closing) {
>         LOG.info("closing Netty Client {}", dstAddressPrefixedName);
>         // Set closing to true to prevent any further reconnection attempts.
>         closing = true;
>         waitForPendingMessagesToBeSent();
>         closeChannel();
>         // stop tracking metrics for this client
>         if (this.metricRegistry != null) {
>             this.metricRegistry.deregister(this.metrics);
>         }
>     }
> }
> {code}
> {code:java}
> private void waitForPendingMessagesToBeSent() {
>     LOG.info("waiting up to {} ms to send {} pending messages to {}",
>         PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
>     long totalPendingMsgs = pendingMessages.get();
>     long startMs = System.currentTimeMillis();
>     while (pendingMessages.get() != 0) {
>         try {
>             long deltaMs = System.currentTimeMillis() - startMs;
>             if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
>                 LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
>                     + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
>                 break;
>             }
>             Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
>         } catch (InterruptedException e) {
>             break;
>         }
>     }
> }
> {code}
>
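The proposal in the description could look roughly like the sketch below. It is not the actual Storm patch: the class name FlushTimeoutSketch, the config key "example.netty.flush.timeout.ms", and the 10 ms polling interval are assumptions made for illustration only. The sketch reads the timeout from a configuration map, falls back to the current 10-minute value when the key is absent or invalid, and bounds the wait loop with that value.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone sketch; this is not Storm's Client class.
class FlushTimeoutSketch {
    // Hypothetical config key, chosen only for this example.
    static final String FLUSH_TIMEOUT_KEY = "example.netty.flush.timeout.ms";
    // Default mirrors the hardcoded value quoted in the issue (10 minutes).
    static final long DEFAULT_FLUSH_TIMEOUT_MS = 600_000L;
    // Polling interval is an assumption; the issue does not show its value.
    static final long FLUSH_INTERVAL_MS = 10L;

    // Returns the configured timeout, or the default if missing or invalid.
    static long resolveFlushTimeoutMs(Map<String, Object> conf) {
        Object value = conf.get(FLUSH_TIMEOUT_KEY);
        if (value instanceof Number) {
            long ms = ((Number) value).longValue();
            if (ms >= 0) {
                return ms;
            }
        }
        return DEFAULT_FLUSH_TIMEOUT_MS;
    }

    // Polls until no messages are pending or the timeout elapses.
    // Returns the number of messages still pending when the wait ended.
    static long waitForPending(AtomicLong pending, long timeoutMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (pending.get() != 0) {
            if (System.nanoTime() - deadlineNs >= 0) {
                break; // timed out; the caller decides how to report it
            }
            try {
                Thread.sleep(FLUSH_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
        return pending.get();
    }
}
```

With a map containing the hypothetical key set to 5000, resolveFlushTimeoutMs returns 5000, so a close() built on waitForPending would give up after about five seconds instead of ten minutes. The sketch uses System.nanoTime() rather than System.currentTimeMillis() so that wall-clock adjustments do not shorten or lengthen the wait; that is a design choice of this example, not something stated in the issue.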
--
This message was sent by Atlassian Jira
(v8.20.10#820010)