This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eec9eccacb5 KAFKA-17483: Complete pending share fetch requests on broker close (#17096)
eec9eccacb5 is described below

commit eec9eccacb5362eb682e3d6007755b2eb524dda9
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Sep 6 09:33:51 2024 +0100

    KAFKA-17483: Complete pending share fetch requests on broker close (#17096)
    
    The PR adds capability to complete pending fetch requests on broker shutdown.
    
    Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>
---
 .../kafka/server/share/SharePartitionManager.java  | 13 +++++++++-
 .../server/share/SharePartitionManagerTest.java    | 28 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 2b39e969ce6..2f6ee6b999d 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -504,6 +504,12 @@ public class SharePartitionManager implements AutoCloseable {
     public void close() throws Exception {
         this.timer.close();
         this.persister.stop();
+        if (!fetchQueue.isEmpty()) {
+            log.warn("Closing SharePartitionManager with pending fetch requests count: {}", fetchQueue.size());
+            fetchQueue.forEach(shareFetchPartitionData -> shareFetchPartitionData.future.completeExceptionally(
+                Errors.BROKER_NOT_AVAILABLE.exception()));
+            fetchQueue.clear();
+        }
     }
 
     private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
@@ -519,7 +525,7 @@ public class SharePartitionManager implements AutoCloseable {
      */
     // Visible for testing.
     void maybeProcessFetchQueue() {
-        if (!processFetchQueueLock.compareAndSet(false, true)) {
+        if (!acquireProcessFetchQueueLock()) {
             // The queue is already being processed hence avoid re-triggering.
             return;
         }
@@ -690,6 +696,11 @@ public class SharePartitionManager implements AutoCloseable {
         releaseProcessFetchQueueLock();
     }
 
+    // Visible for testing.
+    boolean acquireProcessFetchQueueLock() {
+        return processFetchQueueLock.compareAndSet(false, true);
+    }
+
     private void releaseProcessFetchQueueLock() {
         processFetchQueueLock.set(false);
     }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 894ab947f0b..3d88a8aa5b0 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
@@ -95,6 +96,7 @@ import java.util.function.Consumer;
 
 import scala.Tuple2;
 
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -1357,6 +1359,32 @@ public class SharePartitionManagerTest {
         Mockito.verify(persister, times(1)).stop();
     }
 
+    @Test
+    public void testCloseShouldCompletePendingFetchRequests() throws Exception {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+        FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
+            1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
+        Uuid fooId = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
+
+        // Acquire the fetch lock so fetch requests keep waiting in the queue.
+        assertTrue(sharePartitionManager.acquireProcessFetchQueueLock());
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes);
+        // Verify that the fetch request is not completed.
+        assertFalse(future.isDone());
+
+        // Closing the sharePartitionManager closes pending fetch requests in the fetch queue.
+        sharePartitionManager.close();
+        // Verify that the fetch request is now completed.
+        assertTrue(future.isDone());
+        assertFutureThrows(future, BrokerNotAvailableException.class);
+    }
+
     @Test
     public void testReleaseSessionSuccess() {
         String groupId = "grp";

Reply via email to