This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ebec5cea521 [feat] PIP-468: V5 dead letter queue with scalable DLQ topic (#25652)
ebec5cea521 is described below
commit ebec5cea521cde25097e8b77fa1b8fb9dee63032
Author: Matteo Merli <[email protected]>
AuthorDate: Sat May 2 09:27:47 2026 -0700
[feat] PIP-468: V5 dead letter queue with scalable DLQ topic (#25652)
---
.../client/api/v5/V5DeadLetterPolicyTest.java | 197 +++++++++++--
.../client/impl/v5/MultiTopicQueueConsumer.java | 2 +-
.../client/impl/v5/QueueConsumerBuilderV5.java | 25 +-
.../client/impl/v5/ScalableQueueConsumer.java | 124 +++++++-
.../client/impl/v5/ScalableTopicProducer.java | 314 +++++++++++++--------
5 files changed, 505 insertions(+), 157 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
index be8dfd7f808..11f5cd0b462 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
@@ -20,31 +20,34 @@ package org.apache.pulsar.client.api.v5;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import java.time.Duration;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import lombok.Cleanup;
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.util.RetryMessageUtil;
import org.testng.annotations.Test;
/**
* Coverage for {@link
QueueConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)}: after
- * {@code maxRedeliverCount} negative-acks, the broker forwards the message to
the
+ * {@code maxRedeliverCount} negative-acks, the V5 layer forwards the message
to the
* configured dead-letter topic.
*
- * <p><b>Known V5 gap:</b> the DLQ topic is currently a non-scalable
persistent topic.
- * The V5 source consumer's underlying v4 {@code ConsumerImpl} creates the DLQ
producer
- * via {@code client.newProducer(...)}, which rejects {@code topic://}
scalable topic
- * names. Routing the DLQ producer through V5's segment-bypass path is
required before
- * a scalable DLQ can be used here.
+ * <p>V5 owns the DLQ at the consumer layer (not per-segment), so the DLQ
topic is
+ * itself a scalable {@code topic://} topic and a single shared producer fans
messages
+ * from every segment into it.
*/
public class V5DeadLetterPolicyTest extends V5ClientBaseTest {
@Test
- public void testMessageGoesToDlqAfterMaxRedeliveries() throws Exception {
+ public void testMessageGoesToScalableDlqWhenExplicitlyConfigured() throws
Exception {
String topic = newScalableTopic(1);
- // Non-scalable DLQ topic — see class-level note about the V5 gap.
- String dlqTopic = "persistent://" + getNamespace() + "/dlq-explicit";
+ String dlqTopic = newScalableTopic(1);
@Cleanup
Producer<String> producer = v5Client.newProducer(Schema.string())
@@ -59,14 +62,10 @@ public class V5DeadLetterPolicyTest extends
V5ClientBaseTest {
.deadLetterPolicy(new DeadLetterPolicy(2, null, dlqTopic,
null))
.subscribe();
- // Subscribe to the DLQ via the V4 client (DLQ is a regular persistent
topic).
@Cleanup
- org.apache.pulsar.client.api.Consumer<String> dlqConsumer =
pulsarClient
- .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+ QueueConsumer<byte[]> dlqConsumer =
v5Client.newQueueConsumer(Schema.bytes())
.topic(dlqTopic)
.subscriptionName("dlq-watcher")
- .subscriptionInitialPosition(
-
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
.subscribe();
producer.newMessage().value("dead").send();
@@ -81,11 +80,171 @@ public class V5DeadLetterPolicyTest extends
V5ClientBaseTest {
consumer.negativeAcknowledge(msg.id());
}
- // Now it should appear on the DLQ topic.
- org.apache.pulsar.client.api.Message<String> dlqMsg =
- dlqConsumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
+ Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
assertNotNull(dlqMsg, "message did not land on the DLQ topic");
- assertEquals(dlqMsg.getValue(), "dead");
- dlqConsumer.acknowledge(dlqMsg);
+ assertEquals(new String(dlqMsg.value()), "dead");
+ dlqConsumer.acknowledge(dlqMsg.id());
+ }
+
+ @Test
+ public void testMessageGoesToDefaultScalableDlqTopic() throws Exception {
+ String topic = newScalableTopic(1);
+ // When deadLetterTopic is null, V5 defaults to
topic://<tenant>/<ns>/<source-local>-DLQ.
+ // Pre-create it so the V5 producer's layout lookup succeeds.
+ String defaultDlqTopic = topic +
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+ admin.scalableTopics().createScalableTopic(defaultDlqTopic, 1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("default-dlq-sub")
+ .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+ Duration.ofMillis(200), Duration.ofMillis(200)))
+ .deadLetterPolicy(DeadLetterPolicy.of(1))
+ .subscribe();
+
+ @Cleanup
+ QueueConsumer<byte[]> dlqConsumer =
v5Client.newQueueConsumer(Schema.bytes())
+ .topic(defaultDlqTopic)
+ .subscriptionName("dlq-watcher")
+ .subscribe();
+
+ producer.newMessage().value("dead-default").send();
+
+ // maxRedeliverCount=1 → user sees deliveries with counts 0 and 1, the
+ // third delivery (count=2) is intercepted.
+ for (int i = 0; i < 2; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected delivery #" + (i + 1));
+ assertEquals(msg.value(), "dead-default");
+ consumer.negativeAcknowledge(msg.id());
+ }
+
+ Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
+ assertNotNull(dlqMsg, "message did not land on the default DLQ topic");
+ assertEquals(new String(dlqMsg.value()), "dead-default");
+ dlqConsumer.acknowledge(dlqMsg.id());
+ }
+
+ /**
+ * The V5 DLQ producer must preserve message key, user properties, and
event time,
+ * and attach origin metadata (REAL_TOPIC / REAL_SUBSCRIPTION /
ORIGIN_MESSAGE_ID)
+ * so DLQ consumers can correlate back to the source delivery.
+ */
+ @Test
+ public void testDlqMessagePreservesKeyPropertiesAndOriginMetadata() throws
Exception {
+ String topic = newScalableTopic(1);
+ String dlqTopic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dlq-meta-sub")
+ .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+ Duration.ofMillis(200), Duration.ofMillis(200)))
+ .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic,
null))
+ .subscribe();
+ @Cleanup
+ QueueConsumer<byte[]> dlqConsumer =
v5Client.newQueueConsumer(Schema.bytes())
+ .topic(dlqTopic)
+ .subscriptionName("dlq-meta-watcher")
+ .subscribe();
+
+ long eventTime = System.currentTimeMillis();
+ producer.newMessage()
+ .key("k-42")
+ .value("payload")
+ .eventTime(Instant.ofEpochMilli(eventTime))
+ .property("user-prop", "user-val")
+ .send();
+
+ for (int i = 0; i < 2; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ consumer.negativeAcknowledge(msg.id());
+ }
+
+ Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
+ assertNotNull(dlqMsg, "message did not land on the DLQ topic");
+ assertEquals(new String(dlqMsg.value()), "payload");
+ assertEquals(dlqMsg.key().orElse(null), "k-42");
+
assertEquals(dlqMsg.eventTime().map(Instant::toEpochMilli).orElse(-1L).longValue(),
+ eventTime);
+
+ Map<String, String> props = dlqMsg.properties();
+ assertEquals(props.get("user-prop"), "user-val", "user properties must
be preserved");
+ assertEquals(props.get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC),
topic);
+
assertEquals(props.get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION),
"dlq-meta-sub");
+ assertNotNull(props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID),
+ "origin message id must be attached");
+
assertTrue(props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID).matches("\\d+:\\d+(:.+)?"),
+ "origin message id should look like a v4 message id, got: "
+ +
props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID));
+
+ dlqConsumer.acknowledge(dlqMsg.id());
+ }
+
+ /**
+ * Verifies a single V5-side DLQ producer handles forwarding from messages
that
+ * arrived on different source segments — i.e., the V5 layer (not the v4
+ * per-segment {@code ConsumerImpl}) owns the DLQ producer.
+ */
+ @Test
+ public void testDlqAcrossMultipleSourceSegments() throws Exception {
+ String topic = newScalableTopic(3);
+ String dlqTopic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dlq-multi-sub")
+ .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+ Duration.ofMillis(200), Duration.ofMillis(200)))
+ .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic,
null))
+ .subscribe();
+ @Cleanup
+ QueueConsumer<byte[]> dlqConsumer =
v5Client.newQueueConsumer(Schema.bytes())
+ .topic(dlqTopic)
+ .subscriptionName("dlq-multi-watcher")
+ .subscribe();
+
+ // 6 keys → spread across the 3 segments.
+ int n = 6;
+ for (int i = 0; i < n; i++) {
+ producer.newMessage().key("k-" + i).value("v-" + i).send();
+ }
+
+ // For each source delivery (counts 0 and 1), nack everything; the
third
+ // delivery (count=2) of each message is intercepted and DLQ-forwarded.
+ for (int round = 0; round < 2; round++) {
+ for (int i = 0; i < n; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "round " + round + " expected delivery #" +
(i + 1));
+ consumer.negativeAcknowledge(msg.id());
+ }
+ }
+
+ Set<String> dlqValues = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ Message<byte[]> dlqMsg =
dlqConsumer.receive(Duration.ofSeconds(10));
+ assertNotNull(dlqMsg, "expected DLQ message #" + (i + 1));
+ dlqValues.add(new String(dlqMsg.value()));
+ dlqConsumer.acknowledge(dlqMsg.id());
+ }
+ for (int i = 0; i < n; i++) {
+ assertTrue(dlqValues.contains("v-" + i), "missing DLQ value v-" +
i);
+ }
}
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
index 102d6de6c52..fed80479a42 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
@@ -171,7 +171,7 @@ final class MultiTopicQueueConsumer<T> implements
QueueConsumerImpl<T> {
};
return dagWatch.start()
.thenCompose(layout -> ScalableQueueConsumer.createAsyncImpl(
- client, v5Schema, perTopicConf(topicName), dagWatch,
layout, sink))
+ client, v5Schema, perTopicConf(topicName), dagWatch,
layout, sink, null))
.thenAccept(qc -> {
if (closed) {
qc.closeAsync();
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index a268e70219b..7f9e32963be 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -48,6 +48,13 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
private String topicName;
private NamespaceName namespaceName;
private Map<String, String> propertyFilters;
+ /**
+ * V5-layer DLQ policy. Held here rather than translated into {@code conf}
so the
+ * V5 {@link ScalableQueueConsumer} can own a single DLQ producer (instead
of v4's
+ * one-per-segment design, which would also reject {@code topic://}
scalable DLQ
+ * targets).
+ */
+ private DeadLetterPolicy dlqPolicy;
QueueConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
this.client = client;
@@ -88,7 +95,7 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
return dagWatch.start()
.thenCompose(initialLayout ->
ScalableQueueConsumer.createAsync(
- client, v5Schema, conf, dagWatch, initialLayout));
+ client, v5Schema, conf, dagWatch, initialLayout,
dlqPolicy));
}
@Override
@@ -190,18 +197,10 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
@Override
public QueueConsumerBuilderV5<T> deadLetterPolicy(DeadLetterPolicy policy)
{
- var builder = org.apache.pulsar.client.api.DeadLetterPolicy.builder()
- .maxRedeliverCount(policy.maxRedeliverCount());
- if (policy.retryLetterTopic() != null) {
- builder.retryLetterTopic(policy.retryLetterTopic());
- }
- if (policy.deadLetterTopic() != null) {
- builder.deadLetterTopic(policy.deadLetterTopic());
- }
- if (policy.initialSubscriptionName() != null) {
- builder.initialSubscriptionName(policy.initialSubscriptionName());
- }
- conf.setDeadLetterPolicy(builder.build());
+ // Don't translate into conf — V5 owns DLQ at the consumer layer (see
field
+ // javadoc). The v4 per-segment DLQ would (a) duplicate producers, and
+ // (b) reject scalable DLQ topics.
+ this.dlqPolicy = policy;
return this;
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 31cb556b70e..2952318936a 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.client.impl.v5;
import io.github.merlimat.slog.Logger;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,14 +36,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.v5.Message;
import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.Producer;
import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.QueueConsumer;
import org.apache.pulsar.client.api.v5.Transaction;
import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
+import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
+import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
/**
@@ -80,6 +87,18 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
*/
private final java.util.function.Consumer<MessageV5<T>> messageSink;
+ /**
+ * V5-layer DLQ. Owned at the V5 consumer (not per-segment) so a single
producer
+ * can target a scalable DLQ topic — v4's per-segment DLQ producer rejects
+ * {@code topic://} (scalable) names. {@code null} when DLQ is not
configured.
+ */
+ private final DeadLetterPolicy dlqPolicy;
+ /** Resolved DLQ topic name (defaulted if the policy didn't provide one).
*/
+ private final String dlqTopic;
+ /** Lazily created on first send-to-DLQ; shared by all segments. */
+ private volatile CompletableFuture<Producer<byte[]>> dlqProducerFuture;
+ private final Object dlqProducerLock = new Object();
+
private volatile boolean closed = false;
private final AsyncQueueConsumerV5<T> asyncView;
@@ -96,7 +115,8 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
Schema<T> v5Schema,
ConsumerConfigurationData<T> consumerConf,
DagWatchClient dagWatch,
- java.util.function.Consumer<MessageV5<T>>
messageSink) {
+ java.util.function.Consumer<MessageV5<T>>
messageSink,
+ DeadLetterPolicy dlqPolicy) {
this.client = client;
this.v5Schema = v5Schema;
this.v4Schema = SchemaAdapter.toV4(v5Schema);
@@ -104,6 +124,8 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
this.dagWatch = dagWatch;
this.topicName = dagWatch.topicName().toString();
this.subscriptionName = consumerConf.getSubscriptionName();
+ this.dlqPolicy = dlqPolicy;
+ this.dlqTopic = dlqPolicy == null ? null : resolveDlqTopic(dlqPolicy);
// Default sink enqueues on the local messageQueue for
receive()/receive(timeout).
// Multi-topic mode passes a sink that forwards into the shared mux
instead — no
// per-topic pump thread needed.
@@ -112,6 +134,21 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
this.asyncView = new AsyncQueueConsumerV5<>(this);
}
+ /**
+ * Default DLQ topic name when {@code policy.deadLetterTopic()} is null:
+ * {@code topic://{tenant}/{ns}/{source-local}-DLQ}. The {@code topic://}
+ * domain makes it a scalable topic; the user is expected to pre-create it
via
+ * the admin API (the V5 client itself doesn't auto-create scalable
topics).
+ */
+ private String resolveDlqTopic(DeadLetterPolicy policy) {
+ if (policy.deadLetterTopic() != null) {
+ return policy.deadLetterTopic();
+ }
+ TopicName source = TopicName.get(topicName);
+ return "topic://" + source.getTenant() + "/" +
source.getNamespacePortion() + "/"
+ + source.getLocalName() +
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+ }
+
/**
* Create a fully initialized consumer asynchronously. The returned future
completes
* only after every initial segment has been successfully subscribed. If
any segment
@@ -122,8 +159,9 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
Schema<T>
v5Schema,
ConsumerConfigurationData<T> consumerConf,
DagWatchClient
dagWatch,
-
ClientSegmentLayout initialLayout) {
- return createAsyncImpl(client, v5Schema, consumerConf, dagWatch,
initialLayout, null)
+
ClientSegmentLayout initialLayout,
+
DeadLetterPolicy dlqPolicy) {
+ return createAsyncImpl(client, v5Schema, consumerConf, dagWatch,
initialLayout, null, dlqPolicy)
.thenApply(c -> c);
}
@@ -139,9 +177,10 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
ConsumerConfigurationData<T> consumerConf,
DagWatchClient dagWatch,
ClientSegmentLayout initialLayout,
- java.util.function.Consumer<MessageV5<T>> messageSink) {
+ java.util.function.Consumer<MessageV5<T>> messageSink,
+ DeadLetterPolicy dlqPolicy) {
ScalableQueueConsumer<T> consumer = new ScalableQueueConsumer<>(
- client, v5Schema, consumerConf, dagWatch, messageSink);
+ client, v5Schema, consumerConf, dagWatch, messageSink,
dlqPolicy);
return consumer.subscribeSegments(initialLayout)
.thenApply(__ -> {
dagWatch.setListener(consumer);
@@ -262,6 +301,13 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
.thenCompose(consumer -> consumer != null ?
consumer.closeAsync()
: CompletableFuture.completedFuture(null)));
}
+ var dlqFuture = dlqProducerFuture;
+ if (dlqFuture != null) {
+ futures.add(dlqFuture
+ .handle((p, ex) -> p)
+ .thenCompose(p -> p != null ? p.async().close()
+ : CompletableFuture.completedFuture(null)));
+ }
return
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
.whenComplete((__, ___) -> segmentConsumers.clear());
}
@@ -390,7 +436,11 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
private void startReceiveLoop(org.apache.pulsar.client.api.Consumer<T>
v4Consumer, long segmentId) {
v4Consumer.receiveAsync().thenAccept(v4Msg -> {
- messageSink.accept(new MessageV5<>(v4Msg, segmentId));
+ if (shouldGoToDlq(v4Msg)) {
+ forwardToDlq(v4Msg, v4Consumer);
+ } else {
+ messageSink.accept(new MessageV5<>(v4Msg, segmentId));
+ }
if (!closed) {
startReceiveLoop(v4Consumer, segmentId);
}
@@ -422,4 +472,66 @@ final class ScalableQueueConsumer<T> implements
QueueConsumerImpl<T>, DagWatchCl
return null;
});
}
+
+ // --- DLQ ---
+
+ /**
+ * V5 DLQ handling lives at the consumer level (not per-segment) so a
single
+ * producer can target a scalable DLQ topic. v4 DLQ semantics: a message is
+ * forwarded to DLQ when {@code redeliveryCount > maxRedeliverCount}
(strictly
+ * greater) — i.e. after maxRedeliverCount+1 deliveries, the next delivery
is
+ * intercepted.
+ */
+ private boolean shouldGoToDlq(org.apache.pulsar.client.api.Message<T>
v4Msg) {
+ return dlqPolicy != null && v4Msg.getRedeliveryCount() >
dlqPolicy.maxRedeliverCount();
+ }
+
+ private void forwardToDlq(org.apache.pulsar.client.api.Message<T> v4Msg,
+ org.apache.pulsar.client.api.Consumer<T>
v4Consumer) {
+ getOrCreateDlqProducer().thenCompose(dlq -> {
+ var msgBuilder = dlq.async().newMessage()
+ .value(v4Msg.getData());
+ if (v4Msg.hasKey()) {
+ msgBuilder.key(v4Msg.getKey());
+ }
+ if (v4Msg.getEventTime() > 0) {
+
msgBuilder.eventTime(Instant.ofEpochMilli(v4Msg.getEventTime()));
+ }
+ // Preserve original properties + attach origin metadata so
consumers of the
+ // DLQ topic can correlate back to the source.
+ Map<String, String> props = new HashMap<>();
+ if (v4Msg.getProperties() != null) {
+ props.putAll(v4Msg.getProperties());
+ }
+ props.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, topicName);
+ props.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION,
subscriptionName);
+ props.put(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID,
v4Msg.getMessageId().toString());
+ msgBuilder.properties(props);
+ return msgBuilder.send();
+ }).whenComplete((msgId, ex) -> {
+ if (ex != null) {
+ // Leave the source message un-acked: it'll be redelivered and
we'll
+ // try again. (Mirrors v4 ConsumerImpl behavior.)
+ log.warn().attr("messageId",
v4Msg.getMessageId()).exception(ex)
+ .log("Failed to forward message to DLQ; will retry on
redelivery");
+ } else {
+ v4Consumer.acknowledgeAsync(v4Msg.getMessageId());
+ }
+ });
+ }
+
+ private CompletableFuture<Producer<byte[]>> getOrCreateDlqProducer() {
+ var existing = dlqProducerFuture;
+ if (existing != null) {
+ return existing;
+ }
+ synchronized (dlqProducerLock) {
+ if (dlqProducerFuture == null) {
+ dlqProducerFuture = client.newProducer(Schema.bytes())
+ .topic(dlqTopic)
+ .createAsync();
+ }
+ return dlqProducerFuture;
+ }
+ }
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index fd8ec3b5747..0fe745ff2fa 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.v5.MessageBuilder;
import org.apache.pulsar.client.api.v5.Producer;
@@ -54,9 +56,38 @@ final class ScalableTopicProducer<T> implements Producer<T>,
DagWatchClient.Layo
private final SegmentRouter router;
private final String topicName;
- // Per-segment v4 producers. Key is segmentId.
- private final ConcurrentHashMap<Long,
org.apache.pulsar.client.api.Producer<T>> segmentProducers =
- new ConcurrentHashMap<>();
+ /**
+ * Per-segment v4 producers. Stored as futures so concurrent
send-on-cold-segment
+ * calls share a single creation attempt without blocking, and so callers
running
+ * on a netty IO thread can chain on the future asynchronously instead of
forcing
+ * a blocking {@code .get()} (which would deadlock against the segment
producer's
+ * own lookup response, processed on the same IO thread).
+ */
+ private final ConcurrentHashMap<Long,
CompletableFuture<org.apache.pulsar.client.api.Producer<T>>>
+ segmentProducers = new ConcurrentHashMap<>();
+
+ /**
+ * Per-segment dispatch chain. Each async send appends a link whose sole
job
+ * is to call {@code v4Producer.sendAsync(...)} (fast, synchronous queue
insert)
+ * once the previous link completes. This serializes the v4-side dispatch
in
+ * user-call order, side-stepping JDK CompletableFuture's undefined
dependent
+ * fire-order — which would otherwise let send N enter the v4 queue before
+ * send N-1 when both are dependents of the same not-yet-ready producer
+ * future. The chain head completes when the producer is ready; subsequent
+ * links complete as soon as their {@code sendAsync} call has returned
(they
+ * do not wait for broker ack — that's the user-visible future).
+ */
+ private final ConcurrentHashMap<Long,
CompletableFuture<org.apache.pulsar.client.api.Producer<T>>>
+ dispatchChains = new ConcurrentHashMap<>();
+ private final Object dispatchLock = new Object();
+
+ /**
+ * Currently in-flight async sends. {@link #flushAsync()} snapshots and
+ * awaits these (each user-visible send future completes on broker ack —
+ * exactly the flush guarantee).
+ */
+ private final Set<CompletableFuture<MessageIdV5>> inFlightSends =
+ ConcurrentHashMap.newKeySet();
// Current active segments (volatile for visibility across threads)
private volatile List<ActiveSegment> activeSegments = List.of();
@@ -108,8 +139,11 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
// that sets initialSequenceId(N) and immediately reads
lastSequenceId() sees N.
long max = producerConf.getInitialSequenceId() == null
? -1L : producerConf.getInitialSequenceId();
- for (var producer : segmentProducers.values()) {
- max = Math.max(max, producer.getLastSequenceId());
+ for (var future : segmentProducers.values()) {
+ // Best-effort: only consult producers that have finished
initializing.
+ if (future.isDone() && !future.isCompletedExceptionally()) {
+ max = Math.max(max, future.join().getLastSequenceId());
+ }
}
return max;
}
@@ -121,23 +155,17 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
@Override
public void close() throws PulsarClientException {
- closed = true;
- dagWatch.close();
-
- List<Exception> errors = new ArrayList<>();
- for (var producer : segmentProducers.values()) {
- try {
- producer.close();
- } catch (Exception e) {
- errors.add(e);
+ try {
+ closeAsync().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Close interrupted", e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof PulsarClientException pce) {
+ throw pce;
}
- }
- segmentProducers.clear();
-
- if (!errors.isEmpty()) {
- PulsarClientException ex = new PulsarClientException("Failed to
close some segment producers");
- errors.forEach(ex::addSuppressed);
- throw ex;
+ throw new PulsarClientException(cause);
}
}
@@ -197,11 +225,16 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
java.util.List<String> replicationClusters,
org.apache.pulsar.client.api.v5.Transaction txn) {
- return sendInternalAsyncWithRetry(key, value, properties,
- eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn, 0);
+ CompletableFuture<MessageIdV5> userFuture = new CompletableFuture<>();
+ inFlightSends.add(userFuture);
+ userFuture.whenComplete((__, ___) -> inFlightSends.remove(userFuture));
+ dispatchSendAttempt(userFuture, key, value, properties, eventTime,
sequenceId,
+ deliverAfter, deliverAt, replicationClusters, txn, 0);
+ return userFuture;
}
- private CompletableFuture<MessageIdV5> sendInternalAsyncWithRetry(
+ private void dispatchSendAttempt(
+ CompletableFuture<MessageIdV5> userFuture,
String key, T value, java.util.Map<String, String> properties,
java.time.Instant eventTime, Long sequenceId,
java.time.Duration deliverAfter, java.time.Instant deliverAt,
@@ -212,43 +245,70 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
try {
segmentId = routeMessage(key);
} catch (Exception e) {
- return CompletableFuture.failedFuture(e);
+ userFuture.completeExceptionally(e);
+ return;
}
+ final long routedSegmentId = segmentId;
+
+ appendToDispatchChain(routedSegmentId, producer -> {
+ var ackFuture = buildV4Message(producer, key, value, properties,
+ eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn)
+ .sendAsync();
+ ackFuture.whenComplete((v4MsgId, ex) -> {
+ if (ex == null) {
+ userFuture.complete(new MessageIdV5(v4MsgId,
routedSegmentId));
+ return;
+ }
+ Throwable cause = ex instanceof
java.util.concurrent.CompletionException
+ ? ex.getCause() : ex;
+ boolean segmentSealed = cause
+ instanceof
org.apache.pulsar.client.api.PulsarClientException
+ .TopicTerminatedException
+ || cause instanceof
org.apache.pulsar.client.api.PulsarClientException
+ .AlreadyClosedException;
+ if (segmentSealed && attempt < 3) {
+ log.info().attr("segmentId", routedSegmentId)
+ .attr("attempt", attempt + 1).log("Segment sealed,
retrying");
+ segmentProducers.remove(routedSegmentId);
+ dispatchChains.remove(routedSegmentId);
+ CompletableFuture.delayedExecutor(
+ 100L * (attempt + 1),
+ java.util.concurrent.TimeUnit.MILLISECONDS)
+ .execute(() -> dispatchSendAttempt(userFuture,
key, value, properties,
+ eventTime, sequenceId, deliverAfter,
deliverAt,
+ replicationClusters, txn, attempt + 1));
+ } else {
+ userFuture.completeExceptionally(ex);
+ }
+ });
+ }, userFuture);
+ }
- org.apache.pulsar.client.api.Producer<T> producer;
- try {
- producer = getOrCreateSegmentProducer(segmentId);
- } catch (PulsarClientException e) {
- return CompletableFuture.failedFuture(e);
+ /**
+ * Append a dispatch step to the per-segment chain. The chain head is the
+ * segment-producer-creation future; subsequent links complete as soon as
+ * their {@code dispatchOp} returns (which calls v4 {@code sendAsync} — a
+ * fast queue insert), so dispatch order strictly mirrors call order.
+ * If the chain itself fails (e.g., segment producer creation failed), the
+ * user-visible future is failed too.
+ */
+ private void appendToDispatchChain(long segmentId,
+
Consumer<org.apache.pulsar.client.api.Producer<T>> dispatchOp,
+ CompletableFuture<MessageIdV5>
userFuture) {
+ synchronized (dispatchLock) {
+ var prev = dispatchChains.computeIfAbsent(segmentId,
+ id -> getOrCreateSegmentProducerAsync(id));
+ var next = prev.thenApply(producer -> {
+ dispatchOp.accept(producer);
+ return producer;
+ });
+ // If the chain link itself faults (creation failure), surface it.
+ next.exceptionally(ex -> {
+ userFuture.completeExceptionally(ex);
+ return null;
+ });
+ dispatchChains.put(segmentId, next);
}
-
- return buildV4Message(producer, key, value, properties,
- eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn)
- .sendAsync()
- .thenApply(v4MsgId -> new MessageIdV5(v4MsgId, segmentId))
- .exceptionallyCompose(ex -> {
- Throwable cause = ex instanceof
java.util.concurrent.CompletionException
- ? ex.getCause() : ex;
- boolean segmentSealed = cause
- instanceof
org.apache.pulsar.client.api.PulsarClientException
- .TopicTerminatedException
- || cause instanceof
org.apache.pulsar.client.api.PulsarClientException
- .AlreadyClosedException;
- if (segmentSealed && attempt < 3) {
- log.info().attr("segmentId", segmentId)
- .attr("attempt", attempt + 1).log("Segment
sealed, retrying");
- segmentProducers.remove(segmentId);
- return CompletableFuture.supplyAsync(() -> null,
- CompletableFuture.delayedExecutor(
- 100L * (attempt + 1),
-
java.util.concurrent.TimeUnit.MILLISECONDS))
- .thenCompose(__ -> sendInternalAsyncWithRetry(
- key, value, properties, eventTime,
sequenceId,
- deliverAfter, deliverAt,
replicationClusters,
- txn, attempt + 1));
- }
- return CompletableFuture.failedFuture(ex);
- });
}
private org.apache.pulsar.client.api.TypedMessageBuilder<T> buildV4Message(
@@ -289,14 +349,14 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
}
/**
- * Flush all segment producers asynchronously.
+ * Flush all in-flight async sends. Each user-visible send future completes
+ * on broker ack, so awaiting them is exactly the "all sends so far have
+ * landed" guarantee flush() owes the caller. Snapshotting the set means
+ * sends issued *after* this call aren't waited on (matches v4 contract).
*/
CompletableFuture<Void> flushAsync() {
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (var producer : segmentProducers.values()) {
- futures.add(producer.flushAsync());
- }
- return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
+ var pending = inFlightSends.toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(pending);
}
CompletableFuture<Void> closeAsync() {
@@ -304,11 +364,17 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
dagWatch.close();
List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (var producer : segmentProducers.values()) {
- futures.add(producer.closeAsync());
+ for (var future : segmentProducers.values()) {
+ // If creation failed, there's nothing to close — swallow so a single bad
+ // segment doesn't fail the overall close.
+ futures.add(future.thenCompose(p -> p.closeAsync())
+ .exceptionally(__ -> null));
}
return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
- .whenComplete((__, ___) -> segmentProducers.clear());
+ .whenComplete((__, ___) -> {
+ segmentProducers.clear();
+ dispatchChains.clear();
+ });
}
// --- Layout change handling ---
@@ -355,13 +421,16 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
if (!newSegmentIds.contains(entry.getKey())) {
log.info().attr("segmentId", entry.getKey())
.log("Closing producer for sealed segment");
- entry.getValue().closeAsync().whenComplete((__, ex) -> {
- if (ex != null) {
- log.warn().attr("segmentId", entry.getKey())
- .exceptionMessage(ex).log("Error closing producer for segment");
- }
- });
+ entry.getValue()
+ .thenCompose(p -> p.closeAsync())
+ .whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.warn().attr("segmentId", entry.getKey())
+ .exceptionMessage(ex).log("Error closing producer for segment");
+ }
+ });
segmentProducers.remove(entry.getKey());
+ dispatchChains.remove(entry.getKey());
}
}
@@ -410,57 +479,66 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
}
}
- private org.apache.pulsar.client.api.Producer<T> getOrCreateSegmentProducer(long segmentId)
- throws PulsarClientException {
- var existing = segmentProducers.get(segmentId);
- if (existing != null) {
- return existing;
- }
-
- try {
- return segmentProducers.computeIfAbsent(segmentId, id -> {
- // Find the segment topic name
- String segmentTopicName = null;
- for (var seg : activeSegments) {
- if (seg.segmentId() == id) {
- segmentTopicName = seg.segmentTopicName();
- break;
- }
- }
- if (segmentTopicName == null) {
- throw new RuntimeException("Segment " + id + " not found in active segments");
+ /**
+ * Async accessor for the per-segment v4 producer. Returns a shared future so
+ * concurrent send-on-cold-segment callers race-free funnel through a single
+ * creation attempt — and so callers running on a netty IO thread (e.g. the
+ * V5 DLQ dispatch off a v4 receive callback) can chain via {@code thenCompose}
+ * instead of blocking on {@code .get()} (which would deadlock against the
+ * segment producer's own lookup response, processed on the same IO thread).
+ */
+ private CompletableFuture<org.apache.pulsar.client.api.Producer<T>> getOrCreateSegmentProducerAsync(
+ long segmentId) {
+ return segmentProducers.computeIfAbsent(segmentId, id -> {
+ // Find the segment topic name
+ String segmentTopicName = null;
+ for (var seg : activeSegments) {
+ if (seg.segmentId() == id) {
+ segmentTopicName = seg.segmentTopicName();
+ break;
}
+ }
+ if (segmentTopicName == null) {
+ return CompletableFuture.failedFuture(
+ new PulsarClientException("Segment " + id + " not found in active segments"));
+ }
- try {
- PulsarClientImpl v4Client = client.v4Client();
- // Clone the user-facing producer config so per-segment producers inherit
- // every builder knob (compression, batching, chunking, encryption,
- // initialSequenceId, accessMode, properties, ...) and not just the few
- // fields explicitly carried over.
- var segConf = producerConf.clone();
- segConf.setTopicName(segmentTopicName);
- if (producerConf.getProducerName() != null
- && !producerConf.getProducerName().isEmpty()) {
- segConf.setProducerName(producerConf.getProducerName() + "-seg-" + id);
- }
- return v4Client.createSegmentProducerAsync(segConf, v4Schema)
- .get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- } catch (RuntimeException re) {
- // computeIfAbsent can't throw checked exceptions; unwrap a v4 PulsarClientException
- // and rethrow as the V5 type so callers see the contract they expect (and don't
- // get a misleading bare RuntimeException for a producer-fenced / busy segment).
- Throwable cause = re.getCause();
- while (cause instanceof java.util.concurrent.ExecutionException && cause.getCause() != null) {
- cause = cause.getCause();
+ PulsarClientImpl v4Client = client.v4Client();
+ // Clone the user-facing producer config so per-segment producers inherit
+ // every builder knob (compression, batching, chunking, encryption,
+ // initialSequenceId, accessMode, properties, ...) and not just the few
+ // fields explicitly carried over.
+ var segConf = producerConf.clone();
+ segConf.setTopicName(segmentTopicName);
+ if (producerConf.getProducerName() != null
+ && !producerConf.getProducerName().isEmpty()) {
+ segConf.setProducerName(producerConf.getProducerName() + "-seg-" + id);
}
+ return v4Client.createSegmentProducerAsync(segConf, v4Schema);
+ });
+ }
+
+ /**
+ * Sync wrapper around {@link #getOrCreateSegmentProducerAsync}. Only safe to
+ * call from user threads (never from a netty IO thread) since it blocks until
+ * the segment producer is ready.
+ */
+ private org.apache.pulsar.client.api.Producer<T> getOrCreateSegmentProducer(long segmentId)
+ throws PulsarClientException {
+ try {
+ return getOrCreateSegmentProducerAsync(segmentId).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Interrupted while creating segment producer", e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
if (cause instanceof org.apache.pulsar.client.api.PulsarClientException v4Exc) {
throw new PulsarClientException(v4Exc.getMessage(), v4Exc);
}
- throw re;
+ if (cause instanceof PulsarClientException v5Exc) {
+ throw v5Exc;
+ }
+ throw new PulsarClientException(cause != null ? cause : e);
}
}
}