This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 323eaaca53b KAFKA-20256: Add ShareGroupDLQ interface and some infra 
classes. (#21640)
323eaaca53b is described below

commit 323eaaca53bdc6849095f805cd9a5fc9939ccf04
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Mar 12 20:09:20 2026 +0530

    KAFKA-20256: Add ShareGroupDLQ interface and some infra classes. (#21640)
    
    * Add `DLQManager` interface (evolving).
    * Add a NoOp impl for above.
    * Add java record classes to server as arguments for
    `ShareGroupDLQ.enqueue`
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../server/share/dlq/NoOpShareGroupDLQManager.java | 32 +++++++++++++++
 .../kafka/server/share/dlq/ShareGroupDLQ.java      | 34 ++++++++++++++++
 .../share/dlq/ShareGroupDLQRecordParameter.java    | 45 ++++++++++++++++++++++
 3 files changed, 111 insertions(+)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
new file mode 100644
index 00000000000..e7cb9efc615
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A no op implementation of {@link ShareGroupDLQ}. This will be useful
+ * in development cycle and testing. All methods return immediately with
+ * a successfully completed future.
+ */
+public class NoOpShareGroupDLQManager implements ShareGroupDLQ {
+    @Override
+    public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) 
{
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
new file mode 100644
index 00000000000..2dec3e914b7
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The main interface to identify implementations of dead letter queues for 
share groups.
+ */
+public interface ShareGroupDLQ {
+    /**
+     * Main method exposed to the world to enqueuing a record to the share 
groups dead letter queue.
+     *
+     * @param param A java record encapsulating required and optional 
information about the kafka record
+     *              being dead letter queued.
+     * @return A completable future of Void type, mainly to signal exceptions.
+     */
+    CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param);
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
new file mode 100644
index 00000000000..ba15f41b13f
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.common.TopicIdPartition;
+
+import java.util.Optional;
+
+/**
+ * Record representing information needed from callers of {@link 
ShareGroupDLQ#enqueue}. Inclusion
+ * of first and last offset allows passing batch information as well.
+ *
+ * @param groupId            The share group id of the message being recorded.
+ * @param topicIdPartition   The topic and partition information of the 
message.
+ * @param firstOffset        The first offset of the records in the kafka 
topic partition.
+ * @param lastOffset         The last offset of the records in the kafka topic 
partition.
+ * @param deliveryCount      If known, the number of times the message was 
delivered to the share consumer.
+ * @param cause              If known, throwable representing the reason for 
queueing the message.
+ * @param preserveRecordData If true, store original record headers, key and 
value in the dlq record as well.
+ */
+public record ShareGroupDLQRecordParameter(
+    String groupId,
+    TopicIdPartition topicIdPartition,
+    long firstOffset,
+    long lastOffset,
+    Optional<Integer> deliveryCount,
+    Optional<Throwable> cause,
+    boolean preserveRecordData
+) {
+}

Reply via email to