This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8b1eca1 KAFKA-13126: guard against overflow when computing
`joinGroupTimeoutMs` (#11111)
8b1eca1 is described below
commit 8b1eca1c58aef822082dd9954bc41179ecf3eb06
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Jul 23 16:22:41 2021 -0700
KAFKA-13126: guard against overflow when computing `joinGroupTimeoutMs`
(#11111)
Setting max.poll.interval.ms to Integer.MAX_VALUE causes an integer overflow
when computing joinGroupTimeoutMs, which results in the JoinGroup timeout being
set to request.timeout.ms instead, a much lower value.
This can easily make consumers drop out of the group: they must now rejoin
within 30s (by default), yet with such a high max.poll.interval.ms they are
under no obligation to call poll() anywhere near that often, especially when
each record takes a long time to process or `max.poll.records` is also very
large. We just need to check for overflow and, when it occurs, fall back to the
rebalance timeout itself (Integer.MAX_VALUE in this case) without the extra
lapse added.
Reviewers: Luke Chen <[email protected]>, John Roesler <[email protected]>
---
.../main/java/org/apache/kafka/clients/NetworkClientUtils.java | 5 ++---
.../kafka/clients/consumer/internals/AbstractCoordinator.java | 8 ++++++--
.../org/apache/kafka/common/security/kerberos/KerberosLogin.java | 4 ++--
.../main/java/org/apache/kafka/connect/tools/MockSinkTask.java | 2 +-
.../main/java/org/apache/kafka/connect/tools/MockSourceTask.java | 2 +-
.../kafka/streams/processor/internals/GlobalStreamThread.java | 2 +-
.../apache/kafka/streams/processor/internals/StateDirectory.java | 2 +-
7 files changed, 14 insertions(+), 11 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index c952b82..4c4d635 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -60,17 +60,16 @@ public final class NetworkClientUtils {
throw new IllegalArgumentException("Timeout needs to be greater
than 0");
}
long startTime = time.milliseconds();
- long expiryTime = startTime + timeoutMs;
if (isReady(client, node, startTime) || client.ready(node, startTime))
return true;
long attemptStartTime = time.milliseconds();
- while (!client.isReady(node, attemptStartTime) && attemptStartTime <
expiryTime) {
+ while (!client.isReady(node, attemptStartTime) && attemptStartTime -
startTime < timeoutMs) {
if (client.connectionFailed(node)) {
throw new IOException("Connection to " + node + " failed.");
}
- long pollTimeout = expiryTime - attemptStartTime;
+ long pollTimeout = timeoutMs - (attemptStartTime - startTime); //
initialize in this order to avoid overflow
client.poll(pollTimeout, attemptStartTime);
if (client.authenticationException(node) != null)
throw client.authenticationException(node);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1f04a47..223c4fe 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -548,8 +548,12 @@ public abstract class AbstractCoordinator implements
Closeable {
// Note that we override the request timeout using the rebalance
timeout since that is the
// maximum time that it may block on the coordinator. We add an extra
5 seconds for small delays.
- int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
- rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
+ int joinGroupTimeoutMs = Math.max(
+ client.defaultRequestTimeoutMs(),
+ Math.max(
+ rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE,
+ rebalanceConfig.rebalanceTimeoutMs) // guard against overflow
since rebalance timeout can be MAX_VALUE
+ );
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler(generation));
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index 3f0a46e..f2b25a5 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -164,12 +164,12 @@ public class KerberosLogin extends AbstractLogin {
// We should not allow the ticket to expire, but we should
take into consideration
// minTimeBeforeRelogin. Will not sleep less than
minTimeBeforeRelogin, unless doing so
// would cause ticket expiration.
- if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin
> expiry)) {
+ if ((nextRefresh > expiry) || (minTimeBeforeRelogin >
expiry - now)) {
// expiry is before next scheduled refresh).
log.info("[Principal={}]: Refreshing now because
expiry is before next scheduled refresh time.", principal);
nextRefresh = now;
} else {
- if (nextRefresh < (now + minTimeBeforeRelogin)) {
+ if (nextRefresh - now < minTimeBeforeRelogin) {
// next scheduled refresh is sooner than (now +
MIN_TIME_BEFORE_LOGIN).
Date until = new Date(nextRefresh);
Date newUntil = new Date(now +
minTimeBeforeRelogin);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
index 0f8e0ac..f48bd31 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
@@ -60,7 +60,7 @@ public class MockSinkTask extends SinkTask {
public void put(Collection<SinkRecord> records) {
if (MockConnector.TASK_FAILURE.equals(mockMode)) {
long now = System.currentTimeMillis();
- if (now > startTimeMs + failureDelayMs) {
+ if (now - startTimeMs > failureDelayMs) {
log.debug("Triggering sink task failure");
throw new RuntimeException();
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
index 5053f3b..4decf03 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
@@ -58,7 +58,7 @@ public class MockSourceTask extends SourceTask {
public List<SourceRecord> poll() throws InterruptedException {
if (MockConnector.TASK_FAILURE.equals(mockMode)) {
long now = System.currentTimeMillis();
- if (now > startTimeMs + failureDelayMs) {
+ if (now - startTimeMs > failureDelayMs) {
log.debug("Triggering source task failure");
throw new RuntimeException();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index ddad121..e55c5d7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -262,7 +262,7 @@ public class GlobalStreamThread extends Thread {
stateMaintainer.update(record);
}
final long now = time.milliseconds();
- if (now >= lastFlush + flushInterval) {
+ if (now - flushInterval >= lastFlush) {
stateMaintainer.flushState();
lastFlush = now;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 4cc6f98..90a7f80 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -459,7 +459,7 @@ public class StateDirectory {
if (lock(id)) {
final long now = time.milliseconds();
final long lastModifiedMs =
taskDir.file().lastModified();
- if (now > lastModifiedMs + cleanupDelayMs) {
+ if (now - cleanupDelayMs > lastModifiedMs) {
log.info("{} Deleting obsolete state directory {}
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
logPrefix(), dirName, id, now -
lastModifiedMs, cleanupDelayMs);
Utils.delete(taskDir.file());