This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9d634629f29 KAFKA-16899 MembershipManagerImpl: rebalanceTimeoutMs 
variable name changed to commitTimeoutDuringReconciliation (#16334)
9d634629f29 is described below

commit 9d634629f29cd402a723d7e7bece18c8099065a4
Author: Linu Shibu <[email protected]>
AuthorDate: Wed Jul 31 22:21:46 2024 +0530

    KAFKA-16899 MembershipManagerImpl: rebalanceTimeoutMs variable name changed 
to commitTimeoutDuringReconciliation (#16334)
    
    As mentioned in the ticket, the config property name "rebalanceTimeoutMs" 
is misleading since the property only handles the client's commit portion of 
the process. It is used in MembershipManagerImpl as a means to limit the 
client's efforts in the case where it is repeatedly trying to commit but 
failing. Considering the same, the property name has been updated to 
"commitTimeoutDuringReconciliation" (suggested in ticket) in 
GroupRebalanceConfig and in all other classes where the propert [...]
    
    Reviewers: Kirk True <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../consumer/internals/MembershipManagerImpl.java    | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index a0e3a06d008..70b3b492dcd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -140,11 +140,11 @@ public class MembershipManagerImpl implements 
MembershipManager {
     private final Optional<String> groupInstanceId;
 
     /**
-     * Rebalance timeout. To be used as time limit for the commit request 
issued
+     * Reconciliation commit timeout. To be used as time limit for the commit 
request issued
      * when a new assignment is received, that is retried until it succeeds, 
fails with a
      * non-retriable error, it the time limit expires.
      */
-    private final int rebalanceTimeoutMs;
+    private final int commitTimeoutDuringReconciliation;
 
     /**
      * Member ID assigned by the server to the member, received in a heartbeat 
response when
@@ -288,7 +288,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
 
     public MembershipManagerImpl(String groupId,
                                  Optional<String> groupInstanceId,
-                                 int rebalanceTimeoutMs,
+                                 int commitTimeoutDuringReconciliation,
                                  Optional<String> serverAssignor,
                                  SubscriptionState subscriptions,
                                  CommitRequestManager commitRequestManager,
@@ -300,7 +300,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
                                  Metrics metrics) {
         this(groupId,
             groupInstanceId,
-            rebalanceTimeoutMs,
+            commitTimeoutDuringReconciliation,
             serverAssignor,
             subscriptions,
             commitRequestManager,
@@ -315,7 +315,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
     // Visible for testing
     MembershipManagerImpl(String groupId,
                           Optional<String> groupInstanceId,
-                          int rebalanceTimeoutMs,
+                          int commitTimeoutDuringReconciliation,
                           Optional<String> serverAssignor,
                           SubscriptionState subscriptions,
                           CommitRequestManager commitRequestManager,
@@ -338,7 +338,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
         this.log = logContext.logger(MembershipManagerImpl.class);
         this.stateUpdatesListeners = new ArrayList<>();
         this.clientTelemetryReporter = clientTelemetryReporter;
-        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.commitTimeoutDuringReconciliation = 
commitTimeoutDuringReconciliation;
         this.backgroundEventHandler = backgroundEventHandler;
         this.time = time;
         this.metricsManager = metricsManager;
@@ -1015,9 +1015,9 @@ public class MembershipManagerImpl implements 
MembershipManager {
         // Issue a commit request that will be retried until it succeeds, 
fails with a
         // non-retriable error, or the time limit expires. Retry on stale 
member epoch error, in a
         // best effort to commit the offsets in the case where the epoch might 
have changed while
-        // the current reconciliation is in process. Note this is using the 
rebalance timeout as
-        // it is the limit enforced by the broker to complete the 
reconciliation process.
-        commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));
+        // the current reconciliation is in process. Note this is using the 
reconciliation commit timeout
+        // as it is the limit enforced by the broker to complete the 
reconciliation process.
+        commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(commitTimeoutDuringReconciliation));
 
         // Execute commit -> onPartitionsRevoked -> onPartitionsAssigned.
         commitResult.whenComplete((__, commitReqError) -> {
@@ -1083,7 +1083,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
             if (error != null) {
                 // Leaving member in RECONCILING state after callbacks fail. 
The member
                 // won't send the ack, and the expectation is that the broker 
will kick the
-                // member out of the group after the rebalance timeout 
expires, leading to a
+                // member out of the group after the reconciliation commit 
timeout expires, leading to a
                 // RECONCILING -> FENCED transition.
                 log.error("Reconciliation failed.", error);
                 markReconciliationCompleted();

Reply via email to