This is an automated email from the ASF dual-hosted git repository. mittal pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push: new 802431901ab MINOR: Correcting exception codes in Share Partition (#20028) 802431901ab is described below commit 802431901ab0356fe65a077130bfe7d6ef249c16 Author: Apoorv Mittal <apoorvmitta...@gmail.com> AuthorDate: Tue Jun 24 12:46:27 2025 +0100 MINOR: Correcting exception codes in Share Partition (#20028) The PR cherrypicks below commit: ``` commit 1ca8779bee9cdbd53b0cb4536270461a6ed0b6d2 Author: Apoorv Mittal <apoorvmitta...@gmail.com> Date: Tue Jun 24 09:46:14 2025 +0100 MINOR: Correcting client error for fenced share partition (#20023) Correct the error when SharePartition is fenced. Reviewers: Abhinav Dixit <adi...@confluent.io>, Sushant Mahajan <smaha...@confluent.io>, Andrew Schofield <aschofi...@confluent.io> ``` And selective changes from following commit, which are critical: ``` commit 3d4407ff9daeb3586b049bfd4a70bd374130616d Author: Sushant Mahajan <smaha...@confluent.io> Date: Mon Jun 23 23:57:15 2025 +0530 MINOR: Change exceptions for few error codes in SharePartition. (#20020) * The `SharePartition` class wraps the errors received from `PersisterStateManager` to be sent to the client. * In this PR, we are categorizing the errors a bit better. * Some exception messages in `PersisterStateManager` have been updated to show the share partition key. * Tests have been updated wherever needed. Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal <apoorvmitta...@gmail.com> ``` Reviewers: Andrew Schofield <aschofi...@confluent.io>, Sushant Mahajan <smaha...@confluent.io> --- .../java/kafka/server/share/SharePartition.java | 7 ++--- .../kafka/server/share/SharePartitionTest.java | 33 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 91a11d488f4..09385d6c48c 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; -import org.apache.kafka.common.errors.FencedStateEpochException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -1490,7 +1489,7 @@ public class SharePartition { String.format("Share partition failed to load %s-%s", groupId, topicIdPartition)); case INITIALIZING -> throw new LeaderNotAvailableException( String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition)); - case FENCED -> throw new FencedStateEpochException( + case FENCED -> throw new LeaderNotAvailableException( String.format("Share partition is fenced %s-%s", groupId, topicIdPartition)); case EMPTY -> // The share partition is not yet initialized. @@ -2343,9 +2342,7 @@ public class SharePartition { new GroupIdNotFoundException(errorMessage); case UNKNOWN_TOPIC_OR_PARTITION -> new UnknownTopicOrPartitionException(errorMessage); - case FENCED_STATE_EPOCH -> - new FencedStateEpochException(errorMessage); - case FENCED_LEADER_EPOCH -> + case FENCED_LEADER_EPOCH, FENCED_STATE_EPOCH -> new NotLeaderOrFollowerException(errorMessage); default -> new UnknownServerException(errorMessage); diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index ac8e9ba0fd6..0cc7ad4d9a1 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -28,10 +28,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; -import org.apache.kafka.common.errors.FencedStateEpochException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -758,7 +758,7 @@ public class SharePartitionTest { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(FencedStateEpochException.class, result); + assertFutureThrows(NotLeaderOrFollowerException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock FENCED_LEADER_EPOCH error. @@ -788,6 +788,20 @@ public class SharePartitionTest { assertTrue(result.isCompletedExceptionally()); assertFutureThrows(UnknownServerException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); + + // Mock NETWORK_EXCEPTION error. + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 5, 10L, Errors.NETWORK_EXCEPTION.code(), Errors.NETWORK_EXCEPTION.message(), + List.of()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(UnknownServerException.class, result); + assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @Test @@ -935,6 +949,19 @@ public class SharePartitionTest { assertThrows(RuntimeException.class, sharePartition2::maybeInitialize); } + @Test + public void testMaybeInitializeFencedSharePartition() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + // Mark the share partition as fenced. + sharePartition.markFenced(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(LeaderNotAvailableException.class, result); + assertEquals(SharePartitionState.FENCED, sharePartition.partitionState()); + } + @Test public void testMaybeInitializeStateBatchesWithGapAtBeginning() { Persister persister = Mockito.mock(Persister.class); @@ -5564,7 +5591,7 @@ public class SharePartitionTest { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(FencedStateEpochException.class, result); + assertFutureThrows(NotLeaderOrFollowerException.class, result); // Mock Write state RPC to return error response, FENCED_LEADER_EPOCH. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(