philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1332517141
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig } maybeTransitionToStable(); } + + /** + * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the + * heartbeat response. Per the KIP-848 protocol, we perform the following: + * + * <ol> + * <li>Revoke partitions, if any</li> + * <li>Heartbeat to acknowledge revoked partitions</li> + * <li>Assign partitions, if any</li> + * <li>Heartbeat to acknowledge assigned partitions</li> + * </ol> + * + * TODO: What are the the state changes here? + * TODO: Is SubscriptionState sufficient to build the next heartbeat request? + * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()? + */ + void reconcile() { + transitionTo(MemberState.RECONCILING); + + Timer remainingAssignmentTime = time.timer(10000); Review Comment: Maybe add a TODO - You should be getting the rebalance timeout from the max poll interval. ``` The rebalance timeout is provided by the member when it joins the group. It is basically the max poll interval configured on the client side ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org