This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d1e82797919 KAFKA-20549: Implemented PRODUCE RPC handler for DLQ state
manager. [5/N] (#22368)
d1e82797919 is described below
commit d1e82797919887e480cbda3d9fb8ba7b4dde0be8
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed May 27 22:10:24 2026 +0530
KAFKA-20549: Implemented PRODUCE RPC handler for DLQ state manager. [5/N]
(#22368)
* Produce RPC impl.
* Batching and coalescing added.
* Unit tests for `ShareGroupDLQStateManager`.
Reviewers: Andrew Schofield <[email protected]>
---
.../ShareCoordinatorMetadataCacheHelperImpl.java | 48 +-
...hareCoordinatorMetadataCacheHelperImplTest.java | 178 +++-
.../dlq/ShareGroupDLQMetadataCacheHelper.java | 38 +-
.../share/dlq/ShareGroupDLQStateManager.java | 433 ++++++++-
.../share/dlq/ShareGroupDLQStateManagerTest.java | 972 +++++++++++++++++++++
5 files changed, 1627 insertions(+), 42 deletions(-)
diff --git
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
index ad556514891..6e0667cc6a9 100644
---
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
+++
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -18,6 +18,8 @@
package kafka.server.share;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
@@ -29,10 +31,12 @@ import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper;
import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
+import org.apache.kafka.storage.internals.log.LogConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -89,14 +93,14 @@ public class ShareCoordinatorMetadataCacheHelperImpl
implements ShareCoordinator
@Override
public boolean isDlqEnabledOnTopic(String topic) {
Properties props = metadataCache.topicConfig(topic);
- if (props == null || props.isEmpty()) {
+ if (props == null) {
return false;
}
- Object isEnabled =
props.get(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
- if (isEnabled instanceof Boolean) {
- return (boolean) isEnabled;
+ try {
+ return new
LogConfig(props).getBoolean(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
+ } catch (ConfigException exe) {
+ return false;
}
- return false;
}
@Override
@@ -137,7 +141,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl
implements ShareCoordinator
}
}
} catch (Exception e) {
- log.warn("Exception while getting share coordinator", e);
+ log.warn("Exception while getting share coordinator.", e);
}
return Node.noNode();
}
@@ -147,8 +151,38 @@ public class ShareCoordinatorMetadataCacheHelperImpl
implements ShareCoordinator
try {
return metadataCache.getAliveBrokerNodes(interBrokerListenerName);
} catch (Exception e) {
- log.warn("Exception while getting cluster nodes", e);
+ log.warn("Exception while getting cluster nodes.", e);
}
return List.of();
}
+
+ @Override
+ public Optional<String> topicName(Uuid topicId) {
+ try {
+ return metadataCache.getTopicName(topicId);
+ } catch (Exception e) {
+ log.warn("Exception while fetching topic name.", e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public TopicPartitionData topicPartitionData(String topicName) {
+ Uuid topicId = metadataCache.getTopicId(topicName);
+ Optional<Integer> numPartitions =
metadataCache.numPartitions(topicName);
+ List<Node> partitionLeaders = new ArrayList<>();
+
+ if (numPartitions.isPresent()) {
+ for (int i = 0; i < numPartitions.get(); i++) {
+
partitionLeaders.add(metadataCache.getPartitionLeaderEndpoint(topicName, i,
interBrokerListenerName).orElse(null));
+ }
+ }
+
+ return new TopicPartitionData(
+ topicName,
+ numPartitions,
+ Optional.ofNullable(topicId == Uuid.ZERO_UUID ? null : topicId),
+ partitionLeaders
+ );
+ }
}
diff --git
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
index 69e8e235dd2..713a4d78909 100644
---
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
+++
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
@@ -29,10 +29,12 @@ import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper;
import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -667,7 +669,7 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
}
@Test
- public void testIsDlqEnabledOnTopicReturnsFalseWhenConfigValueNotBoolean()
{
+ public void testIsDlqEnabledOnTopicReturnsTrue() {
MetadataCache mockMetadataCache = mock(MetadataCache.class);
Properties props = new Properties();
props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG,
"true");
@@ -680,14 +682,14 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
mock(GroupConfigManager.class)
);
- assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+ assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
}
@Test
- public void testIsDlqEnabledOnTopicReturnsTrue() {
+ public void testIsDlqEnabledOnTopicReturnsFalse() {
MetadataCache mockMetadataCache = mock(MetadataCache.class);
Properties props = new Properties();
- props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG,
true);
+ props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG,
false);
when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
@@ -697,15 +699,16 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
mock(GroupConfigManager.class)
);
- assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
+ assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
}
+ // Tests for topicName
+
@Test
- public void testIsDlqEnabledOnTopicReturnsFalseValue() {
+ public void testTopicNameReturnsNameWhenPresent() {
MetadataCache mockMetadataCache = mock(MetadataCache.class);
- Properties props = new Properties();
- props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG,
false);
- when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+ Uuid topicId = Uuid.randomUuid();
+
when(mockMetadataCache.getTopicName(topicId)).thenReturn(Optional.of("some-topic"));
ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
@@ -714,6 +717,161 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
mock(GroupConfigManager.class)
);
- assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+ assertEquals(Optional.of("some-topic"), cache.topicName(topicId));
+ verify(mockMetadataCache, times(1)).getTopicName(topicId);
+ }
+
+ @Test
+ public void testTopicNameReturnsEmptyWhenNotPresent() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ Uuid topicId = Uuid.randomUuid();
+
when(mockMetadataCache.getTopicName(topicId)).thenReturn(Optional.empty());
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
+ );
+
+ assertEquals(Optional.empty(), cache.topicName(topicId));
+ verify(mockMetadataCache, times(1)).getTopicName(topicId);
+ }
+
+ @Test
+ public void testTopicNameReturnsEmptyOnException() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ Uuid topicId = Uuid.randomUuid();
+ when(mockMetadataCache.getTopicName(topicId)).thenThrow(new
RuntimeException("boom"));
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
+ );
+
+ assertEquals(Optional.empty(), cache.topicName(topicId));
+ verify(mockMetadataCache, times(1)).getTopicName(topicId);
+ }
+
+ // Tests for topicPartitionData
+
+ @Test
+ public void testTopicPartitionDataReturnsFullData() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+ Uuid topicId = Uuid.randomUuid();
+ Node leader0 = new Node(0, "host0", 9092);
+ Node leader1 = new Node(1, "host1", 9092);
+
+ when(mockMetadataCache.getTopicId("test-topic")).thenReturn(topicId);
+
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.of(2));
+ when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 0,
mockListenerName))
+ .thenReturn(Optional.of(leader0));
+ when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 1,
mockListenerName))
+ .thenReturn(Optional.of(leader1));
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mockListenerName,
+ mock(GroupConfigManager.class)
+ );
+
+ ShareGroupDLQMetadataCacheHelper.TopicPartitionData data =
cache.topicPartitionData("test-topic");
+
+ assertEquals("test-topic", data.topicName());
+ assertEquals(Optional.of(2), data.numPartitions());
+ assertEquals(Optional.of(topicId), data.topicId());
+ assertEquals(List.of(leader0, leader1), data.partitionLeaderNodes());
+
+ verify(mockMetadataCache, times(1)).getTopicId("test-topic");
+ verify(mockMetadataCache, times(1)).numPartitions("test-topic");
+ verify(mockMetadataCache,
times(1)).getPartitionLeaderEndpoint("test-topic", 0, mockListenerName);
+ verify(mockMetadataCache,
times(1)).getPartitionLeaderEndpoint("test-topic", 1, mockListenerName);
+ }
+
+ @Test
+ public void testTopicPartitionDataReturnsEmptyTopicIdWhenZeroUuid() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+ Node leader0 = new Node(0, "host0", 9092);
+
+
when(mockMetadataCache.getTopicId("test-topic")).thenReturn(Uuid.ZERO_UUID);
+
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.of(1));
+ when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 0,
mockListenerName))
+ .thenReturn(Optional.of(leader0));
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mockListenerName,
+ mock(GroupConfigManager.class)
+ );
+
+ ShareGroupDLQMetadataCacheHelper.TopicPartitionData data =
cache.topicPartitionData("test-topic");
+
+ assertEquals("test-topic", data.topicName());
+ assertEquals(Optional.of(1), data.numPartitions());
+ assertEquals(Optional.empty(), data.topicId());
+ assertEquals(List.of(leader0), data.partitionLeaderNodes());
+ }
+
+ @Test
+ public void testTopicPartitionDataWithoutNumPartitions() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+ Uuid topicId = Uuid.randomUuid();
+
+ when(mockMetadataCache.getTopicId("test-topic")).thenReturn(topicId);
+
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.empty());
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mockListenerName,
+ mock(GroupConfigManager.class)
+ );
+
+ ShareGroupDLQMetadataCacheHelper.TopicPartitionData data =
cache.topicPartitionData("test-topic");
+
+ assertEquals("test-topic", data.topicName());
+ assertEquals(Optional.empty(), data.numPartitions());
+ assertEquals(Optional.of(topicId), data.topicId());
+ assertEquals(List.of(), data.partitionLeaderNodes());
+
+ verify(mockMetadataCache, times(1)).getTopicId("test-topic");
+ verify(mockMetadataCache, times(1)).numPartitions("test-topic");
+ verify(mockMetadataCache, times(0)).getPartitionLeaderEndpoint(any(),
any(Integer.class), any());
+ }
+
+ @Test
+ public void testTopicPartitionDataWithMissingPartitionLeader() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ ListenerName mockListenerName = mock(ListenerName.class);
+ Uuid topicId = Uuid.randomUuid();
+ Node leader0 = new Node(0, "host0", 9092);
+
+ when(mockMetadataCache.getTopicId("test-topic")).thenReturn(topicId);
+
when(mockMetadataCache.numPartitions("test-topic")).thenReturn(Optional.of(2));
+ when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 0,
mockListenerName))
+ .thenReturn(Optional.of(leader0));
+ when(mockMetadataCache.getPartitionLeaderEndpoint("test-topic", 1,
mockListenerName))
+ .thenReturn(Optional.empty());
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mockListenerName,
+ mock(GroupConfigManager.class)
+ );
+
+ ShareGroupDLQMetadataCacheHelper.TopicPartitionData data =
cache.topicPartitionData("test-topic");
+
+ assertEquals("test-topic", data.topicName());
+ assertEquals(Optional.of(2), data.numPartitions());
+ assertEquals(Optional.of(topicId), data.topicId());
+ assertEquals(Arrays.asList(leader0, null),
data.partitionLeaderNodes());
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
index e670676831f..bbc3949cbdc 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
@@ -18,6 +18,7 @@
package org.apache.kafka.server.share.dlq;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
import java.util.List;
import java.util.Optional;
@@ -28,11 +29,20 @@ import java.util.Optional;
* and keeping implementations manageable.
*/
public interface ShareGroupDLQMetadataCacheHelper {
+
+ public record TopicPartitionData(
+ String topicName,
+ Optional<Integer> numPartitions,
+ Optional<Uuid> topicId,
+ List<Node> partitionLeaderNodes
+ ) {
+ }
+
/**
* Return optional of string representing of DLQ topic.
*
* @param groupId Id of the share group
- * @return Optional of string representing of DLQ topic if set, empty
otherwise
+ * @return Optional of string representing of DLQ topic if set, empty
otherwise
*/
Optional<String> shareGroupDlqTopic(String groupId);
@@ -40,7 +50,7 @@ public interface ShareGroupDLQMetadataCacheHelper {
* Check if DLQ dynamic config is set on the topic to mark it available
for DLQ writes.
*
* @param topic The name of the topic
- * @return Boolean which is true when DLQ is set on the topic, false
otherwise
+ * @return Boolean which is true when DLQ is set on the topic, false
otherwise
*/
boolean isDlqEnabledOnTopic(String topic);
@@ -54,7 +64,7 @@ public interface ShareGroupDLQMetadataCacheHelper {
/**
* Return optional of string representing the configured DLQ prefix.
*
- * @return Optional of string representing DLQ prefix if configured,
empty otherwise
+ * @return Optional of string representing DLQ prefix if configured, empty
otherwise
*/
Optional<String> shareGroupDlqTopicPrefix();
@@ -62,7 +72,7 @@ public interface ShareGroupDLQMetadataCacheHelper {
* Check is copy record data into DLQ topic is enabled.
*
* @param groupId The id of the share group
- * @return Boolean which is true if config is set, false otherwise
+ * @return Boolean which is true if config is set, false otherwise
*/
boolean isShareGroupDlqCopyRecordEnabled(String groupId);
@@ -70,14 +80,30 @@ public interface ShareGroupDLQMetadataCacheHelper {
* Check if a topic is present in the metadata cache.
*
* @param topic The name of the topic
- * @return Boolean which is true is topic exists, false otherwise
+ * @return Boolean which is true is topic exists, false otherwise
*/
boolean containsTopic(String topic);
/**
* Get all nodes in the kafka cluster encapsulated in the {@link Node}
object.
*
- * @return List of nodes representing the cluster nodes
+ * @return List of nodes representing the cluster nodes
*/
List<Node> getClusterNodes();
+
+ /**
+ * Fetch topic name, based on the topic id.
+ *
+ * @param topicId The uuid of the topic
+ * @return Optional specifying the name, or empty in case of error/not
found.
+ */
+ Optional<String> topicName(Uuid topicId);
+
+ /**
+ * Fetch topic partition data, based on the topic name.
+ *
+ * @param topicName The name of the topic
+ * @return TopicPartitionData java record specifying the information.
+ */
+ TopicPartitionData topicPartitionData(String topicName);
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index 1bcfd1f025d..90f880ee08d 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -22,17 +22,29 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
+import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.apache.kafka.server.util.timer.Timer;
@@ -41,10 +53,17 @@ import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,6 +87,10 @@ public class ShareGroupDLQStateManager {
private static final double RETRY_BACKOFF_JITTER =
CommonClientConfigs.RETRY_BACKOFF_JITTER;
private static final Logger log =
LoggerFactory.getLogger(ShareGroupDLQStateManager.class);
+ private final Set<Node> inFlight = new HashSet<>();
+ private final Map<Node, List<ProduceRequestHandler>> nodeRPCMap = new
HashMap<>();
+ private final Object nodeMapLock = new Object();
+
public ShareGroupDLQStateManager(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
if (client == null) {
throw new IllegalArgumentException("Kafkaclient must not be
null.");
@@ -120,8 +143,13 @@ public class ShareGroupDLQStateManager {
* @return A future completing normally on successful DLQ, exceptionally
otherwise.
*/
public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
+ return dlq(param, REQUEST_BACKOFF_MS, REQUEST_BACKOFF_MAX_MS,
MAX_REQUEST_ATTEMPTS);
+ }
+
+ // Visibility for tests
+ CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param, long
requestBackoffMs, long requestBackoffMaxMs, int maxRequestAttempts) {
CompletableFuture<Void> future = new CompletableFuture<>();
- ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future, REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS, MAX_REQUEST_ATTEMPTS);
+ ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future, requestBackoffMs, requestBackoffMaxMs,
maxRequestAttempts);
enqueue(requestHandler);
return future;
}
@@ -130,11 +158,46 @@ public class ShareGroupDLQStateManager {
sender.enqueue(requestHandler);
}
- private class ProduceRequestHandler implements RequestCompletionHandler {
+ /**
+ * Add a produce request handler after determining that the DLQ topic
exists
+ * or has been created by the CREATE_TOPIC RPC. The map is used to collect
all PRODUCE
+ * requests which are destined for a specific destination node. The Sender
class
+ * then performs coalescing on all the handlers to create one single
PRODUCE instead
+ * of sending multiple RPCs. This method is currently called either when the DLQ
topic already
+ * exists, so that no CREATE_TOPIC RPC needs to be sent, or, when it does
not, after
+ * successful DLQ topic creation.
+ *
+ * @param node The destination node where the produce request needs to
be sent.
+ * @param handler The handler instance to add to the node map.
+ */
+ private void addRequestToNodeMap(Node node, ProduceRequestHandler handler)
{
+ if (!handler.isBatchable()) {
+ return;
+ }
+ synchronized (nodeMapLock) {
+ nodeRPCMap.computeIfAbsent(node, k -> new LinkedList<>())
+ .add(handler);
+ }
+ sender.wakeup();
+ }
+
+ // Visibility for tests
+ class ProduceRequestHandler implements RequestCompletionHandler {
private final CompletableFuture<Void> result;
private final ShareGroupDLQRecordParameter param;
private static final Logger LOG =
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
private final ExponentialBackoffManager createTopicsBackoff;
+ private final ExponentialBackoffManager produceRequestBackoff;
+ private Node dlqPartitionLeaderNode;
+ private int dlqDestinationPartition;
+ private ShareGroupDLQMetadataCacheHelper.TopicPartitionData
dlqTopicPartitionData;
+
+ public static final String HEADER_DLQ_ERRORS_TOPIC =
"__dlq.errors.topic";
+ public static final String HEADER_DLQ_ERRORS_PARTITION =
"__dlq.errors.partition";
+ public static final String HEADER_DLQ_ERRORS_OFFSET =
"__dlq.errors.offset";
+ public static final String HEADER_DLQ_ERRORS_GROUP =
"__dlq.errors.group";
+ public static final String HEADER_DLQ_ERRORS_DELIVERY_COUNT =
"__dlq.errors.delivery.count";
+ public static final String HEADER_DLQ_ERRORS_MESSAGE =
"__dlq.errors.message";
public ProduceRequestHandler(
ShareGroupDLQRecordParameter param,
@@ -152,6 +215,13 @@ public class ShareGroupDLQStateManager {
backoffMaxMs,
RETRY_BACKOFF_JITTER
);
+ this.produceRequestBackoff = new ExponentialBackoffManager(
+ maxRPCRetryAttempts,
+ backoffMs,
+ RETRY_BACKOFF_EXP_BASE,
+ backoffMaxMs,
+ RETRY_BACKOFF_JITTER
+ );
}
@Override
@@ -165,8 +235,8 @@ public class ShareGroupDLQStateManager {
if (response.requestHeader().apiKey() == ApiKeys.CREATE_TOPICS) {
handleCreateTopicsResponse(response);
- } else {
- // handle the response
+ } else if (response.requestHeader().apiKey() == ApiKeys.PRODUCE) {
+ handleProduceResponse(response);
}
sender.wakeup();
@@ -176,6 +246,19 @@ public class ShareGroupDLQStateManager {
return "ProduceRequestHandler";
}
+ /**
+ * This method helps determine if the handler could
+ * participate in batching (added to nodeMap). This will
+ * be helpful if the RPCs which cannot be batched are included in
+ * this class as well.
+ *
+ * @return Boolean indicating whether this handler can be coalesced
with others
+ * to reduce number of RPCs sent.
+ */
+ boolean isBatchable() {
+ return true;
+ }
+
public void requestErrorResponse(Throwable exception) {
this.result.completeExceptionally(exception);
}
@@ -204,6 +287,66 @@ public class ShareGroupDLQStateManager {
.setTopics(topicCollection));
}
+ public AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder() {
+ throw new RuntimeException("Produce requests are batchable, hence
individual requests not needed.");
+ }
+
+ public void populateDLQTopicData() throws ConfigException {
+ Optional<String> dlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
+ if (dlqTopic.isEmpty()) {
+ throw new ConfigException(String.format("DLQ topic is not
configured for share group %s.", param.groupId()));
+ }
+
+ ShareGroupDLQMetadataCacheHelper.TopicPartitionData tpData =
cacheHelper.topicPartitionData(dlqTopic.get());
+
+ if (tpData.topicId().isEmpty()) {
+ throw new ConfigException(String.format("DLQ topic id could
not be found for share group %s with DLQ topic %s.", param.groupId(),
dlqTopic.get()));
+ }
+
+ if (tpData.numPartitions().isEmpty()) {
+ throw new ConfigException(String.format("DLQ topic partition
count could not be found for share group %s with DLQ topic %s.",
param.groupId(), dlqTopic.get()));
+ }
+
+ if (tpData.partitionLeaderNodes().isEmpty() ||
tpData.partitionLeaderNodes().size() != tpData.numPartitions().get()) {
+ throw new ConfigException(String.format("DLQ topic partition
leaders for share group %s with DLQ topic %s could not be found.",
param.groupId(), dlqTopic.get()));
+ }
+
+ this.dlqDestinationPartition =
param.topicIdPartition().partition() % tpData.numPartitions().get();
+ this.dlqPartitionLeaderNode =
tpData.partitionLeaderNodes().get(dlqDestinationPartition);
+
+ if (this.dlqPartitionLeaderNode == null ||
this.dlqPartitionLeaderNode.equals(Node.noNode())) {
+ throw new ConfigException(String.format("DLQ topic partition
leader node for share group %s with DLQ topic %s and partition %d could not be
found.", param.groupId(), dlqTopic.get(), dlqDestinationPartition));
+ }
+
+ this.dlqTopicPartitionData = tpData;
+ }
+
+ public ProduceRequestData.TopicProduceData topicProduceData() {
+ List<SimpleRecord> simpleRecords = new ArrayList<>();
+ for (long i = param.firstOffset(); i <= param.lastOffset(); i++) {
+ long timestamp = time.hiResClockMs();
+ simpleRecords.add(new SimpleRecord(timestamp, (byte[]) null,
null, headers(i)));
+ }
+
+ MemoryRecords records = MemoryRecords.withRecords(
+ Compression.NONE,
+ simpleRecords.toArray(new SimpleRecord[]{})
+ );
+
+ return new ProduceRequestData.TopicProduceData()
+ .setName(dlqTopicPartitionData.topicName())
+ .setTopicId(dlqTopicPartitionData.topicId().get())
+ .setPartitionData(List.of(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(dlqDestinationPartition) // partition
+ .setRecords(records)
+ ));
+ }
+
+ public Node dlqPartitionLeaderNode() {
+ return this.dlqPartitionLeaderNode;
+ }
+
public Optional<Throwable> validateDlqTopic() {
Optional<String> topicNameOpt =
cacheHelper.shareGroupDlqTopic(param.groupId());
Optional<String> topicPrefix =
cacheHelper.shareGroupDlqTopicPrefix();
@@ -238,7 +381,51 @@ public class ShareGroupDLQStateManager {
public boolean dlqTopicExists() {
Optional<String> shareGroupDlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
- return
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+ boolean isDlqTopicPresent =
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+ if (isDlqTopicPresent) {
+ try {
+ populateDLQTopicData();
+ } catch (ConfigException e) {
+ return false;
+ }
+ addRequestToNodeMap(dlqPartitionLeaderNode, this);
+ }
+ return isDlqTopicPresent;
+ }
+
+ @Override
+ public String toString() {
+ return "ProduceRequestHandler(" +
+ "param: " + param + "\n" +
+ "dlqTopicData: " + dlqTopicPartitionData + "\n" +
+ ")";
+ }
+
+ private Header[] headers(long offset) {
+ List<Header> headers = new ArrayList<>();
+ headers.add(new RecordHeader(HEADER_DLQ_ERRORS_TOPIC,
recordTopic().getBytes(StandardCharsets.UTF_8)));
+ headers.add(new RecordHeader(HEADER_DLQ_ERRORS_PARTITION,
Integer.toString(param.topicIdPartition().partition()).getBytes(StandardCharsets.UTF_8)));
+ headers.add(new RecordHeader(HEADER_DLQ_ERRORS_OFFSET,
Long.toString(offset).getBytes(StandardCharsets.UTF_8)));
+ headers.add(new RecordHeader(HEADER_DLQ_ERRORS_GROUP,
param.groupId().getBytes(StandardCharsets.UTF_8)));
+ param.deliveryCount().ifPresent(deliveryCount -> headers.add(
+ new RecordHeader(HEADER_DLQ_ERRORS_DELIVERY_COUNT,
Short.toString(deliveryCount).getBytes(StandardCharsets.UTF_8))));
+ param.cause().ifPresent(cause -> {
+ if (cause.getMessage() != null) {
+ headers.add(new RecordHeader(HEADER_DLQ_ERRORS_MESSAGE,
cause.getMessage().getBytes(StandardCharsets.UTF_8)));
+ }
+ });
+
+ return headers.toArray(new Header[0]);
+ }
+
+ private String recordTopic() {
+ TopicIdPartition topicIdPartition = param.topicIdPartition();
+ String recordTopicName = param.topicIdPartition().topic();
+ if (recordTopicName == null || recordTopicName.isEmpty()) {
+ // If topic name lookup fails, use topic id as a String in the
header.
+ recordTopicName =
cacheHelper.topicName(param.topicIdPartition().topicId()).orElse(topicIdPartition.topicId().toString());
+ }
+ return recordTopicName;
}
// Visibility for testing
@@ -247,22 +434,22 @@ public class ShareGroupDLQStateManager {
return Optional.empty();
}
- String dlqTopicName =
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+ String dlqTopicName =
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("<UNKNOWN>");
- LOG.debug("Response for RPC {} with DLQ topic {} is invalid - {}",
name(), dlqTopicName, response);
+ LOG.debug("Response for RPC for handler {} with DLQ topic {} is
invalid - {}.", this, dlqTopicName, response);
if (response.authenticationException() != null) {
- LOG.error("Authentication exception",
response.authenticationException());
+ LOG.error("Authentication exception.",
response.authenticationException());
Errors error =
Errors.forException(response.authenticationException());
return Optional.of(error);
} else if (response.versionMismatch() != null) {
- LOG.error("Version mismatch exception",
response.versionMismatch());
+ LOG.error("Version mismatch exception.",
response.versionMismatch());
Errors error = Errors.forException(response.versionMismatch());
return Optional.of(error);
} else if (response.wasDisconnected()) { // Retriable
return Optional.of(Errors.NETWORK_EXCEPTION);
} else if (response.wasTimedOut()) { // Retriable
- LOG.debug("Response for RPC {} with DLQ topic {} timed out -
{}.", name(), dlqTopicName, response);
+ LOG.debug("Response for RPC for handler {} with DLQ topic {}
timed out - {}.", this, dlqTopicName, response);
return Optional.of(Errors.REQUEST_TIMED_OUT);
} else {
return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
@@ -270,11 +457,11 @@ public class ShareGroupDLQStateManager {
}
private void handleCreateTopicsResponse(ClientResponse response) {
- LOG.debug("Received CreateTopicsResponse {}", response);
+ LOG.debug("Received CreateTopicsResponse {}.", response);
createTopicsBackoff.incrementAttempt();
Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
String clientResponseErrorMessage = clientResponseError.message();
- String dlqTopicName =
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+ String dlqTopicName =
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("<UNKNOWN>");
switch (clientResponseError) {
case NONE:
@@ -292,8 +479,23 @@ public class ShareGroupDLQStateManager {
String errorMessage = topicResult.errorMessage();
switch (error) {
case NONE:
- // Replace with enqueue post PRODUCE implementation
- this.result.complete(null);
+ try {
+ populateDLQTopicData();
+ createTopicsBackoff.resetAttempts();
+ if (this.isBatchable()) {
+
addRequestToNodeMap(this.dlqPartitionLeaderNode, this);
+ } else {
+ enqueue(this);
+ }
+ } catch (ConfigException e) {
+ LOG.error("Error enqueueing after DLQ create
topic response {}.", this, e);
+ if (!createTopicsBackoff.canAttempt()) {
+ LOG.error("Exhausted max retries while
populating DLQ topic for {} using DLQ topic {} without success.", name(),
dlqTopicName);
+ requestErrorResponse(new
Exception("Exhausted max retries while populating DLQ topic without success."));
+ break;
+ }
+ timer.add(new
ShareGroupDLQTimerTask(createTopicsBackoff.backOff(), this));
+ }
break;
case TOPIC_ALREADY_EXISTS:
@@ -301,7 +503,7 @@ public class ShareGroupDLQStateManager {
// was in-flight. As such this request might get
TOPIC_ALREADY_EXISTS error, which is acceptable
// let it try again and sender logic will take
care of it.
case THROTTLING_QUOTA_EXCEEDED:
- LOG.debug("Received retriable error in create DLQ
topic response for {} using DLQ topic {}: {}", name(), dlqTopicName,
errorMessage);
+ LOG.debug("Received retriable error in create DLQ
topic response for {} using DLQ topic {}: {}.", name(), dlqTopicName,
errorMessage);
if (!createTopicsBackoff.canAttempt()) {
LOG.error("Exhausted max retries to create DLQ
topic for {} using DLQ topic {} without success.", name(), dlqTopicName);
requestErrorResponse(new Exception("Exhausted
max retries to create DLQ topic without success."));
@@ -328,7 +530,94 @@ public class ShareGroupDLQStateManager {
break;
default:
- LOG.error("Unable to create DLQ topic due to error in
client response for {} using DLQ topic {}: {}", name(), dlqTopicName,
clientResponseError.code());
+ LOG.error("Unable to create DLQ topic due to error in
client response for {} using DLQ topic {}: {}.", name(), dlqTopicName,
clientResponseError.code());
+ requestErrorResponse(clientResponseError.exception());
+ }
+ }
+
+ private void handleProduceResponse(ClientResponse response) { // Processes the response to a produce RPC: completes, reschedules or fails this handler.
+ LOG.debug("Received ProduceRequestResponse {}.", response);
+ produceRequestBackoff.incrementAttempt(); // every response handled here counts as one attempt
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE); // an empty Optional is treated as "no client-level error"
+ String clientResponseErrorMessage = clientResponseError.message();
+
+ switch (clientResponseError) {
+ case NONE:
+ // Produce response received
+ ProduceResponse produceResponse = ((ProduceResponse)
response.responseBody());
+ ProduceResponseData.TopicProduceResponseCollection
produceResponseCollection = produceResponse.data().responses();
+ if (produceResponseCollection.isEmpty()) {
+ LOG.error("Received empty produce response for {} to
dlq topic node {}.", this, dlqPartitionLeaderNode());
+
requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception());
+ break;
+ }
+
+ ProduceResponseData.TopicProduceResponse
topicProduceResponse = produceResponseCollection.find(
+ new ProduceResponseData.TopicProduceResponse()
+ .setTopicId(dlqTopicPartitionData.topicId().get()) // NOTE(review): unchecked Optional.get(); assumes the DLQ topic id was populated earlier - confirm
+ );
+ if (topicProduceResponse == null ||
+ topicProduceResponse.partitionResponses().isEmpty()
+ ) {
+ LOG.error("Received empty topic produce response {} to
dlq topic node {}.", this, dlqPartitionLeaderNode());
+
requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception());
+ break;
+ }
+
+ List<ProduceResponseData.PartitionProduceResponse>
partitionResponses = topicProduceResponse.partitionResponses();
+ ProduceResponseData.PartitionProduceResponse
partitionResponse = partitionResponses.stream().filter(res -> res.index() ==
dlqDestinationPartition)
+ .findFirst()
+ .orElse(null); // null when the response has no entry for dlqDestinationPartition
+
+ if (partitionResponse == null) {
+ LOG.error("Received empty partition produce response
{} to dlq topic node {}.", this, dlqPartitionLeaderNode());
+
requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception());
+ break;
+ }
+
+ Errors error =
Errors.forCode(partitionResponse.errorCode());
+ String errorMessage = partitionResponse.errorMessage();
+ switch (error) {
+ case NONE:
+ LOG.debug("Successfully produced records {} to dlq
topic node {}.", this, dlqPartitionLeaderNode());
+ produceRequestBackoff.resetAttempts(); // success: clear the attempt counter before completing the future
+ this.result.complete(null);
+ break;
+
+ case NOT_LEADER_OR_FOLLOWER: // the only partition-level error that is retried here
+ LOG.debug("Received retriable error produce
response for {} to dlq topic node {} - {}.", this, dlqPartitionLeaderNode(),
errorMessage);
+ if (!produceRequestBackoff.canAttempt()) {
+ LOG.error("Exhausted max retries to produce {}
to DLQ topic node {}.", this, dlqPartitionLeaderNode());
+ requestErrorResponse(new Exception("Exhausted
max retries to produce to DLQ topic without success."));
+ break;
+ }
+ timer.add(new
ShareGroupDLQTimerTask(produceRequestBackoff.backOff(), this)); // hands this handler to the timer with the backoff value (presumably a retry delay - confirm)
+ break;
+
+ default: // any other partition-level error fails the request
+ LOG.error("Unable to produce {} to DLQ topic node
{} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
+
partitionResponse.recordErrors().forEach(recordError ->
+ LOG.error("Records with errors {} - {}.",
recordError.batchIndex(), recordError.batchIndexErrorMessage()));
+ requestErrorResponse(error.exception());
+ }
+ break; // leaves the outer switch; end of the client-level NONE case
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT: // shares the retry handling with NETWORK_EXCEPTION above
+ LOG.debug("Received retriable error produce client
response for {} for DLQ node {} due to {}.",
+ param, dlqPartitionLeaderNode(),
clientResponseErrorMessage);
+ if (!produceRequestBackoff.canAttempt()) {
+ LOG.error("Exhausted max retries to produce {} to DLQ
topic node {} due to client response error {}.",
+ param, dlqPartitionLeaderNode(),
clientResponseErrorMessage);
+ requestErrorResponse(clientResponseError.exception());
+ break;
+ }
+ timer.add(new
ShareGroupDLQTimerTask(produceRequestBackoff.backOff(), this));
+ break;
+
+ default: // every other client-level error fails the request without retrying
+ LOG.error("Unable to produce {} to DLQ topic node {} due
to client response error {}.",
+ param, dlqPartitionLeaderNode(),
clientResponseErrorMessage);
requestErrorResponse(clientResponseError.exception());
}
}
@@ -345,6 +634,8 @@ public class ShareGroupDLQStateManager {
@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
+ List<RequestAndCompletionHandler> requests = new ArrayList<>();
+
if (!queue.isEmpty()) {
ShareGroupDLQStateManager.ProduceRequestHandler handler =
queue.poll();
// At this point either a correctly named and configured DLQ
topic exists or
@@ -354,7 +645,7 @@ public class ShareGroupDLQStateManager {
// We need to send RPC to create the topic
Node randomNode = randomNode();
if (randomNode == Node.noNode()) {
- log.error("Unable to find node to send create topic
request.");
+ log.error("Unable to find node to send create topic
request for handler {}.", handler);
// fatal failure, cannot retry or progress
// fail the RPC
handler.requestErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
@@ -370,12 +661,81 @@ public class ShareGroupDLQStateManager {
handler
));
} catch (ConfigException exp) {
- log.error("Unable to create topic request.", exp);
+ log.error("Unable to create topic request for handler
{}.", handler, exp);
handler.requestErrorResponse(Errors.INVALID_CONFIG.exception());
}
+ } else {
+ if (!handler.isBatchable()) {
+ requests.add(new RequestAndCompletionHandler(
+ time.milliseconds(),
+ handler.dlqPartitionLeaderNode(),
+ handler.requestBuilder(),
+ handler
+ ));
+ }
}
}
- return List.of();
+
+ // {
+ // node1: {
+ // [P1, P2, P3]
+ // },
+ // node2: {
+ // [P4, P5]
+ // }, ...
+ // }
+ // For a sequence of produce RPCs, the flow would be:
+ // 1. 1st produce request arrives.
+ // 2. it is enqueued in the send thread.
+ // 3. wakeup event causes the generate requests to create the DLQ
topic if required.
+ // 4. it will cause either RPC or cache lookup.
+ // 5. once complete, the produce handler is added to the nodeMap
for batching and not the queue.
+ // 6. wakeup event causes generateRequests to iterate over the map
and send the produce request (P1) and
+ // remove node from the nodeMap and add it to inFlight.
+ // 7. until P1 completes, more produce requests (P2, P3, ...)
could come in and get added to the nodeMap as per point 3, 4, 5.
+ // 8. if these belong to the same node as P1, they will not be sent because that node is still present in inFlight.
+ // 9. when P1 completes, it will clear inFlight and raise wakeup
event.
+ // 10. at this point P2, P3, etc. could be sent as a combined
request thus achieving batching.
+ final Set<Node> sending = new HashSet<>();
+ final Set<Node> emptyNodes = new HashSet<>(); // Nodes for which
no coalesced handler was found.
+ synchronized (nodeMapLock) {
+ nodeRPCMap.forEach((destNode, handlers) -> {
+ // this condition causes requests of same type and same
destination node
+ // to not be sent immediately but get batched
+ if (!inFlight.contains(destNode)) {
+ CoalesceResults results =
coalesceProduceRequests(handlers);
+ if (results.liveHandlers.isEmpty()) {
+ emptyNodes.add(destNode);
+ return;
+ }
+ requests.add(new RequestAndCompletionHandler(
+ time.milliseconds(),
+ destNode,
+ results.request,
+ response -> {
+ inFlight.remove(destNode);
+
+ // now the combined request has completed
+ // we need to create responses for individual
+ // requests which composed the combined request
+ results.liveHandlers.forEach(handler ->
handler.onComplete(response));
+ wakeup();
+ }));
+ sending.add(destNode);
+ }
+ });
+
+ emptyNodes.forEach(nodeRPCMap::remove);
+ sending.forEach(node -> {
+ // we need to add these nodes to inFlight
+ inFlight.add(node);
+
+ // remove from nodeMap
+ nodeRPCMap.remove(node);
+ });
+ } // close of synchronized context
+
+ return requests;
}
public void enqueue(ShareGroupDLQStateManager.ProduceRequestHandler
handler) {
@@ -411,4 +771,39 @@ public class ShareGroupDLQStateManager {
sender.wakeup();
}
}
+
+ private record CoalesceResults( // Outcome of coalesceProduceRequests: one combined request plus the handlers it covers.
+ AbstractRequest.Builder<? extends AbstractRequest> request, // builder for the combined produce request
+ List<ProduceRequestHandler> liveHandlers // handlers whose data was merged without an exception
+ ) {
+ }
+
+ /**
+  * Merges the produce data of several handlers into a single produce request.
+  * Partition data is grouped per DLQ topic id. A handler whose data cannot be obtained or merged
+  * is failed through {@code requestErrorResponse} and left out of the returned live handlers.
+  *
+  * @param handlers handlers queued for the same destination node
+  * @return the combined request builder together with the handlers that contributed to it
+  */
+ private static CoalesceResults coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
+     Map<Uuid, ProduceRequestData.TopicProduceData> mergedByTopicId = new HashMap<>();
+     List<ProduceRequestHandler> accepted = new ArrayList<>(handlers.size());
+
+     for (ProduceRequestHandler candidate : handlers) {
+         try {
+             ProduceRequestData.TopicProduceData contribution = candidate.topicProduceData();
+             // First handler seen for a topic id creates the merged entry; later ones only append partitions.
+             ProduceRequestData.TopicProduceData merged = mergedByTopicId.computeIfAbsent(
+                 contribution.topicId(),
+                 id -> new ProduceRequestData.TopicProduceData()
+                     .setName(contribution.name())
+                     .setTopicId(id));
+             merged.partitionData().addAll(contribution.partitionData());
+             accepted.add(candidate);
+         } catch (Exception failure) {
+             log.error("Unable to coalesce ProduceRequestData for handler {}. It will be skipped from DLQ.", candidate, failure);
+             candidate.requestErrorResponse(failure);
+         }
+     }
+
+     ProduceRequestData.TopicProduceDataCollection topicData =
+         new ProduceRequestData.TopicProduceDataCollection(mergedByTopicId.values().iterator());
+     ProduceRequestData combined = new ProduceRequestData()
+         .setTopicData(topicData)
+         .setAcks((short) -1) // all replicas
+         .setTimeoutMs(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT);
+
+     ProduceRequest.Builder requestBuilder =
+         new ProduceRequest.Builder(ApiKeys.PRODUCE.latestVersion(), ApiKeys.PRODUCE.latestVersion(), combined);
+     return new CoalesceResults(requestBuilder, accepted);
+ }
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
b/server-common/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
new file mode 100644
index 00000000000..a6d939003ce
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
@@ -0,0 +1,972 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.message.ProduceResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import
org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper.TopicPartitionData;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.SystemTimerReaper;
+import org.apache.kafka.server.util.timer.Timer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_DELIVERY_COUNT;
+import static
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_GROUP;
+import static
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_MESSAGE;
+import static
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_OFFSET;
+import static
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_PARTITION;
+import static
org.apache.kafka.server.share.dlq.ShareGroupDLQStateManager.ProduceRequestHandler.HEADER_DLQ_ERRORS_TOPIC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class ShareGroupDLQStateManagerTest {
+ private static final MockTime MOCK_TIME = new MockTime();
+ private static final String HOST = "localhost";
+ private static final int PORT = 9092;
+ private static final String GROUP_ID = "test-group";
+ private static final String DLQ_TOPIC = "dlq-topic";
+ private static final Uuid DLQ_TOPIC_ID = Uuid.randomUuid();
+ private static final Uuid SOURCE_TOPIC_ID = Uuid.randomUuid();
+ private static final Node DEFAULT_LEADER = new Node(0, HOST, PORT);
+
+ private final MockTimer mockTimer = new MockTimer(MOCK_TIME);
+ private ShareGroupDLQStateManager stateManager;
+
+ @AfterEach
+ public void tearDown() throws Exception {
+     // Tests that never assign a manager leave the field null; there is nothing to stop then.
+     if (stateManager == null) {
+         return;
+     }
+     stateManager.stop();
+ }
+
+ private final class Builder {
+ private KafkaClient client;
+ private Time time = MOCK_TIME;
+ private Timer timer;
+ private ShareGroupDLQMetadataCacheHelper cacheHelper;
+
+ Builder withClient(KafkaClient client) {
+ this.client = client;
+ return this;
+ }
+
+ Builder withCacheHelper(ShareGroupDLQMetadataCacheHelper cacheHelper) {
+ this.cacheHelper = cacheHelper;
+ return this;
+ }
+
+ Builder withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
+ Builder withTimer(Timer timer) {
+ this.timer = timer;
+ return this;
+ }
+
+ ShareGroupDLQStateManager build() {
+ return new ShareGroupDLQStateManager(
+ client != null ? client : new MockClient(MOCK_TIME),
+ cacheHelper != null ? cacheHelper :
happyCacheHelper(DEFAULT_LEADER),
+ time,
+ timer != null ? timer : mockTimer
+ );
+ }
+ }
+
+ private Builder builder() {
+     // Each call hands out an independent builder with default settings.
+     Builder fresh = new Builder();
+     return fresh;
+ }
+
+ /**
+  * Builds the DLQ record parameter shared by the tests: group {@code GROUP_ID}, partition 0 of
+  * "source-topic", and the values 0L / 2L that the happy-path tests expect as offsets 0..2.
+  */
+ private static ShareGroupDLQRecordParameter param() {
+     TopicIdPartition sourcePartition = new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic");
+     return new ShareGroupDLQRecordParameter(
+         GROUP_ID,
+         sourcePartition,
+         0L,
+         2L,
+         Optional.of((short) 1),
+         Optional.of(new RuntimeException("simulated cause")),
+         false
+     );
+ }
+
+ /**
+  * Creates a mocked cache helper for the "everything is fine" scenario: the group maps to
+  * {@code DLQ_TOPIC}, that topic exists with DLQ enabled, auto-create is enabled, and the
+  * given node is both the only cluster node and the only entry in the topic partition data.
+  */
+ private static ShareGroupDLQMetadataCacheHelper happyCacheHelper(Node leader) {
+     ShareGroupDLQMetadataCacheHelper mocked = mock(ShareGroupDLQMetadataCacheHelper.class);
+     TopicPartitionData dlqPartitionData = new TopicPartitionData(
+         DLQ_TOPIC,
+         Optional.of(1),
+         Optional.of(DLQ_TOPIC_ID),
+         List.of(leader)
+     );
+
+     // Group-level DLQ configuration.
+     when(mocked.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+     when(mocked.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+     when(mocked.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+
+     // State of the DLQ topic.
+     when(mocked.containsTopic(DLQ_TOPIC)).thenReturn(true);
+     when(mocked.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+     when(mocked.topicPartitionData(DLQ_TOPIC)).thenReturn(dlqPartitionData);
+
+     // Cluster and source-topic lookups.
+     when(mocked.getClusterNodes()).thenReturn(List.of(leader));
+     when(mocked.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+     return mocked;
+ }
+
+ /**
+  * Waits up to five seconds for the future to fail and returns the cause of that failure.
+  * Fails the calling test if the future completes normally or does not complete in time.
+  */
+ private static Throwable getCause(CompletableFuture<Void> future) {
+     Throwable cause = null;
+     try {
+         future.get(5, TimeUnit.SECONDS);
+         fail("Expected the future to complete exceptionally");
+     } catch (ExecutionException wrapped) {
+         cause = wrapped.getCause();
+     } catch (InterruptedException | TimeoutException notCompleted) {
+         fail("Future did not complete", notCompleted);
+     }
+     return cause;
+ }
+
+ /**
+ * Expected headers and offset range for one DLQ partition's records inside a captured produce
+ * request. {@code sharedHeaders} are expected to be identical on every record in that partition;
+ * the offset header is built per-record from {@code firstOffset}..{@code lastOffset}, both
+ * inclusive (the verifier expects {@code lastOffset - firstOffset + 1} records).
+ */
+ private record ExpectedDlqPartition(long firstOffset, long lastOffset,
Map<String, String> sharedHeaders) {
+ }
+
+ /**
+  * Checks the headers of every record in the first topic entry of a captured produce request.
+  * {@code expectedByPartitionIndex} is keyed by DLQ partition index (the value of
+  * {@link ProduceRequestData.PartitionProduceData#index()}); the partitions found in the request
+  * must be exactly those keys. Each record must carry the partition's shared headers plus an
+  * offset header that counts up from the expected first offset, and the number of records must
+  * match the expected offset range.
+  */
+ private static void assertDlqProduceRecordHeaders(
+     ProduceRequest request,
+     Map<Integer, ExpectedDlqPartition> expectedByPartitionIndex
+ ) {
+     ProduceRequestData.TopicProduceData dlqTopic = request.data().topicData().iterator().next();
+
+     Set<Integer> seenIndices = topicPartitionIndices(dlqTopic);
+     assertEquals(expectedByPartitionIndex.keySet(), seenIndices,
+         "Unexpected set of DLQ partitions in produce request");
+
+     for (ProduceRequestData.PartitionProduceData partitionData : dlqTopic.partitionData()) {
+         int index = partitionData.index();
+         ExpectedDlqPartition expectation = expectedByPartitionIndex.get(index);
+         MemoryRecords batch = (MemoryRecords) partitionData.records();
+
+         int seen = 0;
+         for (Record dlqRecord : batch.records()) {
+             long offset = expectation.firstOffset() + seen;
+
+             Map<String, String> found = new HashMap<>();
+             for (Header header : dlqRecord.headers()) {
+                 found.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
+             }
+
+             Map<String, String> wanted = new HashMap<>(expectation.sharedHeaders());
+             wanted.put(HEADER_DLQ_ERRORS_OFFSET, Long.toString(offset));
+
+             assertEquals(wanted, found,
+                 "Partition " + index + " record at offset " + offset + " has unexpected headers");
+             seen++;
+         }
+
+         int wantedCount = (int) (expectation.lastOffset() - expectation.firstOffset() + 1);
+         assertEquals(wantedCount, seen,
+             "Partition " + index + " has unexpected number of records");
+     }
+ }
+
+ // Collects the partition indices present in one topic entry of a produce request.
+ private static Set<Integer> topicPartitionIndices(ProduceRequestData.TopicProduceData topic) {
+     return topic.partitionData().stream()
+         .map(ProduceRequestData.PartitionProduceData::index)
+         .collect(Collectors.toSet());
+ }
+
+ // ---- Constructor null-check tests ----
+
+ @Test
+ public void testConstructorRejectsNullClient() {
+     // Only the client argument is null; the other three are valid.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     assertThrows(
+         IllegalArgumentException.class,
+         () -> new ShareGroupDLQStateManager(null, helper, MOCK_TIME, mockTimer)
+     );
+ }
+
+ @Test
+ public void testConstructorRejectsNullCacheHelper() {
+     // Only the cache helper argument is null.
+     KafkaClient mockedClient = mock(KafkaClient.class);
+     assertThrows(
+         IllegalArgumentException.class,
+         () -> new ShareGroupDLQStateManager(mockedClient, null, MOCK_TIME, mockTimer)
+     );
+ }
+
+ @Test
+ public void testConstructorRejectsNullTime() {
+     // Only the time argument is null.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     KafkaClient mockedClient = mock(KafkaClient.class);
+     assertThrows(
+         IllegalArgumentException.class,
+         () -> new ShareGroupDLQStateManager(mockedClient, helper, null, mockTimer)
+     );
+ }
+
+ @Test
+ public void testConstructorRejectsNullTimer() {
+     // Only the timer argument is null.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     KafkaClient mockedClient = mock(KafkaClient.class);
+     assertThrows(
+         IllegalArgumentException.class,
+         () -> new ShareGroupDLQStateManager(mockedClient, helper, MOCK_TIME, null)
+     );
+ }
+
+ // ---- Lifecycle tests ----
+
+ @Test
+ public void testStartIsIdempotent() {
+ stateManager = builder().build();
+
+ stateManager.start();
+ stateManager.start();
+ // tearDown will call stateManager.stop() and must not throw.
+ }
+
+ @Test
+ public void testStopWithoutStartIsNoOp() {
+     // Deliberately never started: tearDown calls stop() on this instance and must not throw.
+     ShareGroupDLQStateManager neverStarted = builder().build();
+     stateManager = neverStarted;
+ }
+
+ // ---- DLQ topic validation tests (no thread start required) ----
+
+ @Test
+ public void testDlqEmptyTopicNameFailsValidation() throws Exception {
+     // The group has no DLQ topic configured and there is no prefix either.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+     when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.empty());
+
+     stateManager = builder().withCacheHelper(helper).build();
+
+     Throwable failure = getCause(stateManager.dlq(param()));
+     assertInstanceOf(ConfigException.class, failure);
+     assertTrue(failure.getMessage().contains("empty"));
+ }
+
+ @Test
+ public void testDlqTopicStartingWithUnderscoreFailsValidation() throws Exception {
+     // A DLQ topic name beginning with a double underscore must be rejected.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+     when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of("__internal_dlq"));
+
+     stateManager = builder().withCacheHelper(helper).build();
+
+     Throwable failure = getCause(stateManager.dlq(param()));
+     assertInstanceOf(ConfigException.class, failure);
+     assertTrue(failure.getMessage().contains("__"));
+ }
+
+ @Test
+ public void testDlqExistingTopicWithoutDlqConfigFailsValidation() throws Exception {
+     // The topic exists but is not enabled for DLQ use.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(false);
+     when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+     when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+     when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
+     stateManager = builder().withCacheHelper(helper).build();
+
+     Throwable failure = getCause(stateManager.dlq(param()));
+     assertInstanceOf(ConfigException.class, failure);
+     assertTrue(failure.getMessage().contains("DLQ is not enabled"));
+ }
+
+ @Test
+ public void testDlqTopicMissingAndAutoCreateDisabledFailsValidation() throws Exception {
+     // The topic does not exist and auto-creation is disabled.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(false);
+     when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+     when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+     when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
+     stateManager = builder().withCacheHelper(helper).build();
+
+     Throwable failure = getCause(stateManager.dlq(param()));
+     assertInstanceOf(ConfigException.class, failure);
+     assertTrue(failure.getMessage().contains("auto create is disabled"));
+ }
+
+ @Test
+ public void testDlqTopicPrefixMismatchFailsValidation() throws Exception {
+ ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
+
when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.of("required-prefix-"));
+ when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+ when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+
+ stateManager = builder().withCacheHelper(cacheHelper).build();
+ Throwable cause = getCause(stateManager.dlq(param()));
+ assertInstanceOf(ConfigException.class, cause);
+ assertTrue(cause.getMessage().contains("does not comply with the DLQ
topic prefix"));
+ }
+
+ @Test
+ public void testDlqValidationFailureCompletesFutureBeforeStart() throws Exception {
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+     when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.empty());
+
+     // start() is never called: validation runs synchronously inside dlq(), so the returned
+     // future must already be failed without the sender thread.
+     stateManager = builder().withCacheHelper(helper).build();
+     CompletableFuture<Void> outcome = stateManager.dlq(param());
+
+     assertTrue(outcome.isDone());
+     assertTrue(outcome.isCompletedExceptionally());
+     assertFalse(outcome.isCancelled());
+ }
+
+ // ---- Full integration tests ----
+
+ @Test
+ public void testDlqHappyPathExistingTopic() throws Exception {
+     // The matcher records each produce request it accepts so the headers can be checked afterwards.
+     List<ProduceRequest> sentProduces = new ArrayList<>();
+     MockClient mockClient = new MockClient(MOCK_TIME);
+     mockClient.prepareResponseFrom(
+         body -> body instanceof ProduceRequest produce && sentProduces.add(produce),
+         successfulProduceResponse(0),
+         DEFAULT_LEADER
+     );
+
+     stateManager = builder().withClient(mockClient).build();
+     stateManager.start();
+     assertNull(stateManager.dlq(param()).get(10, TimeUnit.SECONDS));
+
+     assertEquals(1, sentProduces.size());
+     Map<String, String> sharedHeaders = Map.of(
+         HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+         HEADER_DLQ_ERRORS_PARTITION, "0",
+         HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+         HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+         HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+     );
+     assertDlqProduceRecordHeaders(
+         sentProduces.get(0),
+         Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders))
+     );
+ }
+
+ @Test
+ public void testDlqTopicPrefixEmptyStringSkipsPrefixCheck() throws Exception {
+     // Same as the happy path, except the prefix is present but empty.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     TopicPartitionData dlqPartitionData = new TopicPartitionData(
+         DLQ_TOPIC,
+         Optional.of(1),
+         Optional.of(DLQ_TOPIC_ID),
+         List.of(DEFAULT_LEADER)
+     );
+     when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.of(""));
+     when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+     when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+     when(helper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+     when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+     when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(dlqPartitionData);
+     when(helper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+     when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+
+     List<ProduceRequest> sentProduces = new ArrayList<>();
+     MockClient mockClient = new MockClient(MOCK_TIME);
+     mockClient.prepareResponseFrom(
+         body -> body instanceof ProduceRequest produce && sentProduces.add(produce),
+         successfulProduceResponse(0),
+         DEFAULT_LEADER
+     );
+
+     stateManager = builder()
+         .withCacheHelper(helper)
+         .withClient(mockClient)
+         .build();
+     stateManager.start();
+     assertNull(stateManager.dlq(param()).get(10, TimeUnit.SECONDS));
+
+     assertEquals(1, sentProduces.size());
+     Map<String, String> sharedHeaders = Map.of(
+         HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+         HEADER_DLQ_ERRORS_PARTITION, "0",
+         HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+         HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+         HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+     );
+     assertDlqProduceRecordHeaders(
+         sentProduces.get(0),
+         Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders))
+     );
+ }
+
+ @Test
+ public void testDlqCreateTopicThenProduceSucceeds() throws Exception {
+     // The DLQ topic is reported as missing, so a create-topics RPC must precede the produce.
+     ShareGroupDLQMetadataCacheHelper helper = mock(ShareGroupDLQMetadataCacheHelper.class);
+     TopicPartitionData dlqPartitionData = new TopicPartitionData(
+         DLQ_TOPIC,
+         Optional.of(1),
+         Optional.of(DLQ_TOPIC_ID),
+         List.of(DEFAULT_LEADER)
+     );
+     when(helper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+     when(helper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+     when(helper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+     when(helper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+     when(helper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+     when(helper.topicPartitionData(DLQ_TOPIC)).thenReturn(dlqPartitionData);
+     when(helper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+     when(helper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+
+     List<ProduceRequest> sentProduces = new ArrayList<>();
+     MockClient mockClient = new MockClient(MOCK_TIME);
+     // Prepared responses are registered in the order the RPCs are expected: create first, produce second.
+     mockClient.prepareResponseFrom(
+         body -> body instanceof CreateTopicsRequest,
+         successfulCreateTopicsResponse(),
+         DEFAULT_LEADER
+     );
+     mockClient.prepareResponseFrom(
+         body -> body instanceof ProduceRequest produce && sentProduces.add(produce),
+         successfulProduceResponse(0),
+         DEFAULT_LEADER
+     );
+
+     stateManager = builder()
+         .withCacheHelper(helper)
+         .withClient(mockClient)
+         .build();
+     stateManager.start();
+     assertNull(stateManager.dlq(param()).get(10, TimeUnit.SECONDS));
+
+     assertEquals(1, sentProduces.size());
+     Map<String, String> sharedHeaders = Map.of(
+         HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+         HEADER_DLQ_ERRORS_PARTITION, "0",
+         HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+         HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+         HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+     );
+     assertDlqProduceRecordHeaders(
+         sentProduces.get(0),
+         Map.of(0, new ExpectedDlqPartition(0L, 2L, sharedHeaders))
+     );
+ }
+
+ @Test
+ public void testDlqCreateTopicFatalErrorFailsFuture() throws Exception {
+ ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
+
when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+ when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+
when(cacheHelper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+ when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+
+ MockClient client = new MockClient(MOCK_TIME);
+ client.prepareResponseFrom(
+ body -> body instanceof CreateTopicsRequest,
+ createTopicsResponse(Errors.INVALID_REPLICATION_FACTOR),
+ DEFAULT_LEADER
+ );
+
+ stateManager = builder()
+ .withClient(client)
+ .withCacheHelper(cacheHelper)
+ .build();
+ stateManager.start();
+ Throwable cause = getCause(stateManager.dlq(param()));
+ assertNotNull(cause);
+ assertEquals(Errors.INVALID_REPLICATION_FACTOR.exception().getClass(),
cause.getClass());
+ }
+
+ @Test
+ public void testDlqCreateTopicNoClusterNodesFailsFuture() throws Exception
{
+ ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
+
when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+ when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+ when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+ when(cacheHelper.getClusterNodes()).thenReturn(List.of());
+
+ stateManager = builder().withCacheHelper(cacheHelper).build();
+ stateManager.start();
+ Throwable cause = getCause(stateManager.dlq(param()));
+ assertNotNull(cause);
+ assertEquals(Errors.BROKER_NOT_AVAILABLE.exception().getClass(),
cause.getClass());
+ }
+
+    // Every CreateTopics attempt is answered with a disconnect. After maxAttempts attempts the
+    // Throwable obtained via getCause must be of the NETWORK_EXCEPTION exception class.
+    @Test
+    public void testDlqCreateTopicRetriesExhaustedFailsWithNetworkException() throws Exception {
+        int maxAttempts = 3;
+        // Force the create-topic path: configured DLQ topic does not yet exist in metadata.
+        ShareGroupDLQMetadataCacheHelper cacheHelper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(cacheHelper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+
+        // Real timer with tiny backoffs lets the exhaustion path actually fire in a few ms.
+        Timer realTimer = new SystemTimerReaper("shareGroupDLQTestTimer",
+            new SystemTimer("shareGroupDLQTestTimer"));
+        try {
+            MockClient client = new MockClient(MOCK_TIME);
+            // Stage one disconnect (null response body, disconnected flag set) per attempt.
+            for (int i = 0; i < maxAttempts; i++) {
+                client.prepareResponseFrom(
+                    body -> body instanceof CreateTopicsRequest,
+                    null,
+                    DEFAULT_LEADER,
+                    true
+                );
+            }
+
+            stateManager = builder()
+                .withClient(client)
+                .withCacheHelper(cacheHelper)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L, maxAttempts));
+            assertNotNull(cause);
+            assertEquals(Errors.NETWORK_EXCEPTION.exception().getClass(), cause.getClass());
+        } finally {
+            // The timer is created by this test, so it is closed here.
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    // Two CreateTopics disconnects are followed by a successful create and a successful produce.
+    // The dlq() future must complete normally and exactly one ProduceRequest must be captured.
+    @Test
+    public void testDlqCreateTopicPartialFailuresThenSucceeds() throws Exception {
+        int maxAttempts = 3;
+        ShareGroupDLQMetadataCacheHelper cacheHelper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(false);
+        when(cacheHelper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(cacheHelper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+        when(cacheHelper.topicPartitionData(DLQ_TOPIC)).thenReturn(new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(1),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER)
+        ));
+
+        Timer realTimer = new SystemTimerReaper("shareGroupDLQTestTimer",
+            new SystemTimer("shareGroupDLQTestTimer"));
+        try {
+            MockClient client = new MockClient(MOCK_TIME);
+            List<ProduceRequest> capturedProduces = new ArrayList<>();
+            // Two CreateTopics disconnects (retriable), then a successful create, then the produce succeeds.
+            client.prepareResponseFrom(body -> body instanceof CreateTopicsRequest, null, DEFAULT_LEADER, true);
+            client.prepareResponseFrom(body -> body instanceof CreateTopicsRequest, null, DEFAULT_LEADER, true);
+            client.prepareResponseFrom(
+                body -> body instanceof CreateTopicsRequest,
+                successfulCreateTopicsResponse(),
+                DEFAULT_LEADER
+            );
+            client.prepareResponseFrom(
+                body -> {
+                    if (body instanceof ProduceRequest pr) {
+                        capturedProduces.add(pr);
+                        return true;
+                    }
+                    return false;
+                },
+                successfulProduceResponse(0),
+                DEFAULT_LEADER
+            );
+
+            stateManager = builder()
+                .withClient(client)
+                .withCacheHelper(cacheHelper)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            assertNull(stateManager.dlq(param(), 1L, 5L, maxAttempts).get(5, TimeUnit.SECONDS));
+
+            assertEquals(1, capturedProduces.size());
+            assertDlqProduceRecordHeaders(capturedProduces.get(0), Map.of(
+                0, new ExpectedDlqPartition(0L, 2L, Map.of(
+                    HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+                    HEADER_DLQ_ERRORS_PARTITION, "0",
+                    HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+                    HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+                    HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+                ))
+            ));
+        } finally {
+            // The timer is created by this test, so it is closed here.
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    // Two Produce disconnects are followed by a successful produce. All three attempts are
+    // captured, and each one is checked against the same expected headers.
+    @Test
+    public void testDlqProducePartialFailuresThenSucceeds() throws Exception {
+        int maxAttempts = 3;
+        Timer realTimer = new SystemTimerReaper("shareGroupDLQTestTimer",
+            new SystemTimer("shareGroupDLQTestTimer"));
+        try {
+            MockClient client = new MockClient(MOCK_TIME);
+            List<ProduceRequest> capturedProduces = new ArrayList<>();
+            // Two Produce disconnects (retriable), then a successful produce. Capture all attempts;
+            // the retried attempts must carry the same headers as the successful one.
+            MockClient.RequestMatcher captureProduce = body -> {
+                if (body instanceof ProduceRequest pr) {
+                    capturedProduces.add(pr);
+                    return true;
+                }
+                return false;
+            };
+            client.prepareResponseFrom(captureProduce, null, DEFAULT_LEADER, true);
+            client.prepareResponseFrom(captureProduce, null, DEFAULT_LEADER, true);
+            client.prepareResponseFrom(captureProduce, successfulProduceResponse(0), DEFAULT_LEADER);
+
+            stateManager = builder()
+                .withClient(client)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            assertNull(stateManager.dlq(param(), 1L, 5L, maxAttempts).get(5, TimeUnit.SECONDS));
+
+            // One captured request per attempt: two disconnected attempts plus the success.
+            assertEquals(maxAttempts, capturedProduces.size());
+            Map<Integer, ExpectedDlqPartition> expectedByPartition = Map.of(
+                0, new ExpectedDlqPartition(0L, 2L, Map.of(
+                    HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+                    HEADER_DLQ_ERRORS_PARTITION, "0",
+                    HEADER_DLQ_ERRORS_GROUP, GROUP_ID,
+                    HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1",
+                    HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
+                ))
+            );
+            for (ProduceRequest pr : capturedProduces) {
+                assertDlqProduceRecordHeaders(pr, expectedByPartition);
+            }
+        } finally {
+            // The timer is created by this test, so it is closed here.
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+ @Test
+ public void testDlqProduceFatalErrorFailsFuture() throws Exception {
+ MockClient client = new MockClient(MOCK_TIME);
+ client.prepareResponseFrom(
+ body -> body instanceof ProduceRequest,
+ produceResponseWithError(Errors.INVALID_TOPIC_EXCEPTION),
+ DEFAULT_LEADER
+ );
+
+ stateManager = builder().withClient(client).build();
+ stateManager.start();
+ Throwable cause = getCause(stateManager.dlq(param()));
+ assertNotNull(cause);
+ assertEquals(Errors.INVALID_TOPIC_EXCEPTION.exception().getClass(),
cause.getClass());
+ }
+
+ @Test
+ public void testDlqProduceEmptyResponseFailsFuture() throws Exception {
+ MockClient client = new MockClient(MOCK_TIME);
+ client.prepareResponseFrom(
+ body -> body instanceof ProduceRequest,
+ new ProduceResponse(new ProduceResponseData()),
+ DEFAULT_LEADER
+ );
+
+ stateManager = builder().withClient(client).build();
+ stateManager.start();
+ Throwable cause = getCause(stateManager.dlq(param()));
+ assertNotNull(cause);
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.exception().getClass(),
cause.getClass());
+ }
+
+    // A single produce disconnect is staged. The dlq() future must still be pending after a
+    // short wait, i.e. the disconnect must not fail the future immediately.
+    @Test
+    public void testDlqProduceDisconnectIsRetriedNotImmediatelyFailed() throws Exception {
+        MockClient client = new MockClient(MOCK_TIME);
+        // Null response body + disconnected=true triggers the wasDisconnected() branch in
+        // ShareGroupDLQStateManager#checkResponseError. Since the disconnect is retriable, the
+        // future must NOT complete on the first attempt - we just verify the retry was scheduled
+        // rather than waiting for full retry exhaustion (which can take ~30s due to the
+        // hard-coded exponential backoff in ShareGroupDLQStateManager).
+        client.prepareResponseFrom(
+            body -> body instanceof ProduceRequest,
+            null,
+            DEFAULT_LEADER,
+            true
+        );
+
+        stateManager = builder().withClient(client).build();
+        stateManager.start();
+        CompletableFuture<Void> result = stateManager.dlq(param());
+        // Brief wait so the disconnect response can be processed; the future should remain
+        // pending because the retry has been scheduled rather than completing exceptionally.
+        try {
+            result.get(500, TimeUnit.MILLISECONDS);
+            fail("Expected the future to remain incomplete while retry is pending");
+        } catch (TimeoutException expected) {
+            assertFalse(result.isDone());
+        }
+    }
+
+    // Every Produce attempt is answered with a disconnect. After maxAttempts attempts the
+    // Throwable obtained via getCause must be of the NETWORK_EXCEPTION exception class.
+    @Test
+    public void testDlqProduceRetriesExhaustedFailsWithNetworkException() throws Exception {
+        int maxAttempts = 3;
+        // Real timer with tiny backoffs lets the exhaustion path actually fire in a few ms,
+        // rather than the ~30s a MockTimer-less production-like setup would take.
+        Timer realTimer = new SystemTimerReaper("shareGroupDLQTestTimer",
+            new SystemTimer("shareGroupDLQTestTimer"));
+        try {
+            MockClient client = new MockClient(MOCK_TIME);
+            // Each retry consumes one prepared response; stage one disconnect per attempt.
+            for (int i = 0; i < maxAttempts; i++) {
+                client.prepareResponseFrom(
+                    body -> body instanceof ProduceRequest,
+                    null,
+                    DEFAULT_LEADER,
+                    true
+                );
+            }
+
+            stateManager = builder()
+                .withClient(client)
+                .withTimer(realTimer)
+                .build();
+            stateManager.start();
+
+            Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L, maxAttempts));
+            assertNotNull(cause);
+            assertEquals(Errors.NETWORK_EXCEPTION.exception().getClass(), cause.getClass());
+        } finally {
+            // The timer is created by this test, so it is closed here.
+            Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+        }
+    }
+
+    // Enqueues two records for source partitions 0 and 1 against a two-partition DLQ topic.
+    // Both dlq() futures must complete normally, and the captured produce request(s) are
+    // checked whether they arrive as one request or as two.
+    @Test
+    public void testDlqTwoEnqueuedRecordsBothComplete() throws Exception {
+        ShareGroupDLQMetadataCacheHelper cacheHelper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(cacheHelper.shareGroupDlqTopic(GROUP_ID)).thenReturn(Optional.of(DLQ_TOPIC));
+        when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(cacheHelper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+        when(cacheHelper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+        when(cacheHelper.topicPartitionData(DLQ_TOPIC)).thenReturn(new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(2),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER, DEFAULT_LEADER)
+        ));
+
+        // Whether the two handlers end up coalesced into a single produce request or are sent as
+        // two separate requests depends on internal scheduling. Provide a multi-partition response
+        // that satisfies either case: each request will see partition indices 0 and 1 in the
+        // response, and the handler picks out the index that matches its destination partition.
+        ProduceResponseData.TopicProduceResponse topicResp = new ProduceResponseData.TopicProduceResponse()
+            .setTopicId(DLQ_TOPIC_ID)
+            .setPartitionResponses(List.of(
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(0)
+                    .setErrorCode(Errors.NONE.code()),
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(1)
+                    .setErrorCode(Errors.NONE.code())
+            ));
+        ProduceResponseData.TopicProduceResponseCollection collection =
+            new ProduceResponseData.TopicProduceResponseCollection();
+        collection.add(topicResp);
+
+        MockClient client = new MockClient(MOCK_TIME);
+        List<ProduceRequest> capturedProduces = new ArrayList<>();
+        MockClient.RequestMatcher captureProduce = body -> {
+            if (body instanceof ProduceRequest pr) {
+                capturedProduces.add(pr);
+                return true;
+            }
+            return false;
+        };
+        // Two identical responses cover the non-coalesced path.
+        client.prepareResponseFrom(captureProduce,
+            new ProduceResponse(new ProduceResponseData().setResponses(collection.duplicate())),
+            DEFAULT_LEADER);
+        client.prepareResponseFrom(captureProduce,
+            new ProduceResponse(new ProduceResponseData().setResponses(collection.duplicate())),
+            DEFAULT_LEADER);
+
+        stateManager = builder()
+            .withClient(client)
+            .withCacheHelper(cacheHelper)
+            .build();
+        stateManager.start();
+        ShareGroupDLQRecordParameter p0 = new ShareGroupDLQRecordParameter(
+            GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        ShareGroupDLQRecordParameter p1 = new ShareGroupDLQRecordParameter(
+            GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 1, "source-topic"),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+
+        CompletableFuture<Void> r0 = stateManager.dlq(p0);
+        CompletableFuture<Void> r1 = stateManager.dlq(p1);
+
+        assertNull(r0.get(10, TimeUnit.SECONDS));
+        assertNull(r1.get(10, TimeUnit.SECONDS));
+
+        // Two source partitions map to two distinct DLQ partition indices (0 and 1, given 2
+        // DLQ partitions). Whether they end up coalesced into a single request (one topic, two
+        // partitions) or sent as two requests (each with one partition) depends on scheduling.
+        ExpectedDlqPartition expectedDlqPartition0 = new ExpectedDlqPartition(0L, 0L, Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "0",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID
+        ));
+        ExpectedDlqPartition expectedDlqPartition1 = new ExpectedDlqPartition(0L, 0L, Map.of(
+            HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+            HEADER_DLQ_ERRORS_PARTITION, "1",
+            HEADER_DLQ_ERRORS_GROUP, GROUP_ID
+        ));
+        if (capturedProduces.size() == 1) {
+            // Coalesced: the single request must contain both DLQ partitions.
+            assertDlqProduceRecordHeaders(capturedProduces.get(0), Map.of(
+                0, expectedDlqPartition0,
+                1, expectedDlqPartition1
+            ));
+        } else {
+            assertEquals(2, capturedProduces.size(),
+                "Expected coalesced (1 request) or non-coalesced (2 requests), got " + capturedProduces.size());
+            // Non-coalesced: read the DLQ partition index from the first partition entry of the
+            // first topic in each request and pick the matching expectation.
+            for (ProduceRequest pr : capturedProduces) {
+                int dlqPartitionIndex = pr.data().topicData().iterator().next().partitionData().get(0).index();
+                ExpectedDlqPartition expected = dlqPartitionIndex == 0 ? expectedDlqPartition0 : expectedDlqPartition1;
+                assertDlqProduceRecordHeaders(pr, Map.of(dlqPartitionIndex, expected));
+            }
+        }
+    }
+
+    // The record parameter carries a TopicIdPartition with a null topic name; the captured
+    // produce request must still carry "source-topic" in the DLQ topic header.
+    @Test
+    public void testDlqResolvesSourceTopicNameViaCacheHelperWhenMissing() throws Exception {
+        MockClient client = new MockClient(MOCK_TIME);
+        List<ProduceRequest> capturedProduces = new ArrayList<>();
+        client.prepareResponseFrom(
+            body -> {
+                if (body instanceof ProduceRequest pr) {
+                    capturedProduces.add(pr);
+                    return true;
+                }
+                return false;
+            },
+            successfulProduceResponse(0),
+            DEFAULT_LEADER
+        );
+
+        stateManager = builder().withClient(client).build();
+        stateManager.start();
+        ShareGroupDLQRecordParameter p = new ShareGroupDLQRecordParameter(
+            GROUP_ID,
+            new TopicIdPartition(SOURCE_TOPIC_ID, 0, null),
+            0L, 0L,
+            Optional.empty(), Optional.empty(), false);
+        assertNull(stateManager.dlq(p).get(10, TimeUnit.SECONDS));
+
+        assertEquals(1, capturedProduces.size());
+        // Source topic name was null in the parameter; the manager must have resolved it via
+        // ShareGroupDLQMetadataCacheHelper.topicName(SOURCE_TOPIC_ID), which the happy helper
+        // returns as "source-topic".
+        assertDlqProduceRecordHeaders(capturedProduces.get(0), Map.of(
+            0, new ExpectedDlqPartition(0L, 0L, Map.of(
+                HEADER_DLQ_ERRORS_TOPIC, "source-topic",
+                HEADER_DLQ_ERRORS_PARTITION, "0",
+                HEADER_DLQ_ERRORS_GROUP, GROUP_ID
+            ))
+        ));
+    }
+
+ // ---- Response builder helpers ----
+
+    // Builds a produce response that reports Errors.NONE for the given partition index.
+    private static ProduceResponse successfulProduceResponse(int partition) {
+        Errors noError = Errors.NONE;
+        return produceResponseFor(partition, noError);
+    }
+
+ private static ProduceResponse produceResponseWithError(Errors error) {
+ return produceResponseFor(0, error);
+ }
+
+    // Builds a ProduceResponse with one topic entry (keyed by DLQ_TOPIC_ID) that holds a single
+    // partition response carrying the given partition index, error code and error message.
+    private static ProduceResponse produceResponseFor(int partition, Errors error) {
+        // Don't set name: the manager looks up the TopicProduceResponse using only topicId, which
+        // implies the lookup-key name is the default empty string.
+        ProduceResponseData.TopicProduceResponse topicResp = new ProduceResponseData.TopicProduceResponse()
+            .setTopicId(DLQ_TOPIC_ID)
+            .setPartitionResponses(List.of(
+                new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(partition)
+                    .setErrorCode(error.code())
+                    .setErrorMessage(error.message())
+            ));
+        ProduceResponseData.TopicProduceResponseCollection collection =
+            new ProduceResponseData.TopicProduceResponseCollection();
+        collection.add(topicResp);
+        return new ProduceResponse(new ProduceResponseData().setResponses(collection));
+    }
+
+    // Builds a CreateTopics response whose single topic result reports Errors.NONE.
+    private static CreateTopicsResponse successfulCreateTopicsResponse() {
+        Errors noError = Errors.NONE;
+        return createTopicsResponse(noError);
+    }
+
+    // Builds a CreateTopics response with one result for DLQ_TOPIC (one partition, replication
+    // factor 1) carrying the code and message of the given error.
+    private static CreateTopicsResponse createTopicsResponse(Errors error) {
+        CreateTopicsResponseData.CreatableTopicResult topicResult =
+            new CreateTopicsResponseData.CreatableTopicResult()
+                .setName(DLQ_TOPIC)
+                .setTopicId(DLQ_TOPIC_ID)
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1)
+                .setErrorCode(error.code())
+                .setErrorMessage(error.message());
+
+        CreateTopicsResponseData responseData = new CreateTopicsResponseData();
+        responseData.topics().add(topicResult);
+        return new CreateTopicsResponse(responseData);
+    }
+}