This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 323eaaca53b KAFKA-20256: Add ShareGroupDLQ interface and some infra
classes. (#21640)
323eaaca53b is described below
commit 323eaaca53bdc6849095f805cd9a5fc9939ccf04
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Mar 12 20:09:20 2026 +0530
KAFKA-20256: Add ShareGroupDLQ interface and some infra classes. (#21640)
* Add `ShareGroupDLQ` interface (evolving).
* Add a NoOp impl for above.
* Add a Java record class to serve as the argument for
`ShareGroupDLQ.enqueue`.
Reviewers: Andrew Schofield <[email protected]>
---
.../server/share/dlq/NoOpShareGroupDLQManager.java | 32 +++++++++++++++
.../kafka/server/share/dlq/ShareGroupDLQ.java | 34 ++++++++++++++++
.../share/dlq/ShareGroupDLQRecordParameter.java | 45 ++++++++++++++++++++++
3 files changed, 111 insertions(+)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
new file mode 100644
index 00000000000..e7cb9efc615
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A no-op implementation of {@link ShareGroupDLQ}. This is useful
+ * during the development cycle and in testing. All methods return immediately with
+ * a successfully completed future.
+ */
+public class NoOpShareGroupDLQManager implements ShareGroupDLQ {
+ @Override
+ public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param)
{
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
new file mode 100644
index 00000000000..2dec3e914b7
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The main interface to identify implementations of dead letter queues for
share groups.
+ */
+public interface ShareGroupDLQ {
+ /**
+ * Main method exposed to the world for enqueuing a record to the share
group's dead letter queue.
+ *
+ * @param param A Java record encapsulating required and optional
information about the Kafka record
+ * being dead letter queued.
+ * @return A completable future of Void type, mainly to signal exceptions.
+ */
+ CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param);
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
new file mode 100644
index 00000000000..ba15f41b13f
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.common.TopicIdPartition;
+
+import java.util.Optional;
+
+/**
+ * Record representing information needed from callers of {@link
ShareGroupDLQ#enqueue}. Inclusion
+ * of first and last offset allows passing batch information as well.
+ *
+ * @param groupId The share group ID of the message being recorded.
+ * @param topicIdPartition The topic and partition information of the
message.
+ * @param firstOffset The first offset of the records in the Kafka
topic partition.
+ * @param lastOffset The last offset of the records in the Kafka topic
partition.
+ * @param deliveryCount If known, the number of times the message was
delivered to the share consumer.
+ * @param cause If known, throwable representing the reason for
queueing the message.
+ * @param preserveRecordData If true, store original record headers, key and
value in the DLQ record as well.
+ */
+public record ShareGroupDLQRecordParameter(
+ String groupId,
+ TopicIdPartition topicIdPartition,
+ long firstOffset,
+ long lastOffset,
+ Optional<Integer> deliveryCount,
+ Optional<Throwable> cause,
+ boolean preserveRecordData
+) {
+}