This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa54065298d KAFKA-18086: Enable propagation of the error message when writing state (#17980)
fa54065298d is described below

commit fa54065298d1f3f62aa1e3563722fb58ac6f491d
Author: Yung <[email protected]>
AuthorDate: Fri Dec 6 01:48:26 2024 +0800

    KAFKA-18086: Enable propagation of the error message when writing state (#17980)
    
    * KAFKA-18086: Enable propagation of the error message when writing state
    
    * Propagate the error message, in addition to the error code, when writing
      state in SharePartitionManager.acknowledge and
      SharePartitionManager.releaseSession, and add corresponding tests to
      verify that the expected error message is propagated.
    
    * Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal 
<[email protected]>
---
 .../kafka/server/share/SharePartitionManager.java  | 46 ++++++++++++++--------
 .../server/share/SharePartitionManagerTest.java    |  5 +++
 2 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index ac96fe348b3..a256b28a736 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -272,20 +272,20 @@ public class SharePartitionManager implements 
AutoCloseable {
         log.trace("Acknowledge request for topicIdPartitions: {} with groupId: 
{}",
             acknowledgeTopics.keySet(), groupId);
         this.shareGroupMetrics.shareAcknowledgement();
-        Map<TopicIdPartition, CompletableFuture<Errors>> futures = new 
HashMap<>();
+        Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new 
HashMap<>();
         acknowledgeTopics.forEach((topicIdPartition, 
acknowledgePartitionBatches) -> {
             SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, 
topicIdPartition);
             SharePartition sharePartition = 
partitionCacheMap.get(sharePartitionKey);
             if (sharePartition != null) {
-                CompletableFuture<Errors> future = new CompletableFuture<>();
+                CompletableFuture<Throwable> future = new 
CompletableFuture<>();
                 sharePartition.acknowledge(memberId, 
acknowledgePartitionBatches).whenComplete((result, throwable) -> {
                     if (throwable != null) {
                         handleFencedSharePartitionException(sharePartitionKey, 
throwable);
-                        future.complete(Errors.forException(throwable));
+                        future.complete(throwable);
                         return;
                     }
                     acknowledgePartitionBatches.forEach(batch -> 
batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement));
-                    future.complete(Errors.NONE);
+                    future.complete(null);
                 });
 
                 // If we have an acknowledgement completed for a 
topic-partition, then we should check if
@@ -295,7 +295,7 @@ public class SharePartitionManager implements AutoCloseable 
{
 
                 futures.put(topicIdPartition, future);
             } else {
-                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
             }
         });
 
@@ -303,9 +303,16 @@ public class SharePartitionManager implements 
AutoCloseable {
             futures.values().toArray(new CompletableFuture[0]));
         return allFutures.thenApply(v -> {
             Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
result = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> 
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
-                .setPartitionIndex(topicIdPartition.partition())
-                .setErrorCode(future.join().code())));
+            futures.forEach((topicIdPartition, future) -> {
+                ShareAcknowledgeResponseData.PartitionData partitionData = new 
ShareAcknowledgeResponseData.PartitionData()
+                    .setPartitionIndex(topicIdPartition.partition());
+                Throwable t = future.join();
+                if (t != null) {
+                    partitionData.setErrorCode(Errors.forException(t).code())
+                        .setErrorMessage(t.getMessage());
+                }
+                result.put(topicIdPartition, partitionData);
+            });
             return result;
         });
     }
@@ -342,22 +349,22 @@ public class SharePartitionManager implements 
AutoCloseable {
             return CompletableFuture.completedFuture(Collections.emptyMap());
         }
 
-        Map<TopicIdPartition, CompletableFuture<Errors>> futuresMap = new 
HashMap<>();
+        Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap = new 
HashMap<>();
         topicIdPartitions.forEach(topicIdPartition -> {
             SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, 
topicIdPartition);
             SharePartition sharePartition = 
partitionCacheMap.get(sharePartitionKey);
             if (sharePartition == null) {
                 log.error("No share partition found for groupId {} 
topicPartition {} while releasing acquired topic partitions", groupId, 
topicIdPartition);
-                futuresMap.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+                futuresMap.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
             } else {
-                CompletableFuture<Errors> future = new CompletableFuture<>();
+                CompletableFuture<Throwable> future = new 
CompletableFuture<>();
                 
sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, 
throwable) -> {
                     if (throwable != null) {
                         handleFencedSharePartitionException(sharePartitionKey, 
throwable);
-                        future.complete(Errors.forException(throwable));
+                        future.complete(throwable);
                         return;
                     }
-                    future.complete(Errors.NONE);
+                    future.complete(null);
                 });
                 // If we have a release acquired request completed for a 
topic-partition, then we should check if
                 // there is a pending share fetch request for the 
topic-partition and complete it.
@@ -372,9 +379,16 @@ public class SharePartitionManager implements 
AutoCloseable {
             futuresMap.values().toArray(new CompletableFuture[0]));
         return allFutures.thenApply(v -> {
             Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
result = new HashMap<>();
-            futuresMap.forEach((topicIdPartition, future) -> 
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
-                    .setPartitionIndex(topicIdPartition.partition())
-                    .setErrorCode(future.join().code())));
+            futuresMap.forEach((topicIdPartition, future) -> {
+                ShareAcknowledgeResponseData.PartitionData partitionData = new 
ShareAcknowledgeResponseData.PartitionData()
+                    .setPartitionIndex(topicIdPartition.partition());
+                Throwable t = future.join();
+                if (t != null) {
+                    partitionData.setErrorCode(Errors.forException(t).code())
+                        .setErrorMessage(t.getMessage());
+                }
+                result.put(topicIdPartition, partitionData);
+            });
             return result;
         });
     }
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 62680bd3c0c..8d3902f2410 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1328,9 +1328,11 @@ public class SharePartitionManagerTest {
         assertEquals(Errors.NONE.code(), result.get(tp1).errorCode());
         assertEquals(2, result.get(tp2).partitionIndex());
         assertEquals(Errors.INVALID_RECORD_STATE.code(), 
result.get(tp2).errorCode());
+        assertEquals("Unable to release acquired records for the batch", 
result.get(tp2).errorMessage());
         // tp3 was not a part of partitionCacheMap.
         assertEquals(4, result.get(tp3).partitionIndex());
         assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 
result.get(tp3).errorCode());
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), 
result.get(tp3).errorMessage());
     }
 
     @Test
@@ -1585,6 +1587,7 @@ public class SharePartitionManagerTest {
         assertTrue(result.containsKey(tp));
         assertEquals(0, result.get(tp).partitionIndex());
         assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 
result.get(tp).errorCode());
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), 
result.get(tp).errorMessage());
     }
 
     @Test
@@ -1615,6 +1618,7 @@ public class SharePartitionManagerTest {
         assertTrue(result.containsKey(tp));
         assertEquals(0, result.get(tp).partitionIndex());
         assertEquals(Errors.INVALID_REQUEST.code(), 
result.get(tp).errorCode());
+        assertEquals("Member is not the owner of batch record", 
result.get(tp).errorMessage());
     }
 
     @Test
@@ -1637,6 +1641,7 @@ public class SharePartitionManagerTest {
         assertTrue(result.containsKey(tp));
         assertEquals(3, result.get(tp).partitionIndex());
         assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 
result.get(tp).errorCode());
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), 
result.get(tp).errorMessage());
     }
 
     @Test

Reply via email to