kirktrue commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1362615379
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java: ########## @@ -85,122 +111,162 @@ enum ReconciliationResult { private final BlockingQueue<BackgroundEvent> backgroundEventQueue; private Optional<RebalanceCallbackEvent> inflightCallback; - public MemberAssignmentReconciler(LogContext logContext, - SubscriptionState subscriptions, - ConsumerMetadata metadata, - BlockingQueue<BackgroundEvent> backgroundEventQueue) { + AssignmentReconciler(LogContext logContext, + SubscriptionState subscriptions, + ConsumerMetadata metadata, + BlockingQueue<BackgroundEvent> backgroundEventQueue) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.metadata = metadata; this.backgroundEventQueue = backgroundEventQueue; } /** - * Perform the revocation process, if necessary, depending on the given {@link Assignment target assignment}. If the - * {@link SubscriptionState#assignedPartitions() current set of assigned partitions} includes entries that are - * <em>not</em> in the target assignment, these will be considered for revocation. If there is already a - * reconciliation in progress (revocation or assignment), this method will return without performing any - * revocation. + * Perform the reconciliation process, as necessary to meet the given {@link Assignment target assignment}. Note + * that the reconciliation is a multi-step process, and this method should be invoked on each heartbeat if + * the coordinator provides a {@link Assignment target assignment}. * * @param assignment Target {@link Assignment} * @return {@link ReconciliationResult} */ - ReconciliationResult revoke(Optional<Assignment> assignment) { + ReconciliationResult maybeReconcile(Optional<Assignment> assignment) { // Check for any outstanding operations first. If a conclusive result has already been reached, return that // before processing any further. - Optional<ReconciliationResult> inflightStatus = checkInflightStatus(); + if (inflightCallback.isPresent()) { + // We don't actually need the _result_ of the event, just to know that it's complete. + if (inflightCallback.get().future().isDone()) { + // This is the happy path--we completed the callback. Clear out our inflight callback first, though. + inflightCallback = Optional.empty(); Review Comment: This is unrelated and will be resolved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org