This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 802431901ab MINOR: Correcting exception codes in Share Partition 
(#20028)
802431901ab is described below

commit 802431901ab0356fe65a077130bfe7d6ef249c16
Author: Apoorv Mittal <apoorvmitta...@gmail.com>
AuthorDate: Tue Jun 24 12:46:27 2025 +0100

    MINOR: Correcting exception codes in Share Partition (#20028)
    
    The PR cherrypicks below commit:
    
    ```
    commit 1ca8779bee9cdbd53b0cb4536270461a6ed0b6d2
    Author: Apoorv Mittal <apoorvmitta...@gmail.com>
    Date:   Tue Jun 24 09:46:14 2025 +0100
    
        MINOR: Correcting client error for fenced share partition (#20023)
    
        Correct the error when SharePartition is fenced.
    
        Reviewers: Abhinav Dixit <adi...@confluent.io>, Sushant Mahajan
         <smaha...@confluent.io>, Andrew Schofield <aschofi...@confluent.io>
    ```
    
    And selective changes from following commit, which are critical:
    
    ```
    commit 3d4407ff9daeb3586b049bfd4a70bd374130616d
    Author: Sushant Mahajan <smaha...@confluent.io>
    Date:   Mon Jun 23 23:57:15 2025 +0530
    
        MINOR: Change exceptions for few error codes in SharePartition.
    (#20020)
    
        * The `SharePartition` class wraps the errors received from
    `PersisterStateManager` to be sent to the client.      * In this PR, we
    are categorizing the errors a bit better.      * Some exception messages
    in `PersisterStateManager` have been updated      to show the share
    partition key.      * Tests have been updated wherever needed.
    
        Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal
    <apoorvmitta...@gmail.com>
    ```
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Sushant Mahajan
     <smaha...@confluent.io>
---
 .../java/kafka/server/share/SharePartition.java    |  7 ++---
 .../kafka/server/share/SharePartitionTest.java     | 33 ++++++++++++++++++++--
 2 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 91a11d488f4..09385d6c48c 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -1490,7 +1489,7 @@ public class SharePartition {
                 String.format("Share partition failed to load %s-%s", groupId, 
topicIdPartition));
             case INITIALIZING -> throw new LeaderNotAvailableException(
                 String.format("Share partition is already initializing %s-%s", 
groupId, topicIdPartition));
-            case FENCED -> throw new FencedStateEpochException(
+            case FENCED -> throw new LeaderNotAvailableException(
                 String.format("Share partition is fenced %s-%s", groupId, 
topicIdPartition));
             case EMPTY ->
                 // The share partition is not yet initialized.
@@ -2343,9 +2342,7 @@ public class SharePartition {
                 new GroupIdNotFoundException(errorMessage);
             case UNKNOWN_TOPIC_OR_PARTITION ->
                 new UnknownTopicOrPartitionException(errorMessage);
-            case FENCED_STATE_EPOCH ->
-                new FencedStateEpochException(errorMessage);
-            case FENCED_LEADER_EPOCH ->
+            case FENCED_LEADER_EPOCH, FENCED_STATE_EPOCH ->
                 new NotLeaderOrFollowerException(errorMessage);
             default ->
                 new UnknownServerException(errorMessage);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index ac8e9ba0fd6..0cc7ad4d9a1 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -28,10 +28,10 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -758,7 +758,7 @@ public class SharePartitionTest {
         result = sharePartition.maybeInitialize();
         assertTrue(result.isDone());
         assertTrue(result.isCompletedExceptionally());
-        assertFutureThrows(FencedStateEpochException.class, result);
+        assertFutureThrows(NotLeaderOrFollowerException.class, result);
         assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
 
         // Mock FENCED_LEADER_EPOCH error.
@@ -788,6 +788,20 @@ public class SharePartitionTest {
         assertTrue(result.isCompletedExceptionally());
         assertFutureThrows(UnknownServerException.class, result);
         assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
+
+        // Mock NETWORK_EXCEPTION error.
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 5, 10L, 
Errors.NETWORK_EXCEPTION.code(), Errors.NETWORK_EXCEPTION.message(),
+                    List.of())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(UnknownServerException.class, result);
+        assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
     }
 
     @Test
@@ -935,6 +949,19 @@ public class SharePartitionTest {
         assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
     }
 
+    @Test
+    public void testMaybeInitializeFencedSharePartition() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
+        // Mark the share partition as fenced.
+        sharePartition.markFenced();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(LeaderNotAvailableException.class, result);
+        assertEquals(SharePartitionState.FENCED, 
sharePartition.partitionState());
+    }
+
     @Test
     public void testMaybeInitializeStateBatchesWithGapAtBeginning() {
         Persister persister = Mockito.mock(Persister.class);
@@ -5564,7 +5591,7 @@ public class SharePartitionTest {
 
         result = sharePartition.writeShareGroupState(anyList());
         assertTrue(result.isCompletedExceptionally());
-        assertFutureThrows(FencedStateEpochException.class, result);
+        assertFutureThrows(NotLeaderOrFollowerException.class, result);
 
         // Mock Write state RPC to return error response, FENCED_LEADER_EPOCH.
         
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(

Reply via email to