[ https://issues.apache.org/jira/browse/STORM-4060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Abreu updated STORM-4060:
-----------------------------
Description:
Because PENDING_MESSAGES_FLUSH_TIMEOUT_MS is hardcoded to 10 minutes (600000 ms), buffered messages can remain in the worker for up to 10 minutes before they are given up on and reprocessed.
This increases latency in the topology.
It is proposed that PENDING_MESSAGES_FLUSH_TIMEOUT_MS be exposed as a property that the user can configure.
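A minimal sketch of how such a property could be read, falling back to the current hardcoded 10-minute value when it is unset or invalid. The config key `storm.messaging.netty.pending.messages.flush.timeout.ms` and the class `FlushTimeoutConfig` are hypothetical names chosen for illustration, not existing Storm settings; the configuration is modelled here as a plain `Map<String, Object>`.

```java
import java.util.HashMap;
import java.util.Map;

public class FlushTimeoutConfig {
    // Hypothetical config key; NOT an existing Storm setting.
    static final String FLUSH_TIMEOUT_KEY = "storm.messaging.netty.pending.messages.flush.timeout.ms";
    // Current hardcoded value, kept as the default: 10 minutes.
    static final long DEFAULT_FLUSH_TIMEOUT_MS = 600000L;

    /** Returns the configured flush timeout, or the 10-minute default if missing or negative. */
    static long flushTimeoutMs(Map<String, Object> conf) {
        Object value = conf.get(FLUSH_TIMEOUT_KEY);
        // YAML-sourced values may arrive as Integer or Long, so accept any Number.
        if (value instanceof Number) {
            long ms = ((Number) value).longValue();
            if (ms >= 0) {
                return ms;
            }
        }
        return DEFAULT_FLUSH_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(flushTimeoutMs(conf)); // 600000
        conf.put(FLUSH_TIMEOUT_KEY, 30000);
        System.out.println(flushTimeoutMs(conf)); // 30000
    }
}
```

Keeping the old value as the default means existing topologies would behave as before unless they opt in to a shorter timeout.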
Code that leads to this situation:
{code:java}
public class Client extends ConnectionWithStatus implements ISaslClient {
    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; // 10 minutes
    // ...
}
{code}
{code:java}
@Override
public void close() {
    if (!closing) {
        LOG.info("closing Netty Client {}", dstAddressPrefixedName);
        // Set closing to true to prevent any further reconnection attempts.
        closing = true;
        waitForPendingMessagesToBeSent();
        closeChannel();
        // stop tracking metrics for this client
        if (this.metricRegistry != null) {
            this.metricRegistry.deregister(this.metrics);
        }
    }
}
{code}
{code:java}
private void waitForPendingMessagesToBeSent() {
    LOG.info("waiting up to {} ms to send {} pending messages to {}",
        PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
    long totalPendingMsgs = pendingMessages.get();
    long startMs = System.currentTimeMillis();
    while (pendingMessages.get() != 0) {
        try {
            long deltaMs = System.currentTimeMillis() - startMs;
            if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
                    + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                break;
            }
            Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
        } catch (InterruptedException e) {
            break;
        }
    }
}
{code}
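To illustrate the effect of a configurable value, here is a self-contained sketch of the same polling loop with the timeout passed in as a parameter instead of the hardcoded constant. The class `PendingFlushSketch`, the 10 ms poll interval and the returned remaining-message count are assumptions for illustration only; the real PENDING_MESSAGES_FLUSH_INTERVAL_MS value is not shown in this issue.

```java
import java.util.concurrent.atomic.AtomicLong;

public class PendingFlushSketch {
    // Assumed poll interval; stands in for PENDING_MESSAGES_FLUSH_INTERVAL_MS.
    static final long FLUSH_INTERVAL_MS = 10L;

    /** Polls until no messages are pending or timeoutMs elapses; returns the count still pending. */
    static long waitForPendingMessagesToBeSent(AtomicLong pendingMessages, long timeoutMs) {
        long startMs = System.currentTimeMillis();
        while (pendingMessages.get() != 0) {
            if (System.currentTimeMillis() - startMs > timeoutMs) {
                break; // give up: whatever is still pending will not be sent
            }
            try {
                Thread.sleep(FLUSH_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status before leaving
                break;
            }
        }
        return pendingMessages.get();
    }

    public static void main(String[] args) {
        // Simulates an unreachable destination: the pending count never drains.
        AtomicLong pending = new AtomicLong(5);
        long start = System.currentTimeMillis();
        long left = waitForPendingMessagesToBeSent(pending, 100L);
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("left=" + left + " elapsedMs>=100: " + (elapsed >= 100));
    }
}
```

With a destination that never drains, the call returns after roughly the supplied timeout (100 ms here) instead of 10 minutes. Unlike the quoted Storm code, this sketch re-sets the thread's interrupt flag before breaking out of the loop, which is the usual Java idiom.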
was:
The same description, without the line "Code that leads to this situation:".
> Netty client will wait up to 10 minutes to send messages to unreachable worker
> ------------------------------------------------------------------------------
>
> Key: STORM-4060
> URL: https://issues.apache.org/jira/browse/STORM-4060
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-client, storm-core
> Affects Versions: 1.1.3, 1.2.4, 2.6.2
> Reporter: Rui Abreu
> Assignee: Rui Abreu
> Priority: Major
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)