This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eec9eccacb5 KAFKA-17483: Complete pending share fetch requests on
broker close (#17096)
eec9eccacb5 is described below
commit eec9eccacb5362eb682e3d6007755b2eb524dda9
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Sep 6 09:33:51 2024 +0100
KAFKA-17483: Complete pending share fetch requests on broker close (#17096)
This PR adds the capability to complete pending share fetch requests on
broker shutdown.
Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../kafka/server/share/SharePartitionManager.java | 13 +++++++++-
.../server/share/SharePartitionManagerTest.java | 28 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 2b39e969ce6..2f6ee6b999d 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -504,6 +504,12 @@ public class SharePartitionManager implements
AutoCloseable {
public void close() throws Exception {
this.timer.close();
this.persister.stop();
+ if (!fetchQueue.isEmpty()) {
+ log.warn("Closing SharePartitionManager with pending fetch
requests count: {}", fetchQueue.size());
+ fetchQueue.forEach(shareFetchPartitionData ->
shareFetchPartitionData.future.completeExceptionally(
+ Errors.BROKER_NOT_AVAILABLE.exception()));
+ fetchQueue.clear();
+ }
}
private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
@@ -519,7 +525,7 @@ public class SharePartitionManager implements AutoCloseable
{
*/
// Visible for testing.
void maybeProcessFetchQueue() {
- if (!processFetchQueueLock.compareAndSet(false, true)) {
+ if (!acquireProcessFetchQueueLock()) {
// The queue is already being processed hence avoid re-triggering.
return;
}
@@ -690,6 +696,11 @@ public class SharePartitionManager implements
AutoCloseable {
releaseProcessFetchQueueLock();
}
+ // Visible for testing.
+ boolean acquireProcessFetchQueueLock() {
+ return processFetchQueueLock.compareAndSet(false, true);
+ }
+
private void releaseProcessFetchQueueLock() {
processFetchQueueLock.set(false);
}
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 894ab947f0b..3d88a8aa5b0 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
@@ -95,6 +96,7 @@ import java.util.function.Consumer;
import scala.Tuple2;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -1357,6 +1359,32 @@ public class SharePartitionManagerTest {
Mockito.verify(persister, times(1)).stop();
}
+ @Test
+ public void testCloseShouldCompletePendingFetchRequests() throws Exception
{
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+ FetchParams fetchParams = new
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
+ 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
+ Uuid fooId = Uuid.randomUuid();
+ TopicIdPartition tp0 = new TopicIdPartition(fooId, new
TopicPartition("foo", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes =
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder().build();
+
+ // Acquire the fetch lock so fetch requests keep waiting in the queue.
+ assertTrue(sharePartitionManager.acquireProcessFetchQueueLock());
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
fetchParams, partitionMaxBytes);
+ // Verify that the fetch request is not completed.
+ assertFalse(future.isDone());
+
+ // Closing the sharePartitionManager closes pending fetch requests in
the fetch queue.
+ sharePartitionManager.close();
+ // Verify that the fetch request is now completed.
+ assertTrue(future.isDone());
+ assertFutureThrows(future, BrokerNotAvailableException.class);
+ }
+
@Test
public void testReleaseSessionSuccess() {
String groupId = "grp";