This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9d634629f29 KAFKA-16899 MembershipManagerImpl: rebalanceTimeoutMs
variable name changed to commitTimeoutDuringReconciliation (#16334)
9d634629f29 is described below
commit 9d634629f29cd402a723d7e7bece18c8099065a4
Author: Linu Shibu <[email protected]>
AuthorDate: Wed Jul 31 22:21:46 2024 +0530
KAFKA-16899 MembershipManagerImpl: rebalanceTimeoutMs variable name changed
to commitTimeoutDuringReconciliation (#16334)
As mentioned in the ticket, the config property name "rebalanceTimeoutMs"
is misleading since the property only handles the client's commit portion of
the process.It is used in MembershipManagerImpl as a means to limit the
client's efforts in the case where it is repeatedly trying to commit but
failing. Considering the same, the property name has been updated to
"commitTimeoutDuringReconciliation" (suggested in ticket) in
GroupRebalanceConfig and in all other classes where the propert [...]
Reviewers: Kirk True <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/internals/MembershipManagerImpl.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index a0e3a06d008..70b3b492dcd 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -140,11 +140,11 @@ public class MembershipManagerImpl implements
MembershipManager {
private final Optional<String> groupInstanceId;
/**
- * Rebalance timeout. To be used as time limit for the commit request
issued
+ * Reconciliation commit timeout. To be used as time limit for the commit
request issued
* when a new assignment is received, that is retried until it succeeds,
fails with a
* non-retriable error, it the time limit expires.
*/
- private final int rebalanceTimeoutMs;
+ private final int commitTimeoutDuringReconciliation;
/**
* Member ID assigned by the server to the member, received in a heartbeat
response when
@@ -288,7 +288,7 @@ public class MembershipManagerImpl implements
MembershipManager {
public MembershipManagerImpl(String groupId,
Optional<String> groupInstanceId,
- int rebalanceTimeoutMs,
+ int commitTimeoutDuringReconciliation,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
CommitRequestManager commitRequestManager,
@@ -300,7 +300,7 @@ public class MembershipManagerImpl implements
MembershipManager {
Metrics metrics) {
this(groupId,
groupInstanceId,
- rebalanceTimeoutMs,
+ commitTimeoutDuringReconciliation,
serverAssignor,
subscriptions,
commitRequestManager,
@@ -315,7 +315,7 @@ public class MembershipManagerImpl implements
MembershipManager {
// Visible for testing
MembershipManagerImpl(String groupId,
Optional<String> groupInstanceId,
- int rebalanceTimeoutMs,
+ int commitTimeoutDuringReconciliation,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
CommitRequestManager commitRequestManager,
@@ -338,7 +338,7 @@ public class MembershipManagerImpl implements
MembershipManager {
this.log = logContext.logger(MembershipManagerImpl.class);
this.stateUpdatesListeners = new ArrayList<>();
this.clientTelemetryReporter = clientTelemetryReporter;
- this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ this.commitTimeoutDuringReconciliation =
commitTimeoutDuringReconciliation;
this.backgroundEventHandler = backgroundEventHandler;
this.time = time;
this.metricsManager = metricsManager;
@@ -1015,9 +1015,9 @@ public class MembershipManagerImpl implements
MembershipManager {
// Issue a commit request that will be retried until it succeeds,
fails with a
// non-retriable error, or the time limit expires. Retry on stale
member epoch error, in a
// best effort to commit the offsets in the case where the epoch might
have changed while
- // the current reconciliation is in process. Note this is using the
rebalance timeout as
- // it is the limit enforced by the broker to complete the
reconciliation process.
- commitResult =
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));
+ // the current reconciliation is in process. Note this is using the
reconciliation commit timeout
+ // as it is the limit enforced by the broker to complete the
reconciliation process.
+ commitResult =
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(commitTimeoutDuringReconciliation));
// Execute commit -> onPartitionsRevoked -> onPartitionsAssigned.
commitResult.whenComplete((__, commitReqError) -> {
@@ -1083,7 +1083,7 @@ public class MembershipManagerImpl implements
MembershipManager {
if (error != null) {
// Leaving member in RECONCILING state after callbacks fail.
The member
// won't send the ack, and the expectation is that the broker
will kick the
- // member out of the group after the rebalance timeout
expires, leading to a
+ // member out of the group after the reconciliation commit
timeout expires, leading to a
// RECONCILING -> FENCED transition.
log.error("Reconciliation failed.", error);
markReconciliationCompleted();