lianetm commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1333166590
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -237,4 +253,64 @@ public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assig
}
maybeTransitionToStable();
}
+
+ /**
+ * Performs the reconciliation between the current topic assignment and the {@link Assignment} provided in the
+ * heartbeat response. Per the KIP-848 protocol, we perform the following:
+ *
+ * <ol>
+ * <li>Revoke partitions, if any</li>
+ * <li>Heartbeat to acknowledge revoked partitions</li>
+ * <li>Assign partitions, if any</li>
+ * <li>Heartbeat to acknowledge assigned partitions</li>
+ * </ol>
+ *
+ * TODO: What are the the state changes here?
+ * TODO: Is SubscriptionState sufficient to build the next heartbeat request?
+ * TODO: Where do we need to call ConsumerRebalanceListener.onPartitionsLost()?
+ */
+ void reconcile() {
+ transitionTo(MemberState.RECONCILING);
+
+ Timer remainingAssignmentTime = time.timer(10000);
+
+ // First, we need to determine if any partitions need to be revoked.
+ {
+ ReconciliationResult result = reconciler.revoke(targetAssignment, remainingAssignmentTime);
Review Comment:
I was expecting the heartbeat manager to be the one calling this kind of
logic. The state machine shouldn't be involved in anything other than keeping
member info and state, and it already transitions to `RECONCILING` the moment
the heartbeat request manager calls `updateState` with a successful
heartbeat response.
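
To make the split concrete, here is a rough sketch of what I have in mind. Every `Sketch*` name, `onSuccessfulHeartbeat`, and `onReconciliationCompleted` is hypothetical and made up for illustration. `updateState` is simplified to take a plain list rather than the real heartbeat response type, and partitions are just strings. It is only meant to show who calls what, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical member states; only RECONCILING is taken from the diff above.
enum SketchState { STABLE, RECONCILING }

// State machine: keeps member info and state, and does nothing else.
class SketchMembershipManager {
    private SketchState state = SketchState.STABLE;
    private List<String> targetAssignment = new ArrayList<>();

    // Called by the heartbeat request manager on a successful heartbeat response.
    // Simplified signature (assumption): the target assignment as a list of names.
    void updateState(List<String> newTarget) {
        targetAssignment = new ArrayList<>(newTarget);
        state = SketchState.RECONCILING;
    }

    // Hypothetical callback: the caller reports that reconciliation finished.
    void onReconciliationCompleted() {
        state = SketchState.STABLE;
    }

    SketchState state() {
        return state;
    }

    List<String> targetAssignment() {
        return targetAssignment;
    }
}

// Hypothetical stand-in for the revoke/assign logic.
interface SketchReconciler {
    void reconcile(List<String> target);
}

// The heartbeat request manager drives reconciliation, not the state machine.
class SketchHeartbeatRequestManager {
    private final SketchMembershipManager membership;
    private final SketchReconciler reconciler;

    SketchHeartbeatRequestManager(SketchMembershipManager membership,
                                  SketchReconciler reconciler) {
        this.membership = membership;
        this.reconciler = reconciler;
    }

    void onSuccessfulHeartbeat(List<String> assignmentFromResponse) {
        // 1. Record the new target; the state machine moves to RECONCILING.
        membership.updateState(assignmentFromResponse);
        // 2. The reconciliation itself is triggered from here.
        if (membership.state() == SketchState.RECONCILING) {
            reconciler.reconcile(membership.targetAssignment());
            membership.onReconciliationCompleted();
        }
    }
}
```

With this shape, `reconcile()` would not need to live in `MembershipManagerImpl` at all. The state machine only records the target assignment and the state transition.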