lianetm commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1333628852
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + + /** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * <ol> + * <li>Revoke partitions, if any</li> + * <li>Heartbeat to acknowledge revoked partitions</li> + * <li>Assign partitions, if any</li> + * <li>Heartbeat to acknowledge assigned partitions</li> + * </ol> + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ + void reconcile() { + transitionTo(MemberState.RECONCILING); + + Timer remainingAssignmentTime = time.timer(10000); + + // First, we need to determine if any partitions need to be revoked. + { + ReconciliationResult result = reconciler.revoke(targetAssignment, remainingAssignmentTime); + remainingAssignmentTime.update(); + + if (result == ReconciliationResult.COMPLETED) { + // If we've revoked one or more partitions, we need to send an acknowledgement request ASAP to + // let the coordinator know that they've been removed locally. + return; + } else if (result == ReconciliationResult.EXPIRED) { + // TODO: what do we do here? + return; + } else if (result == ReconciliationResult.IN_PROGRESS) { + // At this point, we've started the revocation, but it isn't complete. We've already checked that + // ReconciliationResult is not expired, so this means our revocation is in progress and there's nothing + // to do here but wait until it's finished or expires... right? + return; + } + } + + // Next, we need to determine the partitions to be added, if any. 
+ { + ReconciliationResult result = reconciler.assign(targetAssignment, remainingAssignmentTime); + remainingAssignmentTime.update(); + + if (result == ReconciliationResult.COMPLETED) { + // If we've assigned one or more partitions, we need to send an acknowledgement request ASAP to Review Comment: This should be an interaction between the AssignmentReconciler and the HB manager, I would expect. The state machine should only care about updating the state to RECONCILING when the assignment process starts, and to STABLE/FAILED when it completes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org