This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53c8fab8144 KAFKA-19767: Add partitions to share fetch buffer together
(#20737)
53c8fab8144 is described below
commit 53c8fab8144b49afe587507ecd460b4e9d753fad
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Oct 23 18:36:14 2025 +0100
KAFKA-19767: Add partitions to share fetch buffer together (#20737)
For a ShareFetchResponse containing data from multiple partitions, the
code used to add the records for each partition separately to the share
fetch buffer, signalling completion for each. This small change adds
them all in the same operation.
Reviewers: Apoorv Mittal <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 12 +++++++++---
.../consumer/internals/ShareFetchBuffer.java | 21 +++------------------
.../consumer/internals/ShareFetchBufferTest.java | 9 ++++-----
.../consumer/internals/ShareFetchCollectorTest.java | 14 +++++++-------
4 files changed, 23 insertions(+), 33 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 647b5b7e4b4..e7bfdaefe49 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -785,6 +785,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
final Set<TopicPartition> partitions =
responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
final ShareFetchMetricsAggregator shareFetchMetricsAggregator =
new ShareFetchMetricsAggregator(metricsManager, partitions);
+ List<ShareCompletedFetch> completedFetches = new
ArrayList<>(responseData.size());
Map<TopicPartition, Metadata.LeaderIdAndEpoch>
partitionsWithUpdatedLeaderInfo = new HashMap<>();
for (Map.Entry<TopicIdPartition,
ShareFetchResponseData.PartitionData> entry : responseData.entrySet()) {
TopicIdPartition tip = entry.getKey();
@@ -815,21 +816,26 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
}
- ShareCompletedFetch completedFetch = new ShareCompletedFetch(
+ completedFetches.add(
+ new ShareCompletedFetch(
logContext,
BufferSupplier.create(),
fetchTarget.id(),
tip,
partitionData,
shareFetchMetricsAggregator,
- requestVersion);
- shareFetchBuffer.add(completedFetch);
+ requestVersion)
+ );
if (!partitionData.acquiredRecords().isEmpty()) {
fetchMoreRecords = false;
}
}
+ if (!completedFetches.isEmpty()) {
+ shareFetchBuffer.add(completedFetches);
+ }
+
// Handle any acknowledgements which were not received in the
response for this node.
if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition,
acknowledgements) -> {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java
index ebd86583ec6..7735c804513 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchBuffer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -32,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Predicate;
/**
* {@code ShareFetchBuffer} buffers up {@link ShareCompletedFetch the results}
from the broker responses
@@ -73,25 +73,10 @@ public class ShareFetchBuffer implements AutoCloseable {
}
}
- /**
- * Return whether we have any completed fetches pending return to the
user. This method is thread-safe. Has
- * visibility for testing.
- *
- * @return {@code true} if there are completed fetches that match the
{@link Predicate}, {@code false} otherwise
- */
- boolean hasCompletedFetches(Predicate<ShareCompletedFetch> predicate) {
- lock.lock();
- try {
- return completedFetches.stream().anyMatch(predicate);
- } finally {
- lock.unlock();
- }
- }
-
- void add(ShareCompletedFetch fetch) {
+ void add(List<ShareCompletedFetch> fetches) {
lock.lock();
try {
- completedFetches.add(fetch);
+ completedFetches.addAll(fetches);
notEmptyCondition.signalAll();
} finally {
lock.unlock();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
index 2a06324f72a..72f9856d731 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -87,8 +88,7 @@ public class ShareFetchBufferTest {
try (ShareFetchBuffer fetchBuffer = new ShareFetchBuffer(logContext)) {
ShareCompletedFetch completedFetch =
completedFetch(topicAPartition0);
assertTrue(fetchBuffer.isEmpty());
- fetchBuffer.add(completedFetch);
- assertTrue(fetchBuffer.hasCompletedFetches(p -> true));
+ fetchBuffer.add(List.of(completedFetch));
assertFalse(fetchBuffer.isEmpty());
assertNotNull(fetchBuffer.peek());
assertSame(completedFetch, fetchBuffer.peek());
@@ -111,7 +111,7 @@ public class ShareFetchBufferTest {
assertNull(fetchBuffer.nextInLineFetch());
assertTrue(fetchBuffer.isEmpty());
- fetchBuffer.add(completedFetch(topicAPartition0));
+ fetchBuffer.add(List.of(completedFetch(topicAPartition0)));
assertFalse(fetchBuffer.isEmpty());
fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
@@ -132,8 +132,7 @@ public class ShareFetchBufferTest {
public void testBufferedPartitions() {
try (ShareFetchBuffer fetchBuffer = new ShareFetchBuffer(logContext)) {
fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
- fetchBuffer.add(completedFetch(topicAPartition1));
- fetchBuffer.add(completedFetch(topicAPartition2));
+ fetchBuffer.add(List.of(completedFetch(topicAPartition1),
completedFetch(topicAPartition2)));
assertEquals(allPartitions, fetchBuffer.bufferedPartitions());
fetchBuffer.setNextInLineFetch(null);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index afac3a76d95..b086feb8cbd 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -95,7 +95,7 @@ public class ShareFetchCollectorTest {
// Validate that the buffer is empty until after we add the fetch data.
assertTrue(fetchBuffer.isEmpty());
- fetchBuffer.add(completedFetch);
+ fetchBuffer.add(List.of(completedFetch));
assertFalse(fetchBuffer.isEmpty());
// Validate that the completed fetch isn't initialized just because we
add it to the buffer.
@@ -153,7 +153,7 @@ public class ShareFetchCollectorTest {
ShareCompletedFetch completedFetch = completedFetchBuilder
.recordCount(10)
.build();
- fetchBuffer.add(completedFetch);
+ fetchBuffer.add(List.of(completedFetch));
// At first, the queue is populated
assertFalse(fetchBuffer.isEmpty());
@@ -170,7 +170,7 @@ public class ShareFetchCollectorTest {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.TOPIC_AUTHORIZATION_FAILED)
.build();
- fetchBuffer.add(completedFetch);
+ fetchBuffer.add(List.of(completedFetch));
assertThrows(TopicAuthorizationException.class, () ->
fetchCollector.collect(fetchBuffer));
}
@@ -182,7 +182,7 @@ public class ShareFetchCollectorTest {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.UNKNOWN_LEADER_EPOCH)
.build();
- fetchBuffer.add(completedFetch);
+ fetchBuffer.add(List.of(completedFetch));
ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
assertTrue(fetch.isEmpty());
}
@@ -195,7 +195,7 @@ public class ShareFetchCollectorTest {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.UNKNOWN_SERVER_ERROR)
.build();
- fetchBuffer.add(completedFetch);
+ fetchBuffer.add(List.of(completedFetch));
ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
assertTrue(fetch.isEmpty());
}
@@ -208,7 +208,7 @@ public class ShareFetchCollectorTest {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(Errors.CORRUPT_MESSAGE)
.build();
- fetchBuffer.add(completedFetch);
+ fetchBuffer.add(List.of(completedFetch));
assertThrows(KafkaException.class, () ->
fetchCollector.collect(fetchBuffer));
}
@@ -221,7 +221,7 @@ public class ShareFetchCollectorTest {
ShareCompletedFetch completedFetch = completedFetchBuilder
.error(error)
.build();
- fetchBuffer.add(completedFetch);
+ fetchBuffer.add(List.of(completedFetch));
assertThrows(IllegalStateException.class, () ->
fetchCollector.collect(fetchBuffer));
}