This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 69d4bfe9b93 KAFKA-20612: Stitch DLQ metrics with
ShareGroupDLQStateManager. (#22412)
69d4bfe9b93 is described below
commit 69d4bfe9b93a8041f64375d0f4a47fd823cdc8ce
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Jun 5 00:40:39 2026 +0530
KAFKA-20612: Stitch DLQ metrics with ShareGroupDLQStateManager. (#22412)
* Stitched `ShareGroupMetrics` with `ShareGroupDLQStateManager`. This
includes 3 metrics defined in KIP-1191:
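* total records written - incremented by the number of records written
on each successful produce response
* produce requests total per group per sec - incremented each time a
handler's produce data is built for sending (including retries); since
requests are coalesced per broker node, this can exceed the actual RPC count
* produce requests failed per group per sec - incremented for every
handler whose produce fails with a non-retriable error or exhausts its retries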
* New tests have been added to `ShareGroupDLQStateManagerTest` to test
batching and metrics.
* Previously, `ShareGroupMetrics` was created and owned by
`SharePartitionManager`. Ownership has moved to `BrokerServer`, which
creates the instance, passes it to `SharePartitionManager` and (via
`DefaultShareGroupDLQManager`) to `ShareGroupDLQStateManager`, and closes
it on shutdown.
Reviewers: Apoorv Mittal <[email protected]>
---
.../kafka/server/share/SharePartitionManager.java | 4 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 13 +-
.../server/share/SharePartitionManagerTest.java | 1 -
.../share/dlq/DefaultShareGroupDLQManager.java | 21 +-
.../share/dlq/ShareGroupDLQStateManager.java | 45 +-
.../share/dlq/ShareGroupDLQStateManagerTest.java | 492 ++++++++++++++++++++-
6 files changed, 560 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index a8eff11ac90..dcd2ccd65e9 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -191,6 +191,7 @@ public class SharePartitionManager implements AutoCloseable
{
long remoteFetchMaxWaitMs,
Persister persister,
ShareGroupConfigProvider configProvider,
+ ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats,
Supplier<Boolean> shareGroupDlqEnableSupplier,
ShareGroupDLQManager shareGroupDLQManager
@@ -208,7 +209,7 @@ public class SharePartitionManager implements AutoCloseable
{
remoteFetchMaxWaitMs,
persister,
configProvider,
- new ShareGroupMetrics(time),
+ shareGroupMetrics,
brokerTopicStats,
shareGroupDlqEnableSupplier,
shareGroupDLQManager
@@ -670,7 +671,6 @@ public class SharePartitionManager implements AutoCloseable
{
@Override
public void close() throws Exception {
this.timer.close();
- this.shareGroupMetrics.close();
}
private ShareSessionKey shareSessionKey(String groupId, String memberId) {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 63a0472ddf0..c339a1d595a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -64,6 +64,7 @@ import
org.apache.kafka.storage.internals.log.{LogDirFailureChannel, LogManager
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.server.partition.{AlterPartitionManager,
DefaultAlterPartitionManager}
import org.apache.kafka.server.share.dlq.{DefaultShareGroupDLQManager,
NoOpShareGroupDLQManager, ShareGroupDLQManager}
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics
import java.time.Duration
import java.util
@@ -166,6 +167,8 @@ class BrokerServer(
var clientMetricsManager: ClientMetricsManager = _
+ var shareGroupMetrics: ShareGroupMetrics = _
+
var sharePartitionManager: SharePartitionManager = _
var persister: Persister = _
@@ -390,6 +393,9 @@ class BrokerServer(
/* create persister */
persister = createShareStatePersister()
+ /* create metrics object to be shared by the share group DLQ manager and the share partition manager */
+ shareGroupMetrics = new ShareGroupMetrics(time)
+
/* create share group DLQ manager */
shareGroupDLQManager = createShareGroupDLQManager()
@@ -472,6 +478,7 @@ class BrokerServer(
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
persister,
new ShareGroupConfigProvider(groupConfigManager),
+ shareGroupMetrics,
brokerTopicStats,
() =>
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)).supportsShareGroupDLQ(),
shareGroupDLQManager
@@ -769,7 +776,8 @@ class BrokerServer(
NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config,
metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager
broker=${config.brokerId}]")),
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key =>
shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
Time.SYSTEM,
- shareGroupTimer
+ shareGroupTimer,
+ shareGroupMetrics
)
} else if
(klass.getName.equals(classOf[NoOpShareGroupDLQManager].getName)) {
info("Using no-op share group DLQ manager")
@@ -921,6 +929,9 @@ class BrokerServer(
if (shareGroupDLQManager != null)
Utils.swallow(this.logger.underlying, () =>
shareGroupDLQManager.stop())
+ if (shareGroupMetrics != null)
+ Utils.swallow(this.logger.underlying, () => shareGroupMetrics.close())
+
Utils.closeQuietly(shareGroupTimer, "share group timer")
if (lifecycleManager != null)
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 3bde6678921..116a92f1bc9 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1266,7 +1266,6 @@ public class SharePartitionManagerTest {
sharePartitionManager.close();
// Verify that the timer object in sharePartitionManager is closed by
checking the calls to timer.close() and shareGroupMetrics.close().
Mockito.verify(timer, times(1)).close();
- Mockito.verify(shareGroupMetrics, times(1)).close();
}
@Test
diff --git
a/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
b/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
index 47fed728f23..32e242b0ed1 100644
---
a/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
+++
b/server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.share.dlq;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
@@ -39,14 +40,26 @@ public class DefaultShareGroupDLQManager implements
ShareGroupDLQManager {
private static final Logger log =
LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
- public static ShareGroupDLQManager instance(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
- DefaultShareGroupDLQManager instance = new
DefaultShareGroupDLQManager(client, cacheHelper, time, timer);
+ public static ShareGroupDLQManager instance(
+ KafkaClient client,
+ ShareGroupDLQMetadataCacheHelper cacheHelper,
+ Time time,
+ Timer timer,
+ ShareGroupMetrics metrics
+ ) {
+ DefaultShareGroupDLQManager instance = new
DefaultShareGroupDLQManager(client, cacheHelper, time, timer, metrics);
instance.start();
return instance;
}
- private DefaultShareGroupDLQManager(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
- this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper,
time, timer);
+ private DefaultShareGroupDLQManager(
+ KafkaClient client,
+ ShareGroupDLQMetadataCacheHelper cacheHelper,
+ Time time,
+ Timer timer,
+ ShareGroupMetrics shareGroupMetrics
+ ) {
+ this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper,
time, timer, shareGroupMetrics);
}
private void start() {
diff --git
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index 8613582ceea..b54878d7187 100644
---
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.apache.kafka.server.util.timer.Timer;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -80,6 +82,7 @@ public class ShareGroupDLQStateManager {
private final Time time;
private final Timer timer;
private final ShareGroupDLQMetadataCacheHelper cacheHelper;
+ private final ShareGroupMetrics shareGroupMetrics;
public static final long REQUEST_BACKOFF_MS = 1_000L;
public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
private static final int MAX_REQUEST_ATTEMPTS = 5;
@@ -91,7 +94,13 @@ public class ShareGroupDLQStateManager {
private final Map<Node, List<ProduceRequestHandler>> nodeRPCMap = new
HashMap<>();
private final Object nodeMapLock = new Object();
- public ShareGroupDLQStateManager(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+ public ShareGroupDLQStateManager(
+ KafkaClient client,
+ ShareGroupDLQMetadataCacheHelper cacheHelper,
+ Time time,
+ Timer timer,
+ ShareGroupMetrics shareGroupMetrics
+ ) {
if (client == null) {
throw new IllegalArgumentException("Kafkaclient must not be
null.");
}
@@ -108,9 +117,14 @@ public class ShareGroupDLQStateManager {
throw new IllegalArgumentException("Timer must not be null.");
}
+ if (shareGroupMetrics == null) {
+ throw new IllegalArgumentException("ShareGroupMetrics must not be
null.");
+ }
+
this.time = time;
this.timer = timer;
this.cacheHelper = cacheHelper;
+ this.shareGroupMetrics = shareGroupMetrics;
this.sender = new SendThread(
"ShareGroupDLQSendThread",
client,
@@ -153,6 +167,16 @@ public class ShareGroupDLQStateManager {
return future;
}
+ // Visibility for tests
+ Map<Node, List<ShareGroupDLQStateManager.ProduceRequestHandler>>
nodeRPCMap() {
+ // Returns a read-only live view (Collections.unmodifiableMap) rather than a Map.copyOf
+ // snapshot: the tests invoke this method repeatedly to observe the state of the map,
+ // and a snapshot (a shallow copy taken at call time) would not reflect later
+ // additions/removals, which could make assertions flaky. NOTE(review): reads through
+ // this view are not synchronized on nodeMapLock - confirm test callers tolerate that.
+ return Collections.unmodifiableMap(nodeRPCMap);
+ }
+
private void enqueue(ProduceRequestHandler requestHandler) {
sender.enqueue(requestHandler);
}
@@ -332,6 +356,10 @@ public class ShareGroupDLQStateManager {
simpleRecords.toArray(new SimpleRecord[]{})
);
+ // Record that new produce data has been built to be sent. This may not match the
+ // actual RPC count, as requests are coalesced before sending.
+ shareGroupMetrics.recordDLQProduce(param.groupId());
+
return new ProduceRequestData.TopicProduceData()
.setName(dlqTopicPartitionData.topicName())
.setTopicId(dlqTopicPartitionData.topicId().get())
@@ -579,6 +607,7 @@ public class ShareGroupDLQStateManager {
switch (error) {
case NONE:
LOG.debug("Successfully produced records {} to dlq
topic node {}.", this, dlqPartitionLeaderNode());
+
shareGroupMetrics.recordDLQRecordWrite(param.groupId(), (int)
(param.lastOffset() - param.firstOffset() + 1));
produceRequestBackoff.resetAttempts();
this.result.complete(null);
break;
@@ -587,6 +616,7 @@ public class ShareGroupDLQStateManager {
LOG.debug("Received retriable error produce
response for {} to dlq topic node {} - {}.", this, dlqPartitionLeaderNode(),
errorMessage);
if (!produceRequestBackoff.canAttempt()) {
LOG.error("Exhausted max retries to produce {}
to DLQ topic node {}.", this, dlqPartitionLeaderNode());
+
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(new Exception("Exhausted
max retries to produce to DLQ topic without success."));
break;
}
@@ -597,6 +627,7 @@ public class ShareGroupDLQStateManager {
LOG.error("Unable to produce {} to DLQ topic node
{} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
partitionResponse.recordErrors().forEach(recordError ->
LOG.error("Records with errors {} - {}.",
recordError.batchIndex(), recordError.batchIndexErrorMessage()));
+
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(error.exception());
}
break;
@@ -608,6 +639,7 @@ public class ShareGroupDLQStateManager {
if (!produceRequestBackoff.canAttempt()) {
LOG.error("Exhausted max retries to produce {} to DLQ
topic node {} due to client response error {}.",
param, dlqPartitionLeaderNode(),
clientResponseErrorMessage);
+
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(clientResponseError.exception());
break;
}
@@ -617,6 +649,7 @@ public class ShareGroupDLQStateManager {
default:
LOG.error("Unable to produce {} to DLQ topic node {} due
to client response error {}.",
param, dlqPartitionLeaderNode(),
clientResponseErrorMessage);
+ shareGroupMetrics.recordDLQProduceFailed(param.groupId());
requestErrorResponse(clientResponseError.exception());
}
}
@@ -771,13 +804,19 @@ public class ShareGroupDLQStateManager {
}
}
- private record CoalesceResults(
+ // Visibility for tests
+ record CoalesceResults(
AbstractRequest.Builder<? extends AbstractRequest> request,
List<ProduceRequestHandler> liveHandlers
) {
}
- private static CoalesceResults
coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
+ // Visibility for tests
+ static CoalesceResults coalesceProduceRequests(List<ProduceRequestHandler>
handlers) {
+ // Above handlers are destined for the same broker node - it could be
for different DLQ topics and partitions
+ // but the same broker node. Now the produce request requires each
topic data request to be
+ // scoped to a specific topic/topicId and the partition data could
have all the record information
+ // and the destination DLQ partition. To accomplish this, we will map
handlers by DLQ topic id.
Map<Uuid, ProduceRequestData.TopicProduceData> produceHandlerMap = new
HashMap<>();
List<ProduceRequestHandler> liveHandlers = new
ArrayList<>(handlers.size());
handlers.forEach(handler -> {
diff --git
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
index a6d939003ce..eac929140cd 100644
---
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
@@ -37,24 +37,31 @@ import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import
org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper.TopicPartitionData;
+import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -73,7 +80,15 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
class ShareGroupDLQStateManagerTest {
@@ -87,6 +102,7 @@ class ShareGroupDLQStateManagerTest {
private static final Node DEFAULT_LEADER = new Node(0, HOST, PORT);
private final MockTimer mockTimer = new MockTimer(MOCK_TIME);
+ private final ShareGroupMetrics mockMetrics =
mock(ShareGroupMetrics.class);
private ShareGroupDLQStateManager stateManager;
@AfterEach
@@ -101,6 +117,7 @@ class ShareGroupDLQStateManagerTest {
private Time time = MOCK_TIME;
private Timer timer;
private ShareGroupDLQMetadataCacheHelper cacheHelper;
+ private ShareGroupMetrics shareGroupMetrics;
Builder withClient(KafkaClient client) {
this.client = client;
@@ -122,12 +139,20 @@ class ShareGroupDLQStateManagerTest {
return this;
}
+ Builder withShareGroupMetrics(ShareGroupMetrics shareGroupMetrics) {
+ this.shareGroupMetrics = shareGroupMetrics;
+ return this;
+ }
+
ShareGroupDLQStateManager build() {
+ // Default to the test-class mockMetrics field so tests can verify
interactions
+ // without having to thread a custom metrics mock through the
builder.
return new ShareGroupDLQStateManager(
client != null ? client : new MockClient(MOCK_TIME),
cacheHelper != null ? cacheHelper :
happyCacheHelper(DEFAULT_LEADER),
time,
- timer != null ? timer : mockTimer
+ timer != null ? timer : mockTimer,
+ shareGroupMetrics != null ? shareGroupMetrics : mockMetrics
);
}
}
@@ -233,14 +258,14 @@ class ShareGroupDLQStateManagerTest {
public void testConstructorRejectsNullClient() {
ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
assertThrows(IllegalArgumentException.class,
- () -> new ShareGroupDLQStateManager(null, cacheHelper, MOCK_TIME,
mockTimer));
+ () -> new ShareGroupDLQStateManager(null, cacheHelper, MOCK_TIME,
mockTimer, mockMetrics));
}
@Test
public void testConstructorRejectsNullCacheHelper() {
KafkaClient client = mock(KafkaClient.class);
assertThrows(IllegalArgumentException.class,
- () -> new ShareGroupDLQStateManager(client, null, MOCK_TIME,
mockTimer));
+ () -> new ShareGroupDLQStateManager(client, null, MOCK_TIME,
mockTimer, mockMetrics));
}
@Test
@@ -248,7 +273,7 @@ class ShareGroupDLQStateManagerTest {
KafkaClient client = mock(KafkaClient.class);
ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
assertThrows(IllegalArgumentException.class,
- () -> new ShareGroupDLQStateManager(client, cacheHelper, null,
mockTimer));
+ () -> new ShareGroupDLQStateManager(client, cacheHelper, null,
mockTimer, mockMetrics));
}
@Test
@@ -256,7 +281,15 @@ class ShareGroupDLQStateManagerTest {
KafkaClient client = mock(KafkaClient.class);
ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
assertThrows(IllegalArgumentException.class,
- () -> new ShareGroupDLQStateManager(client, cacheHelper,
MOCK_TIME, null));
+ () -> new ShareGroupDLQStateManager(client, cacheHelper,
MOCK_TIME, null, mockMetrics));
+ }
+
+ @Test
+ public void testConstructorRejectsNullShareGroupMetrics() {
+ KafkaClient client = mock(KafkaClient.class);
+ ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
+ assertThrows(IllegalArgumentException.class,
+ () -> new ShareGroupDLQStateManager(client, cacheHelper,
MOCK_TIME, mockTimer, null));
}
// ---- Lifecycle tests ----
@@ -268,12 +301,14 @@ class ShareGroupDLQStateManagerTest {
stateManager.start();
stateManager.start();
// tearDown will call stateManager.stop() and must not throw.
+ verifyNoInteractions(mockMetrics);
}
@Test
public void testStopWithoutStartIsNoOp() {
stateManager = builder().build();
// tearDown will call stateManager.stop() without a prior start() and
must not throw.
+ verifyNoInteractions(mockMetrics);
}
// ---- DLQ topic validation tests (no thread start required) ----
@@ -288,6 +323,7 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertInstanceOf(ConfigException.class, cause);
assertTrue(cause.getMessage().contains("empty"));
+ verifyNoInteractions(mockMetrics);
}
@Test
@@ -300,6 +336,7 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertInstanceOf(ConfigException.class, cause);
assertTrue(cause.getMessage().contains("__"));
+ verifyNoInteractions(mockMetrics);
}
@Test
@@ -314,6 +351,7 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertInstanceOf(ConfigException.class, cause);
assertTrue(cause.getMessage().contains("DLQ is not enabled"));
+ verifyNoInteractions(mockMetrics);
}
@Test
@@ -328,6 +366,7 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertInstanceOf(ConfigException.class, cause);
assertTrue(cause.getMessage().contains("auto create is disabled"));
+ verifyNoInteractions(mockMetrics);
}
@Test
@@ -342,6 +381,7 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertInstanceOf(ConfigException.class, cause);
assertTrue(cause.getMessage().contains("does not comply with the DLQ
topic prefix"));
+ verifyNoInteractions(mockMetrics);
}
@Test
@@ -356,6 +396,7 @@ class ShareGroupDLQStateManagerTest {
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFalse(result.isCancelled());
+ verifyNoInteractions(mockMetrics);
}
// ---- Full integration tests ----
@@ -390,6 +431,9 @@ class ShareGroupDLQStateManagerTest {
HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
))
));
+ verify(mockMetrics).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
}
@Test
@@ -440,6 +484,9 @@ class ShareGroupDLQStateManagerTest {
HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
))
));
+ verify(mockMetrics).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
}
@Test
@@ -495,6 +542,9 @@ class ShareGroupDLQStateManagerTest {
HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
))
));
+ verify(mockMetrics).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
}
@Test
@@ -521,6 +571,8 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertNotNull(cause);
assertEquals(Errors.INVALID_REPLICATION_FACTOR.exception().getClass(),
cause.getClass());
+ // CreateTopics failed; produce was never attempted, so no DLQ metrics
should fire.
+ verifyNoInteractions(mockMetrics);
}
@Test
@@ -537,6 +589,8 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertNotNull(cause);
assertEquals(Errors.BROKER_NOT_AVAILABLE.exception().getClass(),
cause.getClass());
+ // No cluster node was available to send CreateTopics to; produce
never attempted.
+ verifyNoInteractions(mockMetrics);
}
@Test
@@ -574,6 +628,8 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L,
maxAttempts));
assertNotNull(cause);
assertEquals(Errors.NETWORK_EXCEPTION.exception().getClass(),
cause.getClass());
+ // CreateTopics retries exhausted; produce never attempted, so no
DLQ metrics should fire.
+ verifyNoInteractions(mockMetrics);
} finally {
Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
}
@@ -641,6 +697,10 @@ class ShareGroupDLQStateManagerTest {
HEADER_DLQ_ERRORS_MESSAGE, "simulated cause"
))
));
+ // CreateTopics retried, but produce only ran once (after the
eventual create success).
+ verify(mockMetrics).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
} finally {
Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
}
@@ -688,6 +748,11 @@ class ShareGroupDLQStateManagerTest {
for (ProduceRequest pr : capturedProduces) {
assertDlqProduceRecordHeaders(pr, expectedByPartition);
}
+ // Each attempt (including retries) goes through generateRequests,
so recordDLQProduce
+ // is invoked once per attempt. recordDLQRecordWrite fires only on
the final success.
+ verify(mockMetrics, times(maxAttempts)).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3);
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
} finally {
Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
}
@@ -707,6 +772,9 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertNotNull(cause);
assertEquals(Errors.INVALID_TOPIC_EXCEPTION.exception().getClass(),
cause.getClass());
+ verify(mockMetrics).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQProduceFailed(GROUP_ID);
+ verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
}
@Test
@@ -723,6 +791,11 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param()));
assertNotNull(cause);
assertEquals(Errors.UNKNOWN_SERVER_ERROR.exception().getClass(),
cause.getClass());
+ verify(mockMetrics).recordDLQProduce(GROUP_ID);
+ // Empty produce-response paths return UNKNOWN_SERVER_ERROR via requestErrorResponse without
+ // touching recordDLQProduceFailed (that is only invoked from the default error cases and the retry-exhaustion branches).
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
+ verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
}
@Test
@@ -751,6 +824,11 @@ class ShareGroupDLQStateManagerTest {
} catch (TimeoutException expected) {
assertFalse(result.isDone());
}
+ // The first attempt did go out to the wire, so recordDLQProduce fired
once. The retry is
+ // still pending in the timer, so neither success nor failure metrics
should have landed.
+ verify(mockMetrics, times(1)).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
}
@Test
@@ -781,6 +859,53 @@ class ShareGroupDLQStateManagerTest {
Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L,
maxAttempts));
assertNotNull(cause);
assertEquals(Errors.NETWORK_EXCEPTION.exception().getClass(),
cause.getClass());
+ // NETWORK_EXCEPTION exhaustion records the failure metric (the production code records it
+ // on the canAttempt()==false branch before requestErrorResponse). Only the final
+ // exhausted attempt records the failure - the earlier retried attempts don't.
+ verify(mockMetrics, times(maxAttempts)).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQProduceFailed(GROUP_ID);
+ verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
+ } finally {
+ Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
+ }
+ }
+
+ @Test
+ public void testDlqProduceNotLeaderOrFollowerExhaustsAndRecordsFailure()
throws Exception {
+ int maxAttempts = 3;
+ // Real timer with tiny backoffs keeps the exhaustion path within
milliseconds.
+ Timer realTimer = new SystemTimerReaper("shareGroupDLQTestTimer",
+ new SystemTimer("shareGroupDLQTestTimer"));
+ try {
+ MockClient client = new MockClient(MOCK_TIME);
+ // Each attempt returns a partition-level NOT_LEADER_OR_FOLLOWER
on partition 0,
+ // which is retriable up to maxAttempts.
+ for (int i = 0; i < maxAttempts; i++) {
+ client.prepareResponseFrom(
+ body -> body instanceof ProduceRequest,
+ produceResponseWithError(Errors.NOT_LEADER_OR_FOLLOWER),
+ DEFAULT_LEADER
+ );
+ }
+
+ stateManager = builder()
+ .withClient(client)
+ .withTimer(realTimer)
+ .build();
+ stateManager.start();
+
+ Throwable cause = getCause(stateManager.dlq(param(), 1L, 5L,
maxAttempts));
+ assertNotNull(cause);
+ // The exhaustion path raises a generic Exception (not
Errors.NOT_LEADER_OR_FOLLOWER's
+ // typed exception), so just check the message rather than the
class.
+ assertTrue(cause.getMessage().contains("Exhausted max retries"));
+
+ // Each attempt is its own outgoing produce request ->
recordDLQProduce fires
+ // maxAttempts times. The final attempt exhausts retries and
records the failure
+ // (the inner, partition-level retriable-error exhaustion branch).
+ verify(mockMetrics, times(maxAttempts)).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQProduceFailed(GROUP_ID);
+ verify(mockMetrics, never()).recordDLQRecordWrite(any(), anyInt());
} finally {
Utils.closeQuietly(realTimer, "shareGroupDLQTestTimer");
}
@@ -887,6 +1012,13 @@ class ShareGroupDLQStateManagerTest {
assertDlqProduceRecordHeaders(pr, Map.of(dlqPartitionIndex,
expected));
}
}
+ // Each handler succeeds with 1 record (offsets 0..0). recordDLQProduce is invoked
+ // once per handler inside topicProduceData() (not once per coalesced request in
+ // coalesceProduceRequests), so two handlers always produce two calls
+ // regardless of whether the SendThread coalesces them into a single produce request.
+ verify(mockMetrics, times(2)).recordDLQRecordWrite(GROUP_ID, 1);
+ verify(mockMetrics, times(2)).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
}
@Test
@@ -925,8 +1057,358 @@ class ShareGroupDLQStateManagerTest {
HEADER_DLQ_ERRORS_GROUP, GROUP_ID
))
));
+ verify(mockMetrics).recordDLQProduce(GROUP_ID);
+ verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 1);
+ verify(mockMetrics, never()).recordDLQProduceFailed(any());
}
+ @Test
+ public void testDlqMultipleGroupsWithMixedOutcomes() throws Exception {
+ String groupA = "group-a";
+ String groupB = "group-b";
+ String groupC = "group-c";
+
+ // All three groups share the same DLQ topic with 3 partitions on the
same leader, so
+ // partition 0 -> group-a, partition 1 -> group-b, partition 2 ->
group-c (via the
+ // sourcePartition % numPartitions mapping in populateDLQTopicData).
+ ShareGroupDLQMetadataCacheHelper cacheHelper =
mock(ShareGroupDLQMetadataCacheHelper.class);
+
when(cacheHelper.shareGroupDlqTopic(anyString())).thenReturn(Optional.of(DLQ_TOPIC));
+
when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+ when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+ when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+ when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+
when(cacheHelper.getClusterNodes()).thenReturn(List.of(DEFAULT_LEADER));
+
when(cacheHelper.topicName(SOURCE_TOPIC_ID)).thenReturn(Optional.of("source-topic"));
+ when(cacheHelper.topicPartitionData(DLQ_TOPIC)).thenReturn(new
TopicPartitionData(
+ DLQ_TOPIC,
+ Optional.of(3),
+ Optional.of(DLQ_TOPIC_ID),
+ List.of(DEFAULT_LEADER, DEFAULT_LEADER, DEFAULT_LEADER)
+ ));
+
+ // Each prepared produce response carries partition 0=NONE,
1=INVALID_TOPIC_EXCEPTION,
+ // 2=NONE so that whichever physical request lands at the broker - one
fully coalesced
+ // request, two requests, or three separate requests, depending on
SendThread/inFlight
+ // timing - each handler still finds its own destination partition in
the response.
+ ProduceResponseData.TopicProduceResponse topicResp = new
ProduceResponseData.TopicProduceResponse()
+ .setTopicId(DLQ_TOPIC_ID)
+ .setPartitionResponses(List.of(
+ new ProduceResponseData.PartitionProduceResponse()
+ .setIndex(0)
+ .setErrorCode(Errors.NONE.code()),
+ new ProduceResponseData.PartitionProduceResponse()
+ .setIndex(1)
+ .setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code())
+ .setErrorMessage(Errors.INVALID_TOPIC_EXCEPTION.message()),
+ new ProduceResponseData.PartitionProduceResponse()
+ .setIndex(2)
+ .setErrorCode(Errors.NONE.code())
+ ));
+ ProduceResponseData.TopicProduceResponseCollection collection =
+ new ProduceResponseData.TopicProduceResponseCollection();
+ collection.add(topicResp);
+ ProduceResponse mixedResp = new ProduceResponse(new
ProduceResponseData().setResponses(collection));
+
+ MockClient client = new MockClient(MOCK_TIME);
+ // Over-prepare: there may be 1, 2, or 3 outgoing produce requests
depending on how the
+ // SendThread coalesces. Unused prepared responses are harmless.
+ for (int i = 0; i < 3; i++) {
+ client.prepareResponseFrom(body -> body instanceof ProduceRequest,
mixedResp, DEFAULT_LEADER);
+ }
+
+ stateManager =
builder().withClient(client).withCacheHelper(cacheHelper).build();
+ stateManager.start();
+
+ ShareGroupDLQRecordParameter pA = new ShareGroupDLQRecordParameter(
+ groupA,
+ new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+ 0L, 0L,
+ Optional.empty(), Optional.empty(), false);
+ ShareGroupDLQRecordParameter pB = new ShareGroupDLQRecordParameter(
+ groupB,
+ new TopicIdPartition(SOURCE_TOPIC_ID, 1, "source-topic"),
+ 0L, 0L,
+ Optional.empty(), Optional.empty(), false);
+ ShareGroupDLQRecordParameter pC = new ShareGroupDLQRecordParameter(
+ groupC,
+ new TopicIdPartition(SOURCE_TOPIC_ID, 2, "source-topic"),
+ 0L, 0L,
+ Optional.empty(), Optional.empty(), false);
+
+ CompletableFuture<Void> rA = stateManager.dlq(pA);
+ CompletableFuture<Void> rB = stateManager.dlq(pB);
+ CompletableFuture<Void> rC = stateManager.dlq(pC);
+
+ assertNull(rA.get(5, TimeUnit.SECONDS));
+ Throwable causeB = getCause(rB);
+ assertEquals(Errors.INVALID_TOPIC_EXCEPTION.exception().getClass(),
causeB.getClass());
+ assertNull(rC.get(5, TimeUnit.SECONDS));
+
+ // recordDLQProduce is invoked once per handler inside
topicProduceData(). Each group
+ // has a single handler that runs through exactly one attempt (no
retries on this path),
+ // so each group sees exactly one metric call regardless of how the
SendThread coalesces.
+ verify(mockMetrics).recordDLQProduce(groupA);
+ verify(mockMetrics).recordDLQProduce(groupB);
+ verify(mockMetrics).recordDLQProduce(groupC);
+
+ // Per-group write count records only for the successful groups.
+ verify(mockMetrics).recordDLQRecordWrite(groupA, 1);
+ verify(mockMetrics).recordDLQRecordWrite(groupC, 1);
+ verify(mockMetrics, never()).recordDLQRecordWrite(eq(groupB),
anyInt());
+
+ // Failure metric fires only for groupB (the INVALID_TOPIC_EXCEPTION
partition). Production
+ // now records the metric before completing the future, so no
timeout-bridge is needed.
+ verify(mockMetrics).recordDLQProduceFailed(groupB);
+ verify(mockMetrics, never()).recordDLQProduceFailed(groupA);
+ verify(mockMetrics, never()).recordDLQProduceFailed(groupC);
+
+ // Aggregate sanity check: total recordDLQProduce calls (3) strictly
exceeds total
+ // recordDLQProduceFailed calls (1), demonstrating that "some failed
and failed < total".
+ long produceCount =
Mockito.mockingDetails(mockMetrics).getInvocations().stream()
+ .filter(inv ->
inv.getMethod().getName().equals("recordDLQProduce"))
+ .count();
+ long produceFailedCount =
Mockito.mockingDetails(mockMetrics).getInvocations().stream()
+ .filter(inv ->
inv.getMethod().getName().equals("recordDLQProduceFailed"))
+ .count();
+ assertEquals(3, produceCount);
+ assertEquals(1, produceFailedCount);
+ assertTrue(produceFailedCount < produceCount,
+ "Expected recordDLQProduceFailed count (" + produceFailedCount +
") < recordDLQProduce count (" + produceCount + ")");
+ }
+
+    @Test
+    public void testMultipleAccumulatedHandlersInNodeRPCMap() throws Exception {
+        MockClient client = new MockClient(MOCK_TIME);
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            client.prepareResponseFrom(body -> true, null, DEFAULT_LEADER);
+
+            stateManager = builder().withClient(client).withCacheHelper(happyCacheHelper(DEFAULT_LEADER)).build();
+
+            // Poll nodeRPCMap from a separate thread, starting before the state manager is
+            // started, so the pile-up is observed as soon as it happens. The loop gives up after
+            // DEFAULT_MAX_WAIT_MS. It also stops on interruption, so shutdownNow() in the finally
+            // block ends it promptly instead of leaving it spinning after the test finishes.
+            Future<Boolean> done = executor.submit(() -> {
+                long start = System.currentTimeMillis();
+                while (!Thread.currentThread().isInterrupted()
+                    && System.currentTimeMillis() - start <= TestUtils.DEFAULT_MAX_WAIT_MS) {
+                    List<ShareGroupDLQStateManager.ProduceRequestHandler> handlers =
+                        stateManager.nodeRPCMap().get(DEFAULT_LEADER);
+                    if (handlers != null && handlers.size() > 2) {
+                        return true;
+                    }
+                    Thread.onSpinWait();
+                }
+                return false;
+            });
+
+            stateManager.start();
+
+            // Multiple dlq() calls for the same group. They all target the same leader (DEFAULT_LEADER),
+            // so after the first iteration marks that node as in-flight, the rest accumulate in
+            // nodeRPCMap and never get cleared.
+            for (int i = 0; i < 10; i++) {
+                stateManager.dlq(new ShareGroupDLQRecordParameter(
+                    GROUP_ID,
+                    new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+                    0L, 0L,
+                    Optional.empty(), Optional.empty(), false));
+            }
+
+            // Wait until the polling task observes nodeRPCMap with more than 2 handlers piled up.
+            // The message supplier is kept side-effect free because it may be evaluated on every poll.
+            TestUtils.waitForCondition(done::get, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "unable to verify batching");
+        } finally {
+            // Always release the executor (also on failure paths) and interrupt the poller.
+            executor.shutdownNow();
+        }
+    }
+
+ // ---- Direct unit tests for coalesceProduceRequests ----
+
+    @Test
+    public void testCoalesceProduceRequestsWithEmptyHandlerListProducesEmptyRequest() {
+        // Coalescing nothing must yield no live handlers and a request without any topic data.
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of());
+        assertTrue(coalesced.liveHandlers().isEmpty());
+
+        ProduceRequest.Builder requestBuilder = (ProduceRequest.Builder) coalesced.request();
+        ProduceRequest built = requestBuilder.build();
+        assertTrue(built.data().topicData().isEmpty());
+    }
+
+    @Test
+    public void testCoalesceProduceRequestsWithSingleHandlerProducesOneTopicOnePartition() throws Exception {
+        stateManager = builder().build();
+        ShareGroupDLQStateManager.ProduceRequestHandler onlyHandler =
+            newHandlerForCoalesceTest(stateManager, GROUP_ID, 0);
+        onlyHandler.populateDLQTopicData();
+
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of(onlyHandler));
+        assertEquals(List.of(onlyHandler), coalesced.liveHandlers());
+
+        // A lone handler yields one topic entry (the DLQ topic) holding one partition (index 0).
+        ProduceRequestData produceData = ((ProduceRequest.Builder) coalesced.request()).build().data();
+        assertEquals(1, produceData.topicData().size());
+        ProduceRequestData.TopicProduceData onlyTopic = produceData.topicData().iterator().next();
+        assertEquals(DLQ_TOPIC_ID, onlyTopic.topicId());
+        assertEquals(1, onlyTopic.partitionData().size());
+        assertEquals(0, onlyTopic.partitionData().get(0).index());
+
+        // Building the request went through topicProduceData(), which records one produce
+        // metric per handler.
+        verify(mockMetrics).recordDLQProduce(GROUP_ID);
+    }
+
+    @Test
+    public void testCoalesceProduceRequestsMergesPartitionsForSameDlqTopic() throws Exception {
+        // One DLQ topic with 2 partitions, both led by DEFAULT_LEADER. Handlers for source
+        // partitions 0 and 1 therefore target DLQ partitions 0 and 1 respectively.
+        ShareGroupDLQMetadataCacheHelper cacheHelper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(cacheHelper.shareGroupDlqTopic(anyString())).thenReturn(Optional.of(DLQ_TOPIC));
+        when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(cacheHelper.containsTopic(DLQ_TOPIC)).thenReturn(true);
+        when(cacheHelper.isDlqEnabledOnTopic(DLQ_TOPIC)).thenReturn(true);
+        when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        TopicPartitionData twoPartitionTopic = new TopicPartitionData(
+            DLQ_TOPIC,
+            Optional.of(2),
+            Optional.of(DLQ_TOPIC_ID),
+            List.of(DEFAULT_LEADER, DEFAULT_LEADER)
+        );
+        when(cacheHelper.topicPartitionData(DLQ_TOPIC)).thenReturn(twoPartitionTopic);
+
+        stateManager = builder().withCacheHelper(cacheHelper).build();
+        ShareGroupDLQStateManager.ProduceRequestHandler first =
+            newHandlerForCoalesceTest(stateManager, GROUP_ID, 0);
+        ShareGroupDLQStateManager.ProduceRequestHandler second =
+            newHandlerForCoalesceTest(stateManager, GROUP_ID, 1);
+        first.populateDLQTopicData();
+        second.populateDLQTopicData();
+
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of(first, second));
+        assertEquals(List.of(first, second), coalesced.liveHandlers());
+
+        ProduceRequestData produceData = ((ProduceRequest.Builder) coalesced.request()).build().data();
+        assertEquals(1, produceData.topicData().size(),
+            "Both handlers share a DLQ topic so they should coalesce into a single topic entry");
+        ProduceRequestData.TopicProduceData mergedTopic = produceData.topicData().iterator().next();
+        assertEquals(DLQ_TOPIC_ID, mergedTopic.topicId());
+        Set<Integer> indices = mergedTopic.partitionData().stream()
+            .map(ProduceRequestData.PartitionProduceData::index)
+            .collect(Collectors.toSet());
+        assertEquals(Set.of(0, 1), indices);
+
+        // topicProduceData() records one produce metric per handler, so two handlers -> two calls.
+        verify(mockMetrics, times(2)).recordDLQProduce(GROUP_ID);
+    }
+
+    @Test
+    public void testCoalesceProduceRequestsKeepsDifferentDlqTopicsSeparate() throws Exception {
+        String groupA = "group-a";
+        String groupB = "group-b";
+        String dlqTopicA = "dlq-topic-a";
+        String dlqTopicB = "dlq-topic-b";
+        Uuid dlqTopicAId = Uuid.randomUuid();
+        Uuid dlqTopicBId = Uuid.randomUuid();
+
+        // Each group is routed to its own single-partition DLQ topic; both are led by DEFAULT_LEADER.
+        ShareGroupDLQMetadataCacheHelper cacheHelper = mock(ShareGroupDLQMetadataCacheHelper.class);
+        when(cacheHelper.shareGroupDlqTopic(groupA)).thenReturn(Optional.of(dlqTopicA));
+        when(cacheHelper.shareGroupDlqTopic(groupB)).thenReturn(Optional.of(dlqTopicB));
+        when(cacheHelper.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+        when(cacheHelper.containsTopic(dlqTopicA)).thenReturn(true);
+        when(cacheHelper.containsTopic(dlqTopicB)).thenReturn(true);
+        when(cacheHelper.isDlqEnabledOnTopic(anyString())).thenReturn(true);
+        when(cacheHelper.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+        when(cacheHelper.topicPartitionData(dlqTopicA)).thenReturn(
+            new TopicPartitionData(dlqTopicA, Optional.of(1), Optional.of(dlqTopicAId), List.of(DEFAULT_LEADER)));
+        when(cacheHelper.topicPartitionData(dlqTopicB)).thenReturn(
+            new TopicPartitionData(dlqTopicB, Optional.of(1), Optional.of(dlqTopicBId), List.of(DEFAULT_LEADER)));
+
+        stateManager = builder().withCacheHelper(cacheHelper).build();
+        ShareGroupDLQStateManager.ProduceRequestHandler handlerA =
+            newHandlerForCoalesceTest(stateManager, groupA, 0);
+        ShareGroupDLQStateManager.ProduceRequestHandler handlerB =
+            newHandlerForCoalesceTest(stateManager, groupB, 0);
+        handlerA.populateDLQTopicData();
+        handlerB.populateDLQTopicData();
+
+        ShareGroupDLQStateManager.CoalesceResults coalesced =
+            ShareGroupDLQStateManager.coalesceProduceRequests(List.of(handlerA, handlerB));
+        assertEquals(List.of(handlerA, handlerB), coalesced.liveHandlers());
+
+        ProduceRequestData produceData = ((ProduceRequest.Builder) coalesced.request()).build().data();
+        assertEquals(2, produceData.topicData().size(),
+            "Different DLQ topic ids must remain in separate TopicProduceData entries");
+        Set<Uuid> seenTopicIds = new HashSet<>();
+        for (ProduceRequestData.TopicProduceData topicData : produceData.topicData()) {
+            seenTopicIds.add(topicData.topicId());
+        }
+        assertEquals(Set.of(dlqTopicAId, dlqTopicBId), seenTopicIds);
+
+        verify(mockMetrics).recordDLQProduce(groupA);
+        verify(mockMetrics).recordDLQProduce(groupB);
+    }
+
+ @Test
+ public void
testCoalesceProduceRequestsSkipsHandlerWhoseTopicProduceDataThrows() throws
Exception {
+ stateManager = builder().build();
+
+ // Good handler: populateDLQTopicData() has been called, so
topicProduceData() succeeds.
+ CompletableFuture<Void> goodFuture = new CompletableFuture<>();
+ ShareGroupDLQStateManager.ProduceRequestHandler good =
stateManager.new ProduceRequestHandler(
+ new ShareGroupDLQRecordParameter(GROUP_ID,
+ new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+ 0L, 0L,
+ Optional.empty(), Optional.empty(), false),
+ goodFuture,
+ ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
+ ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
+ 3);
+ good.populateDLQTopicData();
+
+ // Broken handler: populateDLQTopicData() was never called, so
dlqTopicPartitionData is
+ // null and topicProduceData() will NPE. coalesceProduceRequests must
catch, call
+ // requestErrorResponse to fail the future, and drop the handler from
liveHandlers.
+ CompletableFuture<Void> brokenFuture = new CompletableFuture<>();
+ ShareGroupDLQStateManager.ProduceRequestHandler broken =
stateManager.new ProduceRequestHandler(
+ new ShareGroupDLQRecordParameter(GROUP_ID,
+ new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
+ 0L, 0L,
+ Optional.empty(), Optional.empty(), false),
+ brokenFuture,
+ ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
+ ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
+ 3);
+
+ ShareGroupDLQStateManager.CoalesceResults result =
+ ShareGroupDLQStateManager.coalesceProduceRequests(List.of(good,
broken));
+
+ assertEquals(List.of(good), result.liveHandlers(),
+ "Only the handler whose topicProduceData() succeeded should appear
in liveHandlers");
+ assertFalse(goodFuture.isDone(), "Good handler's future must be
untouched by coalesce");
+ assertTrue(brokenFuture.isCompletedExceptionally(),
+ "Broken handler's future must be completed exceptionally");
+ Throwable cause = getCause(brokenFuture);
+ assertInstanceOf(NullPointerException.class, cause);
+
+ // The resulting request still contains the surviving handler's data.
+ ProduceRequest request = ((ProduceRequest.Builder)
result.request()).build();
+ assertEquals(1, request.data().topicData().size());
+ assertEquals(DLQ_TOPIC_ID,
request.data().topicData().iterator().next().topicId());
+ }
+
+    /**
+     * Builds a ProduceRequestHandler on {@code manager} for {@code groupId}, sourced from
+     * {@code sourcePartition} of SOURCE_TOPIC_ID, with a fresh future and the state manager's
+     * default backoff constants. populateDLQTopicData() is NOT called on the returned handler.
+     */
+    private static ShareGroupDLQStateManager.ProduceRequestHandler newHandlerForCoalesceTest(
+        ShareGroupDLQStateManager manager,
+        String groupId,
+        int sourcePartition
+    ) {
+        TopicIdPartition sourceTopicIdPartition =
+            new TopicIdPartition(SOURCE_TOPIC_ID, sourcePartition, "source-topic");
+        return manager.new ProduceRequestHandler(
+            new ShareGroupDLQRecordParameter(
+                groupId,
+                sourceTopicIdPartition,
+                0L, 0L,
+                Optional.empty(), Optional.empty(), false),
+            new CompletableFuture<>(),
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
+            ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
+            3);
+    }
+
+
// ---- Response builder helpers ----
private static ProduceResponse successfulProduceResponse(int partition) {