This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d069b7cd166 MINOR: Cleanup DLQ record parameter. (#22480)
d069b7cd166 is described below
commit d069b7cd166913fa8c571c5e9101d2697d652eb0
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Jun 5 17:22:40 2026 +0530
MINOR: Cleanup DLQ record parameter. (#22480)
* Remove `preserveRecordData` attribute from
`ShareGroupDLQRecordParameter` as this information will be retrieved
from the share group dynamic config
`errors.deadletterqueue.copy.record.enable`.
Reviewers: Apoorv Mittal <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 2 +-
.../share/dlq/ShareGroupDLQRecordParameter.java | 16 +++++++--------
.../share/dlq/ShareGroupDLQStateManagerTest.java | 23 +++++++++++-----------
3 files changed, 19 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index c8b4decd0e1..4661752c73f 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -3351,7 +3351,7 @@ public class SharePartition {
// Step 1: Enqueue to DLQ
shareGroupDLQManager.enqueue(new ShareGroupDLQRecordParameter(
groupId, topicIdPartition, firstOffset, lastOffset,
- Optional.of(deliveryCount), Optional.ofNullable(dlqCause), false
+ Optional.of(deliveryCount), Optional.ofNullable(dlqCause)
)).whenComplete((v1, dlqException) -> {
if (dlqException != null) {
log.error("Failed to write to DLQ, proceeding to ARCHIVED
regardless.", dlqException);
diff --git
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
index 2e7c728542d..2e7383e4bbc 100644
---
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
+++
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -25,13 +25,12 @@ import java.util.Optional;
* Record representing information needed from callers of {@link
ShareGroupDLQManager#enqueue}. Inclusion
* of first and last offset allows passing batch information as well.
*
- * @param groupId The share group id of the message being recorded.
- * @param topicIdPartition The topic and partition information of the
message.
- * @param firstOffset The first offset of the records in the kafka
topic partition.
- * @param lastOffset The last offset of the records in the kafka topic
partition.
- * @param deliveryCount If known, the number of times the message was
delivered to the share consumer.
- * @param cause If known, throwable representing the reason for
queueing the message.
- * @param preserveRecordData If true, store original record headers, key and
value in the dlq record as well.
+ * @param groupId The share group id of the message being recorded.
+ * @param topicIdPartition The topic and partition information of the message.
+ * @param firstOffset The first offset of the records in the kafka topic
partition.
+ * @param lastOffset The last offset of the records in the kafka topic
partition.
+ * @param deliveryCount If known, the number of times the message was
delivered to the share consumer.
+ * @param cause If known, throwable representing the reason for
queueing the message.
*/
public record ShareGroupDLQRecordParameter(
String groupId,
@@ -39,7 +38,6 @@ public record ShareGroupDLQRecordParameter(
long firstOffset,
long lastOffset,
Optional<Short> deliveryCount,
- Optional<Throwable> cause,
- boolean preserveRecordData
+ Optional<Throwable> cause
) {
}
diff --git
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
index eac929140cd..6eb5b659250 100644
---
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
@@ -168,8 +168,7 @@ class ShareGroupDLQStateManagerTest {
0L,
2L,
Optional.of((short) 1),
- Optional.of(new RuntimeException("simulated cause")),
- false
+ Optional.of(new RuntimeException("simulated cause"))
);
}
@@ -972,12 +971,12 @@ class ShareGroupDLQStateManagerTest {
GROUP_ID,
new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false);
+ Optional.empty(), Optional.empty());
ShareGroupDLQRecordParameter p1 = new ShareGroupDLQRecordParameter(
GROUP_ID,
new TopicIdPartition(SOURCE_TOPIC_ID, 1, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false);
+ Optional.empty(), Optional.empty());
CompletableFuture<Void> r0 = stateManager.dlq(p0);
CompletableFuture<Void> r1 = stateManager.dlq(p1);
@@ -1043,7 +1042,7 @@ class ShareGroupDLQStateManagerTest {
GROUP_ID,
new TopicIdPartition(SOURCE_TOPIC_ID, 0, null),
0L, 0L,
- Optional.empty(), Optional.empty(), false);
+ Optional.empty(), Optional.empty());
assertNull(stateManager.dlq(p).get(10, TimeUnit.SECONDS));
assertEquals(1, capturedProduces.size());
@@ -1123,17 +1122,17 @@ class ShareGroupDLQStateManagerTest {
groupA,
new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false);
+ Optional.empty(), Optional.empty());
ShareGroupDLQRecordParameter pB = new ShareGroupDLQRecordParameter(
groupB,
new TopicIdPartition(SOURCE_TOPIC_ID, 1, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false);
+ Optional.empty(), Optional.empty());
ShareGroupDLQRecordParameter pC = new ShareGroupDLQRecordParameter(
groupC,
new TopicIdPartition(SOURCE_TOPIC_ID, 2, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false);
+ Optional.empty(), Optional.empty());
CompletableFuture<Void> rA = stateManager.dlq(pA);
CompletableFuture<Void> rB = stateManager.dlq(pB);
@@ -1206,7 +1205,7 @@ class ShareGroupDLQStateManagerTest {
GROUP_ID,
new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false));
+ Optional.empty(), Optional.empty()));
}
// Wait until the callback observes nodeRPCMap with more than 2
handlers piled up.
@@ -1352,7 +1351,7 @@ class ShareGroupDLQStateManagerTest {
new ShareGroupDLQRecordParameter(GROUP_ID,
new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false),
+ Optional.empty(), Optional.empty()),
goodFuture,
ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
@@ -1367,7 +1366,7 @@ class ShareGroupDLQStateManagerTest {
new ShareGroupDLQRecordParameter(GROUP_ID,
new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false),
+ Optional.empty(), Optional.empty()),
brokenFuture,
ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
@@ -1399,7 +1398,7 @@ class ShareGroupDLQStateManagerTest {
groupId,
new TopicIdPartition(SOURCE_TOPIC_ID, sourcePartition,
"source-topic"),
0L, 0L,
- Optional.empty(), Optional.empty(), false);
+ Optional.empty(), Optional.empty());
return manager.new ProduceRequestHandler(
param,
new CompletableFuture<>(),