This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 90573b4b537 KAFKA-18569: New consumer close may wait on unneeded FindCoordinator (#18590)
90573b4b537 is described below

commit 90573b4b53725f8d7ed26afc18f1aef693902d15
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Jan 30 03:15:56 2025 +0800

    KAFKA-18569: New consumer close may wait on unneeded FindCoordinator (#18590)
    
    Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 10 ++++++++
 .../internals/CoordinatorRequestManager.java       |  8 +++++-
 .../internals/events/ApplicationEvent.java         |  2 +-
 .../events/ApplicationEventProcessor.java          | 11 ++++++++
 .../events/StopFindCoordinatorOnCloseEvent.java    | 29 ++++++++++++++++++++++
 .../internals/CoordinatorRequestManagerTest.java   | 15 +++++++++++
 .../integration/kafka/api/ConsumerBounceTest.scala |  4 +--
 7 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index f5e12407be5..2c65b44a7a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -62,6 +62,7 @@ import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
+import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
@@ -1333,6 +1334,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         // sequence...
         swallow(log, Level.ERROR, "Failed to auto-commit offsets",
             () -> autoCommitOnClose(closeTimer), firstException);
+        swallow(log, Level.ERROR, "Failed to stop finding coordinator",
+            this::stopFindCoordinatorOnClose, firstException);
         swallow(log, Level.ERROR, "Failed to release group assignment",
             () -> runRebalanceCallbacksOnClose(closeTimer), firstException);
         swallow(log, Level.ERROR, "Failed to leave group while closing consumer",
@@ -1421,6 +1424,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }
 
+    private void stopFindCoordinatorOnClose() {
+        if (groupMetadata.get().isEmpty())
+            return;
+        log.debug("Stop finding coordinator during consumer close");
+        applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
+    }
+
     // Visible for testing
     void commitSyncAllConsumed(final Timer timer) {
         log.debug("Sending synchronous auto-commit on closing");
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 4664267a0e8..0f1650d0e67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -59,6 +59,7 @@ public class CoordinatorRequestManager implements RequestManager {
     private final RequestState coordinatorRequestState;
     private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
     private long totalDisconnectedMin = 0;
+    private boolean closing = false;
     private Node coordinator;
 
     public CoordinatorRequestManager(
@@ -80,6 +81,11 @@ public class CoordinatorRequestManager implements RequestManager {
         );
     }
 
+    @Override
+    public void signalClose() {
+        closing = true;
+    }
+
     /**
      * Poll for the FindCoordinator request.
      * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list.
@@ -92,7 +98,7 @@ public class CoordinatorRequestManager implements RequestManager {
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        if (this.coordinator != null)
+        if (closing || this.coordinator != null)
             return EMPTY;
 
         if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index dfb775f8947..4e0b8e3d2d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -34,7 +34,7 @@ public abstract class ApplicationEvent {
         TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
         UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
         CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
-        COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
+        COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE,
         PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
         SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 9c119e28b7b..69cc0072a39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -148,6 +148,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
                 process((LeaveGroupOnCloseEvent) event);
                 return;
 
+            case STOP_FIND_COORDINATOR_ON_CLOSE:
+                process((StopFindCoordinatorOnCloseEvent) event);
+                return;
+
             case CREATE_FETCH_REQUESTS:
                 process((CreateFetchRequestsEvent) event);
                 return;
@@ -452,6 +456,13 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
         future.whenComplete(complete(event.future()));
     }
 
+    private void process(@SuppressWarnings("unused") final StopFindCoordinatorOnCloseEvent event) {
+        requestManagers.coordinatorRequestManager.ifPresent(manager -> {
+            log.debug("Signal CoordinatorRequestManager closing");
+            manager.signalClose();
+        });
+    }
+
     /**
      * Process event that tells the share consume request manager to fetch more records.
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StopFindCoordinatorOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StopFindCoordinatorOnCloseEvent.java
new file mode 100644
index 00000000000..692e26586af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StopFindCoordinatorOnCloseEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * This event is raised when the consumer is closing to prevent the CoordinatorRequestManager from
+ * generating FindCoordinator requests. This event ensures that no new coordinator requests
+ * are initiated once the consumer has completed all coordinator-dependent operations and
+ * is in the process of shutting down.
+ */
+public class StopFindCoordinatorOnCloseEvent extends ApplicationEvent {
+    public StopFindCoordinatorOnCloseEvent() {
+        super(Type.STOP_FIND_COORDINATOR_ON_CLOSE);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index 7e805dc3cd3..2165cb814ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -254,6 +254,21 @@ public class CoordinatorRequestManagerTest {
         assertEquals(1, res2.unsentRequests.size());
     }
 
+    @Test
+    public void testSignalOnClose() {
+        CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
+        expectFindCoordinatorRequest(coordinatorManager, Errors.NONE);
+        assertTrue(coordinatorManager.coordinator().isPresent());
+        coordinatorManager.markCoordinatorUnknown("coordinator changed", time.milliseconds());
+        assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
+        coordinatorManager.signalClose();
+        time.sleep(RETRY_BACKOFF_MS - 1);
+        assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
+        time.sleep(RETRY_BACKOFF_MS);
+        assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests,
+            "Should not generate find coordinator request during close");
+    }
+
     private void expectFindCoordinatorRequest(
         CoordinatorRequestManager  coordinatorManager,
         Errors error
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 3b257e3f7a5..a91ad5dd563 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -413,9 +413,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
    * close should terminate immediately without sending leave group.
    */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
-  // TODO: enable for all protocols after fix for not generating/blocking on unneeded
-  //  FindCoordinator on close for the new consumer
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
     val topic = "closetest"
     createTopic(topic, 10, brokerCount)

Reply via email to