This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3e3f0c43152 KAFKA-19946: Simplify skipping of empty ShareFetch
requests (#21033)
3e3f0c43152 is described below
commit 3e3f0c4315204bf16b6d04b7b91eed6855dad56b
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Dec 3 10:05:43 2025 +0000
KAFKA-19946: Simplify skipping of empty ShareFetch requests (#21033)
ShareFetch requests can be used to fetch records for share consumers,
but in some cases, no records are required and the request is just being
used to update the share session or send acknowledgements. As a result,
there are situations in which a ShareFetch request would be built, only
for it to be entirely empty (no fetch, no asks, no share session
update). This PR simplifies the logic for detecting when the request is
empty and then to avoid building it entirely. It also adds some tests
for this case.
Reviewers: Apoorv Mittal <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 22 ++---
.../consumer/internals/ShareSessionHandler.java | 33 +++++--
.../internals/ShareSessionHandlerTest.java | 103 +++++++++++++++------
.../java/kafka/server/share/SharePartition.java | 2 +-
4 files changed, 110 insertions(+), 50 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 183f15833aa..e57265716f5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -253,24 +253,20 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
Node target = entry.getKey();
ShareSessionHandler handler = entry.getValue();
- log.trace("Building ShareFetch request to send to node {}",
target.id());
- ShareFetchRequest.Builder requestBuilder =
handler.newShareFetchBuilder(groupId, shareFetchConfig);
-
// For record_limit mode, we only send a full ShareFetch to a
single node at a time.
// We prepare to build ShareFetch requests for all nodes with
session handlers to permit
// piggy-backing of acknowledgements, and also to adjust the
topic-partitions
- // in the share session.
- if (isShareAcquireModeRecordLimit() && target.id() !=
fetchRecordsNodeId.get()) {
- ShareFetchRequestData data = requestBuilder.data();
- // If there's nothing to send, just skip building the record.
- if (data.topics().isEmpty() &&
data.forgottenTopicsData().isEmpty()) {
- return null;
- } else {
- // There is something to send, but we don't want to fetch
any records.
- requestBuilder.data().setMaxRecords(0);
- }
+ // in the share session, but if the request would contain neither
of those, it can be skipped.
+ boolean canSkipIfRequestEmpty = isShareAcquireModeRecordLimit() &&
target.id() != fetchRecordsNodeId.get();
+
+ ShareFetchRequest.Builder requestBuilder =
handler.newShareFetchBuilder(groupId, shareFetchConfig, canSkipIfRequestEmpty);
+ if (requestBuilder == null) {
+ log.trace("Skipping ShareFetch request to send to node {}",
target.id());
+ return null;
}
+ log.trace("Building ShareFetch request to send to node {}",
target.id());
+
nodesWithPendingRequests.add(target.id());
BiConsumer<ClientResponse, Throwable> responseHandler =
(clientResponse, error) -> {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 348855a341b..0b6cdf0a6db 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
* <p>ShareSessionHandler tracks the partitions which are in the session. It
also determines
* which partitions need to be included in each ShareFetch/ShareAcknowledge
request.
*/
+@SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
public class ShareSessionHandler {
private final Logger log;
private final int node;
@@ -112,7 +113,7 @@ public class ShareSessionHandler {
return nextMetadata.isNewSession();
}
- public ShareFetchRequest.Builder newShareFetchBuilder(String groupId,
ShareFetchConfig shareFetchConfig) {
+ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId,
ShareFetchConfig shareFetchConfig, boolean canSkipIfRequestEmpty) {
List<TopicIdPartition> added = new ArrayList<>();
List<TopicIdPartition> removed = new ArrayList<>();
List<TopicIdPartition> replaced = new ArrayList<>();
@@ -158,15 +159,6 @@ public class ShareSessionHandler {
}
}
- if (log.isDebugEnabled()) {
- log.debug("Build ShareFetch {} for node {}. Added {}, removed {},
replaced {} out of {}",
- nextMetadata, node,
- topicIdPartitionsToLogString(added),
- topicIdPartitionsToLogString(removed),
- topicIdPartitionsToLogString(replaced),
- topicIdPartitionsToLogString(sessionPartitions.values()));
- }
-
// The replaced topic-partitions need to be removed, and their
replacements are already added
removed.addAll(replaced);
@@ -187,6 +179,19 @@ public class ShareSessionHandler {
nextPartitions = new LinkedHashMap<>();
nextAcknowledgements = new LinkedHashMap<>();
+ if (canSkipIfRequestEmpty && added.isEmpty() && removed.isEmpty() &&
acknowledgementBatches.isEmpty()) {
+ return null;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Build ShareFetch {} for node {}. Added {}, removed {},
replaced {} out of {}",
+ nextMetadata, node,
+ topicIdPartitionsToLogString(added),
+ topicIdPartitionsToLogString(removed),
+ topicIdPartitionsToLogString(replaced),
+ topicIdPartitionsToLogString(sessionPartitions.values()));
+ }
+
if (hasRenewAcknowledgements) {
// If the request has renew acknowledgements, the ShareFetch is
only used to send the acknowledgements
// and potentially update the share session. The parameters for
wait time, number of bytes and number of
@@ -196,6 +201,14 @@ public class ShareSessionHandler {
0, 0, 0,
0, shareFetchConfig.shareAcquireMode.id, true,
added, removed, acknowledgementBatches);
+ } else if (canSkipIfRequestEmpty) {
+ // The request contains changes to the share session or
acknowledgements only. The parameters for wait time,
+ // number of bytes and number of records are all zero.
+ return ShareFetchRequest.Builder.forConsumer(
+ groupId, nextMetadata, 0,
+ 0, 0, 0,
+ 0, shareFetchConfig.shareAcquireMode.id, false,
+ added, removed, acknowledgementBatches);
} else {
return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, shareFetchConfig.maxWaitMs,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 72d5d7c4e21..826978aab5a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -26,9 +26,11 @@ import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -45,6 +47,7 @@ import java.util.stream.Stream;
import static
org.apache.kafka.common.requests.ShareRequestMetadata.INITIAL_EPOCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -180,7 +183,7 @@ public class ShareSessionHandlerTest {
TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");
handler.addPartitionToFetch(foo0, null);
handler.addPartitionToFetch(foo1, null);
- ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId,
DEFAULT_SHARE_FETCH_CONFIG).build().data();
+ ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, DEFAULT_SHARE_FETCH_CONFIG,
false).build().data();
ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
expectedToSend1.add(new TopicIdPartition(fooId, 0, "foo"));
expectedToSend1.add(new TopicIdPartition(fooId, 1, "foo"));
@@ -192,7 +195,7 @@ public class ShareSessionHandlerTest {
buildResponseData(new RespEntry("foo", 0, fooId), new
RespEntry("foo", 1, fooId)),
List.of(),
0);
- handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
// Test a fetch request which adds one partition
Uuid barId = addTopicId(topicNames, "bar");
@@ -200,7 +203,7 @@ public class ShareSessionHandlerTest {
handler.addPartitionToFetch(foo0, null);
handler.addPartitionToFetch(foo1, null);
handler.addPartitionToFetch(bar0, null);
- ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId,
DEFAULT_SHARE_FETCH_CONFIG).build().data();
+ ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, DEFAULT_SHARE_FETCH_CONFIG,
false).build().data();
assertMapsEqual(reqMap(new TopicIdPartition(fooId, 0, "foo"),
new TopicIdPartition(fooId, 1, "foo"),
new TopicIdPartition(barId, 0, "bar")),
@@ -214,13 +217,13 @@ public class ShareSessionHandlerTest {
buildResponseData(new RespEntry("foo", 1, fooId)),
List.of(),
0);
- handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion());
// A top-level error code will reset the session epoch
ShareFetchResponse resp3 = ShareFetchResponse.of(error, 0, new
LinkedHashMap<>(), List.of(), 0);
- handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion());
- ShareFetchRequestData requestData4 =
handler.newShareFetchBuilder(groupId,
DEFAULT_SHARE_FETCH_CONFIG).build().data();
+ ShareFetchRequestData requestData4 =
handler.newShareFetchBuilder(groupId, DEFAULT_SHARE_FETCH_CONFIG,
false).build().data();
assertEquals(requestData2.memberId(), requestData4.memberId());
assertEquals(INITIAL_EPOCH, requestData4.shareSessionEpoch());
assertMapsEqual(reqMap(new TopicIdPartition(fooId, 0, "foo"),
@@ -250,7 +253,7 @@ public class ShareSessionHandlerTest {
handler.addPartitionToFetch(foo0, null);
handler.addPartitionToFetch(foo1, null);
handler.addPartitionToFetch(bar0, null);
- ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertMapsEqual(reqMap(
new TopicIdPartition(fooId, 0, "foo"),
new TopicIdPartition(fooId, 1, "foo"),
@@ -271,11 +274,11 @@ public class ShareSessionHandlerTest {
new RespEntry("bar", 0, barId)),
List.of(),
0);
- handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
// Test a fetch request which removes two partitions
handler.addPartitionToFetch(foo1, null);
- ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertEquals(memberId.toString(), requestData2.memberId());
assertEquals(1, requestData2.shareSessionEpoch());
assertMapsEqual(reqMap(new TopicIdPartition(fooId, 1, "foo")),
@@ -288,10 +291,10 @@ public class ShareSessionHandlerTest {
// A top-level error code will reset the session epoch
ShareFetchResponse resp2 =
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new
LinkedHashMap<>(), List.of(), 0);
- handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion());
handler.addPartitionToFetch(foo1, null);
- ShareFetchRequestData requestData3 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData3 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertEquals(memberId.toString(), requestData3.memberId());
assertEquals(INITIAL_EPOCH, requestData3.shareSessionEpoch());
assertMapsEqual(reqMap(new TopicIdPartition(fooId, 1, "foo")),
@@ -312,7 +315,7 @@ public class ShareSessionHandlerTest {
Uuid topicId1 = addTopicId(topicNames, "foo");
TopicIdPartition tp = new TopicIdPartition(topicId1, 0, "foo");
handler.addPartitionToFetch(tp, null);
- ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertMapsEqual(reqMap(new TopicIdPartition(topicId1, 0, "foo")),
handler.sessionPartitionMap());
ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
@@ -324,14 +327,14 @@ public class ShareSessionHandlerTest {
buildResponseData(new RespEntry("foo", 0, topicId1)),
List.of(),
0);
- handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
// Try to add a new topic ID
Uuid topicId2 = addTopicId(topicNames, "foo");
TopicIdPartition tp2 = new TopicIdPartition(topicId2, 0, "foo");
// Use the same data besides the topic ID
handler.addPartitionToFetch(tp2, null);
- ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
// If we started with an ID, only a new ID will count towards replaced.
// The old topic ID partition should be forgotten, and the new one
should be fetched.
@@ -357,7 +360,7 @@ public class ShareSessionHandlerTest {
Uuid topicId = addTopicId(topicNames, "foo");
TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo");
handler.addPartitionToFetch(foo0, null);
- ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertMapsEqual(reqMap(foo0), handler.sessionPartitionMap());
ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
@@ -368,10 +371,10 @@ public class ShareSessionHandlerTest {
buildResponseData(new RespEntry("foo", 0, topicId)),
List.of(),
0);
- handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
// Remove the topic from the session by setting acknowledgements only
- this is not asking to fetch records
- ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
handler.addPartitionToAcknowledgeOnly(foo0, Acknowledgements.empty());
assertEquals(Collections.singletonList(foo0),
reqForgetList(requestData2, topicNames));
@@ -392,7 +395,7 @@ public class ShareSessionHandlerTest {
Uuid topicId = addTopicId(topicNames, "foo");
TopicIdPartition foo0 = new TopicIdPartition(topicId, 0, "foo");
handler.addPartitionToFetch(foo0, null);
- ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertMapsEqual(reqMap(foo0), handler.sessionPartitionMap());
ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
expectedToSend1.add(new TopicIdPartition(topicId, 0, "foo"));
@@ -403,10 +406,10 @@ public class ShareSessionHandlerTest {
buildResponseData(new RespEntry("foo", 0, topicId)),
List.of(),
0);
- handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
// Remove the topic from the session
- ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertEquals(Collections.singletonList(foo0),
reqForgetList(requestData2, topicNames));
// Should have the same session ID, next epoch, and same ID usage
@@ -424,7 +427,7 @@ public class ShareSessionHandlerTest {
Map<Uuid, String> topicNames = new HashMap<>();
Uuid topicId = addTopicId(topicNames, "foo");
handler.addPartitionToFetch(new TopicIdPartition(topicId, 0, "foo"),
null);
- ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData1 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertMapsEqual(reqMap(new TopicIdPartition(topicId, 0, "foo")),
handler.sessionPartitionMap());
ArrayList<TopicIdPartition> expectedToSend1 = new ArrayList<>();
@@ -436,19 +439,19 @@ public class ShareSessionHandlerTest {
buildResponseData(new RespEntry("foo", 0, topicId)),
List.of(),
0);
- handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
// Remove the partition from the session
- ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
assertTrue(handler.sessionPartitionMap().isEmpty());
assertTrue(requestData2.topics().isEmpty());
ShareFetchResponse resp2 = ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(), List.of(), 0);
- handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
+ handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion());
// After the topic is removed, add a recreated topic with a new ID
Uuid topicId2 = addTopicId(topicNames, "foo");
handler.addPartitionToFetch(new TopicIdPartition(topicId2, 0, "foo"),
null);
- ShareFetchRequestData requestData3 =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData3 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
// Should have the same session ID and epoch 2.
assertEquals(memberId.toString(), requestData3.memberId(), "Did not
use same session");
@@ -477,7 +480,7 @@ public class ShareSessionHandlerTest {
// Attempt a new ShareFetch
TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");
handler.addPartitionToFetch(foo1, null);
- ShareFetchRequestData requestData =
handler.newShareFetchBuilder(groupId, shareFetchConfig).build().data();
+ ShareFetchRequestData requestData =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
// We should have cleared the unsent acknowledgements before this
ShareFetch.
assertEquals(0,
requestData.topics().stream().findFirst().get().partitions().stream().findFirst().get().acknowledgementBatches().size());
@@ -488,6 +491,54 @@ public class ShareSessionHandlerTest {
assertEquals(memberId.toString(), requestData.memberId());
}
+ @Test
+ public void testCanSkipIfRequestEmpty() {
+ ShareFetchConfig shareFetchConfig = SHARE_FETCH_CONFIG_RECORD_LIMIT;
+
+ String groupId = "G1";
+ Uuid memberId = Uuid.randomUuid();
+ ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1,
memberId);
+
+ Map<Uuid, String> topicNames = new HashMap<>();
+ Uuid fooId = addTopicId(topicNames, "foo");
+ TopicIdPartition foo0 = new TopicIdPartition(fooId, 0, "foo");
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(0L, AcknowledgeType.ACCEPT);
+
+ // The request cannot be skipped when a topic-partition is added to
the share session.
+ handler.addPartitionToFetch(foo0, null);
+ ShareFetchRequest.Builder builder =
handler.newShareFetchBuilder(groupId, shareFetchConfig, true);
+ assertNotNull(builder);
+
+ ShareFetchResponse resp = ShareFetchResponse.of(Errors.NONE,
+ 0,
+ buildResponseData(new RespEntry("foo", 0, fooId)),
+ List.of(),
+ 0);
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
+
+ // The request can be skipped when the same topic-partition is already
in the share session.
+ handler.addPartitionToFetch(foo0, null);
+ builder = handler.newShareFetchBuilder(groupId, shareFetchConfig,
true);
+ assertNull(builder);
+
+ // The request cannot be skipped when there are acknowledgements.
+ handler.addPartitionToFetch(foo0, acknowledgements);
+ builder = handler.newShareFetchBuilder(groupId, shareFetchConfig,
true);
+ assertNotNull(builder);
+ handler.handleResponse(resp, ApiKeys.SHARE_FETCH.latestVersion());
+
+ // The request cannot be skipped when the topic-partition is removed
from the share session.
+ builder = handler.newShareFetchBuilder(groupId, shareFetchConfig,
true);
+ assertNotNull(builder);
+ handler.handleResponse(ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(), List.of(), 0), ApiKeys.SHARE_FETCH.latestVersion());
+
+ // The request can be skipped when the share session is empty.
+ builder = handler.newShareFetchBuilder(groupId, shareFetchConfig,
true);
+ assertNull(builder);
+ }
+
private Uuid addTopicId(Map<Uuid, String> topicNames, String name) {
Uuid id = Uuid.randomUuid();
topicNames.put(id, name);
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index f70806468e9..1043adb71fc 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -782,7 +782,7 @@ public class SharePartition {
// check for the floor entry and adjust the base offset
accordingly.
if (baseOffset < startOffset) {
log.info("Adjusting base offset for the fetch as it's
prior to start offset: {}-{}"
- + "from {} to {}", groupId, topicIdPartition,
baseOffset, startOffset);
+ + " from {} to {}", groupId, topicIdPartition,
baseOffset, startOffset);
baseOffset = startOffset;
}
} else if (floorEntry.getValue().lastOffset() >= baseOffset) {