lianetm commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1333583458


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
         }
         maybeTransitionToStable();
     }
+
+    /**
+     * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the
+     * heartbeat response. Per the KIP-848 protocol, we perform the following:
+     *
+     * <ol>
+     *     <li>Revoke partitions, if any</li>
+     *     <li>Heartbeat to acknowledge revoked partitions</li>
+     *     <li>Assign partitions, if any</li>
+     *     <li>Heartbeat to acknowledge assigned partitions</li>
+     * </ol>
+     *
+     * TODO: What are the the state changes here?
+     * TODO: Is SubscriptionState sufficient to build the next heartbeat request?
+     * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()?
+     */
+    void reconcile() {
+        transitionTo(MemberState.RECONCILING);
+
+        Timer remainingAssignmentTime = time.timer(10000);
+
+        // First, we need to determine if any partitions need to be revoked.
+        {
+            ReconciliationResult result = reconciler.revoke(targetAssignment, remainingAssignmentTime);
+            remainingAssignmentTime.update();
+
+            if (result == ReconciliationResult.COMPLETED) {
+                // If we've revoked one or more partitions, we need to send an acknowledgement request ASAP to
+                // let the coordinator know that they've been removed locally.
+                return;
+            } else if (result == ReconciliationResult.EXPIRED) {
+                // TODO: what do we do here?

Review Comment:
   I may be missing something, but I don't see why we need to time this on the 
client side. The server is the one that handles the rebalanceTimeouts. 
   
   The server starts the timer when it sends an assignment/revocation to a 
member, and if it does not receive the ack in time, it fences the member. 
From the client's point of view, when it finishes processing the 
assignment/revocation, it would notice that it has been fenced (state is 
FENCED), and it won't send the ack for the assignment (because it is no longer 
part of the group). I expect it would then have to invoke onPartitionsLost and 
rejoin the group. 
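
   A minimal sketch of the client-side flow described above, with no 
client-side timer. All names here (`Member`, `onFenced`, 
`onReconciliationCompleted`, the `REJOINING` state, the recorded action 
strings) are hypothetical and only illustrate the idea; they are not the 
actual MembershipManagerImpl API.

```java
import java.util.ArrayList;
import java.util.List;

public class FencedAfterReconcileSketch {

    // Hypothetical subset of member states. RECONCILING and FENCED come from the
    // discussion; REJOINING is an assumed name used only for this illustration.
    enum MemberState { RECONCILING, FENCED, REJOINING }

    static class Member {
        MemberState state = MemberState.RECONCILING;

        // Records what the member would do, in order, so the flow can be inspected.
        final List<String> actions = new ArrayList<>();

        // Assumed hook: a heartbeat response reported that the coordinator fenced
        // this member, e.g. because the server-side rebalance timeout expired.
        void onFenced() {
            state = MemberState.FENCED;
        }

        // Assumed hook: local processing of the assignment/revocation has finished.
        void onReconciliationCompleted() {
            if (state == MemberState.FENCED) {
                // No longer part of the group: skip the ack, treat the partitions
                // as lost, and rejoin.
                actions.add("onPartitionsLost");
                actions.add("rejoin");
                state = MemberState.REJOINING;
            } else {
                // Still a member: acknowledge the assignment/revocation.
                actions.add("sendAck");
            }
        }
    }

    public static void main(String[] args) {
        Member inTime = new Member();
        inTime.onReconciliationCompleted();
        System.out.println("in time: " + inTime.actions);

        Member tooSlow = new Member();
        tooSlow.onFenced(); // server timer expired while the client was still busy
        tooSlow.onReconciliationCompleted();
        System.out.println("fenced:  " + tooSlow.actions);
    }
}
```

   The point of the sketch is that the only timeout decision is made by the 
server; the client just reacts to the FENCED state once its own processing 
completes.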