This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fedbb90c128 KAFKA-19232: Handle Share session limit reached exception in clients. (#19619) fedbb90c128 is described below commit fedbb90c12887a693dfa333e8c7c7cd970f4fd33 Author: Shivsundar R <s...@confluent.io> AuthorDate: Sun May 4 14:59:40 2025 -0400 KAFKA-19232: Handle Share session limit reached exception in clients. (#19619) Handle the new `ShareSessionLimitReachedException` in `ShareSessionHandler` in the client to reset the ShareSession. Added a unit test verifying the change. Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../kafka/clients/consumer/internals/ShareSessionHandler.java | 3 ++- .../clients/consumer/internals/ShareSessionHandlerTest.java | 9 ++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java index 34a109944be..634a9839c5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java @@ -219,7 +219,8 @@ public class ShareSessionHandler { */ public boolean handleResponse(ShareFetchResponse response, short version) { if ((response.error() == Errors.SHARE_SESSION_NOT_FOUND) || - (response.error() == Errors.INVALID_SHARE_SESSION_EPOCH)) { + (response.error() == Errors.INVALID_SHARE_SESSION_EPOCH) || + (response.error() == Errors.SHARE_SESSION_LIMIT_REACHED)) { log.info("Node {} was unable to process the ShareFetch request with {}: {}.", node, nextMetadata, response.error()); nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java index 5a52b7bc35f..aa5ebb80380 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java @@ -31,6 +31,8 @@ import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.ArrayList; import java.util.Collections; @@ -149,8 +151,9 @@ public class ShareSessionHandlerTest { return topicIdPartitionToPartition; } - @Test - public void testShareSession() { + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"INVALID_SHARE_SESSION_EPOCH", "SHARE_SESSION_NOT_FOUND", "SHARE_SESSION_LIMIT_REACHED"}) + public void testShareSession(Errors error) { String groupId = "G1"; Uuid memberId = Uuid.randomUuid(); ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId); @@ -199,7 +202,7 @@ public class ShareSessionHandlerTest { handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true)); // A top-level error code will reset the session epoch - ShareFetchResponse resp3 = ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new LinkedHashMap<>(), List.of(), 0); + ShareFetchResponse resp3 = ShareFetchResponse.of(error, 0, new LinkedHashMap<>(), List.of(), 0); handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true)); ShareFetchRequestData requestData4 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();