kirktrue commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1362615379


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignmentReconciler.java:
##########
@@ -85,122 +111,162 @@ enum ReconciliationResult {
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private Optional<RebalanceCallbackEvent> inflightCallback;
 
-    public MemberAssignmentReconciler(LogContext logContext,
-                                      SubscriptionState subscriptions,
-                                      ConsumerMetadata metadata,
-                                      BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+    AssignmentReconciler(LogContext logContext,
+                         SubscriptionState subscriptions,
+                         ConsumerMetadata metadata,
+                         BlockingQueue<BackgroundEvent> backgroundEventQueue) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.metadata = metadata;
         this.backgroundEventQueue = backgroundEventQueue;
     }
 
     /**
-     * Perform the revocation process, if necessary, depending on the given {@link Assignment target assignment}. If the
-     * {@link SubscriptionState#assignedPartitions() current set of assigned partitions} includes entries that are
-     * <em>not</em> in the target assignment, these will be considered for revocation. If there is already a
-     * reconciliation in progress (revocation or assignment), this method will return without performing any
-     * revocation.
+     * Perform the reconciliation process, as necessary to meet the given {@link Assignment target assignment}. Note
+     * that the reconciliation is a multi-step process, and this method should be invoked on each heartbeat if
+     * the coordinator provides a {@link Assignment target assignment}.
      *
      * @param assignment Target {@link Assignment}
      * @return {@link ReconciliationResult}
      */
-    ReconciliationResult revoke(Optional<Assignment> assignment) {
+    ReconciliationResult maybeReconcile(Optional<Assignment> assignment) {
         // Check for any outstanding operations first. If a conclusive result has already been reached, return that
         // before processing any further.
-        Optional<ReconciliationResult> inflightStatus = checkInflightStatus();
+        if (inflightCallback.isPresent()) {
+            // We don't actually need the _result_ of the event, just to know that it's complete.
+            if (inflightCallback.get().future().isDone()) {
+                // This is the happy path--we completed the callback. Clear out our inflight callback first, though.
+                inflightCallback = Optional.empty();

Review Comment:
   This is unrelated and will be resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
