This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fedbb90c128 KAFKA-19232: Handle Share session limit reached exception 
in clients. (#19619)
fedbb90c128 is described below

commit fedbb90c12887a693dfa333e8c7c7cd970f4fd33
Author: Shivsundar R <s...@confluent.io>
AuthorDate: Sun May 4 14:59:40 2025 -0400

    KAFKA-19232: Handle Share session limit reached exception in clients. 
(#19619)
    
    Handle the new `ShareSessionLimitReachedException` in
    `ShareSessionHandler` in the client to reset the ShareSession.  Added a
    unit test verifying the change.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../kafka/clients/consumer/internals/ShareSessionHandler.java    | 3 ++-
 .../clients/consumer/internals/ShareSessionHandlerTest.java      | 9 ++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 34a109944be..634a9839c5d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -219,7 +219,8 @@ public class ShareSessionHandler {
      */
     public boolean handleResponse(ShareFetchResponse response, short version) {
         if ((response.error() == Errors.SHARE_SESSION_NOT_FOUND) ||
-                (response.error() == Errors.INVALID_SHARE_SESSION_EPOCH)) {
+                (response.error() == Errors.INVALID_SHARE_SESSION_EPOCH) ||
+                (response.error() == Errors.SHARE_SESSION_LIMIT_REACHED)) {
             log.info("Node {} was unable to process the ShareFetch request 
with {}: {}.",
                     node, nextMetadata, response.error());
             nextMetadata = nextMetadata.nextCloseExistingAttemptNew();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 5a52b7bc35f..aa5ebb80380 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -31,6 +31,8 @@ import org.apache.kafka.common.utils.LogContext;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -149,8 +151,9 @@ public class ShareSessionHandlerTest {
         return topicIdPartitionToPartition;
     }
 
-    @Test
-    public void testShareSession() {
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"INVALID_SHARE_SESSION_EPOCH", 
"SHARE_SESSION_NOT_FOUND", "SHARE_SESSION_LIMIT_REACHED"})
+    public void testShareSession(Errors error) {
         String groupId = "G1";
         Uuid memberId = Uuid.randomUuid();
         ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, 
memberId);
@@ -199,7 +202,7 @@ public class ShareSessionHandlerTest {
         handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         // A top-level error code will reset the session epoch
-        ShareFetchResponse resp3 = 
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new 
LinkedHashMap<>(), List.of(), 0);
+        ShareFetchResponse resp3 = ShareFetchResponse.of(error, 0, new 
LinkedHashMap<>(), List.of(), 0);
         handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
 
         ShareFetchRequestData requestData4 = 
handler.newShareFetchBuilder(groupId, fetchConfig).build().data();

Reply via email to