This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa54065298d KAFKA-18086: Enable propagation of the error message when
writing state (#17980)
fa54065298d is described below
commit fa54065298d1f3f62aa1e3563722fb58ac6f491d
Author: Yung <[email protected]>
AuthorDate: Fri Dec 6 01:48:26 2024 +0800
KAFKA-18086: Enable propagation of the error message when writing state
(#17980)
* KAFKA-18086: Enable propagation of the error message when writing state
* Propagate the error message raised when writing state into the per-partition
response of SharePartitionManager.acknowledge and SharePartitionManager.releaseSession,
and add corresponding tests to verify that the expected error message is propagated.
* Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../kafka/server/share/SharePartitionManager.java | 46 ++++++++++++++--------
.../server/share/SharePartitionManagerTest.java | 5 +++
2 files changed, 35 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index ac96fe348b3..a256b28a736 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -272,20 +272,20 @@ public class SharePartitionManager implements
AutoCloseable {
log.trace("Acknowledge request for topicIdPartitions: {} with groupId:
{}",
acknowledgeTopics.keySet(), groupId);
this.shareGroupMetrics.shareAcknowledgement();
- Map<TopicIdPartition, CompletableFuture<Errors>> futures = new
HashMap<>();
+ Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new
HashMap<>();
acknowledgeTopics.forEach((topicIdPartition,
acknowledgePartitionBatches) -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId,
topicIdPartition);
SharePartition sharePartition =
partitionCacheMap.get(sharePartitionKey);
if (sharePartition != null) {
- CompletableFuture<Errors> future = new CompletableFuture<>();
+ CompletableFuture<Throwable> future = new
CompletableFuture<>();
sharePartition.acknowledge(memberId,
acknowledgePartitionBatches).whenComplete((result, throwable) -> {
if (throwable != null) {
handleFencedSharePartitionException(sharePartitionKey,
throwable);
- future.complete(Errors.forException(throwable));
+ future.complete(throwable);
return;
}
acknowledgePartitionBatches.forEach(batch ->
batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement));
- future.complete(Errors.NONE);
+ future.complete(null);
});
// If we have an acknowledgement completed for a
topic-partition, then we should check if
@@ -295,7 +295,7 @@ public class SharePartitionManager implements AutoCloseable
{
futures.put(topicIdPartition, future);
} else {
- futures.put(topicIdPartition,
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+ futures.put(topicIdPartition,
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
}
});
@@ -303,9 +303,16 @@ public class SharePartitionManager implements
AutoCloseable {
futures.values().toArray(new CompletableFuture[0]));
return allFutures.thenApply(v -> {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
result = new HashMap<>();
- futures.forEach((topicIdPartition, future) ->
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
- .setPartitionIndex(topicIdPartition.partition())
- .setErrorCode(future.join().code())));
+ futures.forEach((topicIdPartition, future) -> {
+ ShareAcknowledgeResponseData.PartitionData partitionData = new
ShareAcknowledgeResponseData.PartitionData()
+ .setPartitionIndex(topicIdPartition.partition());
+ Throwable t = future.join();
+ if (t != null) {
+ partitionData.setErrorCode(Errors.forException(t).code())
+ .setErrorMessage(t.getMessage());
+ }
+ result.put(topicIdPartition, partitionData);
+ });
return result;
});
}
@@ -342,22 +349,22 @@ public class SharePartitionManager implements
AutoCloseable {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
- Map<TopicIdPartition, CompletableFuture<Errors>> futuresMap = new
HashMap<>();
+ Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap = new
HashMap<>();
topicIdPartitions.forEach(topicIdPartition -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId,
topicIdPartition);
SharePartition sharePartition =
partitionCacheMap.get(sharePartitionKey);
if (sharePartition == null) {
log.error("No share partition found for groupId {}
topicPartition {} while releasing acquired topic partitions", groupId,
topicIdPartition);
- futuresMap.put(topicIdPartition,
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+ futuresMap.put(topicIdPartition,
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
} else {
- CompletableFuture<Errors> future = new CompletableFuture<>();
+ CompletableFuture<Throwable> future = new
CompletableFuture<>();
sharePartition.releaseAcquiredRecords(memberId).whenComplete((result,
throwable) -> {
if (throwable != null) {
handleFencedSharePartitionException(sharePartitionKey,
throwable);
- future.complete(Errors.forException(throwable));
+ future.complete(throwable);
return;
}
- future.complete(Errors.NONE);
+ future.complete(null);
});
// If we have a release acquired request completed for a
topic-partition, then we should check if
// there is a pending share fetch request for the
topic-partition and complete it.
@@ -372,9 +379,16 @@ public class SharePartitionManager implements
AutoCloseable {
futuresMap.values().toArray(new CompletableFuture[0]));
return allFutures.thenApply(v -> {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
result = new HashMap<>();
- futuresMap.forEach((topicIdPartition, future) ->
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
- .setPartitionIndex(topicIdPartition.partition())
- .setErrorCode(future.join().code())));
+ futuresMap.forEach((topicIdPartition, future) -> {
+ ShareAcknowledgeResponseData.PartitionData partitionData = new
ShareAcknowledgeResponseData.PartitionData()
+ .setPartitionIndex(topicIdPartition.partition());
+ Throwable t = future.join();
+ if (t != null) {
+ partitionData.setErrorCode(Errors.forException(t).code())
+ .setErrorMessage(t.getMessage());
+ }
+ result.put(topicIdPartition, partitionData);
+ });
return result;
});
}
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 62680bd3c0c..8d3902f2410 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1328,9 +1328,11 @@ public class SharePartitionManagerTest {
assertEquals(Errors.NONE.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.INVALID_RECORD_STATE.code(),
result.get(tp2).errorCode());
+ assertEquals("Unable to release acquired records for the batch",
result.get(tp2).errorMessage());
// tp3 was not a part of partitionCacheMap.
assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(),
result.get(tp3).errorCode());
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(),
result.get(tp3).errorMessage());
}
@Test
@@ -1585,6 +1587,7 @@ public class SharePartitionManagerTest {
assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(),
result.get(tp).errorCode());
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(),
result.get(tp).errorMessage());
}
@Test
@@ -1615,6 +1618,7 @@ public class SharePartitionManagerTest {
assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(),
result.get(tp).errorCode());
+ assertEquals("Member is not the owner of batch record",
result.get(tp).errorMessage());
}
@Test
@@ -1637,6 +1641,7 @@ public class SharePartitionManagerTest {
assertTrue(result.containsKey(tp));
assertEquals(3, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(),
result.get(tp).errorCode());
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(),
result.get(tp).errorMessage());
}
@Test