This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ebec5cea521 [feat] PIP-468: V5 dead letter queue with scalable DLQ topic (#25652)
ebec5cea521 is described below

commit ebec5cea521cde25097e8b77fa1b8fb9dee63032
Author: Matteo Merli <[email protected]>
AuthorDate: Sat May 2 09:27:47 2026 -0700

    [feat] PIP-468: V5 dead letter queue with scalable DLQ topic (#25652)
---
 .../client/api/v5/V5DeadLetterPolicyTest.java      | 197 +++++++++++--
 .../client/impl/v5/MultiTopicQueueConsumer.java    |   2 +-
 .../client/impl/v5/QueueConsumerBuilderV5.java     |  25 +-
 .../client/impl/v5/ScalableQueueConsumer.java      | 124 +++++++-
 .../client/impl/v5/ScalableTopicProducer.java      | 314 +++++++++++++--------
 5 files changed, 505 insertions(+), 157 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
index be8dfd7f808..11f5cd0b462 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
@@ -20,31 +20,34 @@ package org.apache.pulsar.client.api.v5;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import java.time.Duration;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.testng.annotations.Test;
 
 /**
  * Coverage for {@link 
QueueConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)}: after
- * {@code maxRedeliverCount} negative-acks, the broker forwards the message to 
the
+ * {@code maxRedeliverCount} negative-acks, the V5 layer forwards the message 
to the
  * configured dead-letter topic.
  *
- * <p><b>Known V5 gap:</b> the DLQ topic is currently a non-scalable 
persistent topic.
- * The V5 source consumer's underlying v4 {@code ConsumerImpl} creates the DLQ 
producer
- * via {@code client.newProducer(...)}, which rejects {@code topic://} 
scalable topic
- * names. Routing the DLQ producer through V5's segment-bypass path is 
required before
- * a scalable DLQ can be used here.
+ * <p>V5 owns the DLQ at the consumer layer (not per-segment), so the DLQ 
topic is
+ * itself a scalable {@code topic://} topic and a single shared producer fans 
messages
+ * from every segment into it.
  */
 public class V5DeadLetterPolicyTest extends V5ClientBaseTest {
 
     @Test
-    public void testMessageGoesToDlqAfterMaxRedeliveries() throws Exception {
+    public void testMessageGoesToScalableDlqWhenExplicitlyConfigured() throws 
Exception {
         String topic = newScalableTopic(1);
-        // Non-scalable DLQ topic — see class-level note about the V5 gap.
-        String dlqTopic = "persistent://" + getNamespace() + "/dlq-explicit";
+        String dlqTopic = newScalableTopic(1);
 
         @Cleanup
         Producer<String> producer = v5Client.newProducer(Schema.string())
@@ -59,14 +62,10 @@ public class V5DeadLetterPolicyTest extends V5ClientBaseTest {
                 .deadLetterPolicy(new DeadLetterPolicy(2, null, dlqTopic, 
null))
                 .subscribe();
 
-        // Subscribe to the DLQ via the V4 client (DLQ is a regular persistent 
topic).
         @Cleanup
-        org.apache.pulsar.client.api.Consumer<String> dlqConsumer = 
pulsarClient
-                .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+        QueueConsumer<byte[]> dlqConsumer = 
v5Client.newQueueConsumer(Schema.bytes())
                 .topic(dlqTopic)
                 .subscriptionName("dlq-watcher")
-                .subscriptionInitialPosition(
-                        
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
         producer.newMessage().value("dead").send();
@@ -81,11 +80,171 @@ public class V5DeadLetterPolicyTest extends V5ClientBaseTest {
             consumer.negativeAcknowledge(msg.id());
         }
 
-        // Now it should appear on the DLQ topic.
-        org.apache.pulsar.client.api.Message<String> dlqMsg =
-                dlqConsumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
+        Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
         assertNotNull(dlqMsg, "message did not land on the DLQ topic");
-        assertEquals(dlqMsg.getValue(), "dead");
-        dlqConsumer.acknowledge(dlqMsg);
+        assertEquals(new String(dlqMsg.value()), "dead");
+        dlqConsumer.acknowledge(dlqMsg.id());
+    }
+
+    @Test
+    public void testMessageGoesToDefaultScalableDlqTopic() throws Exception {
+        String topic = newScalableTopic(1);
+        // When deadLetterTopic is null, V5 defaults to 
topic://<tenant>/<ns>/<source-local>-DLQ.
+        // Pre-create it so the V5 producer's layout lookup succeeds.
+        String defaultDlqTopic = topic + 
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+        admin.scalableTopics().createScalableTopic(defaultDlqTopic, 1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("default-dlq-sub")
+                .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+                        Duration.ofMillis(200), Duration.ofMillis(200)))
+                .deadLetterPolicy(DeadLetterPolicy.of(1))
+                .subscribe();
+
+        @Cleanup
+        QueueConsumer<byte[]> dlqConsumer = 
v5Client.newQueueConsumer(Schema.bytes())
+                .topic(defaultDlqTopic)
+                .subscriptionName("dlq-watcher")
+                .subscribe();
+
+        producer.newMessage().value("dead-default").send();
+
+        // maxRedeliverCount=1 → user sees deliveries with counts 0 and 1, the
+        // third delivery (count=2) is intercepted.
+        for (int i = 0; i < 2; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected delivery #" + (i + 1));
+            assertEquals(msg.value(), "dead-default");
+            consumer.negativeAcknowledge(msg.id());
+        }
+
+        Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
+        assertNotNull(dlqMsg, "message did not land on the default DLQ topic");
+        assertEquals(new String(dlqMsg.value()), "dead-default");
+        dlqConsumer.acknowledge(dlqMsg.id());
+    }
+
+    /**
+     * The V5 DLQ producer must preserve message key, user properties, and 
event time,
+     * and attach origin metadata (REAL_TOPIC / REAL_SUBSCRIPTION / 
ORIGIN_MESSAGE_ID)
+     * so DLQ consumers can correlate back to the source delivery.
+     */
+    @Test
+    public void testDlqMessagePreservesKeyPropertiesAndOriginMetadata() throws 
Exception {
+        String topic = newScalableTopic(1);
+        String dlqTopic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dlq-meta-sub")
+                .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+                        Duration.ofMillis(200), Duration.ofMillis(200)))
+                .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic, 
null))
+                .subscribe();
+        @Cleanup
+        QueueConsumer<byte[]> dlqConsumer = 
v5Client.newQueueConsumer(Schema.bytes())
+                .topic(dlqTopic)
+                .subscriptionName("dlq-meta-watcher")
+                .subscribe();
+
+        long eventTime = System.currentTimeMillis();
+        producer.newMessage()
+                .key("k-42")
+                .value("payload")
+                .eventTime(Instant.ofEpochMilli(eventTime))
+                .property("user-prop", "user-val")
+                .send();
+
+        for (int i = 0; i < 2; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg);
+            consumer.negativeAcknowledge(msg.id());
+        }
+
+        Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
+        assertNotNull(dlqMsg, "message did not land on the DLQ topic");
+        assertEquals(new String(dlqMsg.value()), "payload");
+        assertEquals(dlqMsg.key().orElse(null), "k-42");
+        
assertEquals(dlqMsg.eventTime().map(Instant::toEpochMilli).orElse(-1L).longValue(),
+                eventTime);
+
+        Map<String, String> props = dlqMsg.properties();
+        assertEquals(props.get("user-prop"), "user-val", "user properties must 
be preserved");
+        assertEquals(props.get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), 
topic);
+        
assertEquals(props.get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION), 
"dlq-meta-sub");
+        assertNotNull(props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID),
+                "origin message id must be attached");
+        
assertTrue(props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID).matches("\\d+:\\d+(:.+)?"),
+                "origin message id should look like a v4 message id, got: "
+                        + 
props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID));
+
+        dlqConsumer.acknowledge(dlqMsg.id());
+    }
+
+    /**
+     * Verifies a single V5-side DLQ producer handles forwarding from messages 
that
+     * arrived on different source segments — i.e., the V5 layer (not the v4
+     * per-segment {@code ConsumerImpl}) owns the DLQ producer.
+     */
+    @Test
+    public void testDlqAcrossMultipleSourceSegments() throws Exception {
+        String topic = newScalableTopic(3);
+        String dlqTopic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dlq-multi-sub")
+                .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+                        Duration.ofMillis(200), Duration.ofMillis(200)))
+                .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic, 
null))
+                .subscribe();
+        @Cleanup
+        QueueConsumer<byte[]> dlqConsumer = 
v5Client.newQueueConsumer(Schema.bytes())
+                .topic(dlqTopic)
+                .subscriptionName("dlq-multi-watcher")
+                .subscribe();
+
+        // 6 keys → spread across the 3 segments.
+        int n = 6;
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().key("k-" + i).value("v-" + i).send();
+        }
+
+        // For each source delivery (counts 0 and 1), nack everything; the 
third
+        // delivery (count=2) of each message is intercepted and DLQ-forwarded.
+        for (int round = 0; round < 2; round++) {
+            for (int i = 0; i < n; i++) {
+                Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+                assertNotNull(msg, "round " + round + " expected delivery #" + 
(i + 1));
+                consumer.negativeAcknowledge(msg.id());
+            }
+        }
+
+        Set<String> dlqValues = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<byte[]> dlqMsg = 
dlqConsumer.receive(Duration.ofSeconds(10));
+            assertNotNull(dlqMsg, "expected DLQ message #" + (i + 1));
+            dlqValues.add(new String(dlqMsg.value()));
+            dlqConsumer.acknowledge(dlqMsg.id());
+        }
+        for (int i = 0; i < n; i++) {
+            assertTrue(dlqValues.contains("v-" + i), "missing DLQ value v-" + 
i);
+        }
     }
 }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
index 102d6de6c52..fed80479a42 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
@@ -171,7 +171,7 @@ final class MultiTopicQueueConsumer<T> implements QueueConsumerImpl<T> {
         };
         return dagWatch.start()
                 .thenCompose(layout -> ScalableQueueConsumer.createAsyncImpl(
-                        client, v5Schema, perTopicConf(topicName), dagWatch, 
layout, sink))
+                        client, v5Schema, perTopicConf(topicName), dagWatch, 
layout, sink, null))
                 .thenAccept(qc -> {
                     if (closed) {
                         qc.closeAsync();
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index a268e70219b..7f9e32963be 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -48,6 +48,13 @@ final class QueueConsumerBuilderV5<T> implements QueueConsumerBuilder<T> {
     private String topicName;
     private NamespaceName namespaceName;
     private Map<String, String> propertyFilters;
+    /**
+     * V5-layer DLQ policy. Held here rather than translated into {@code conf} 
so the
+     * V5 {@link ScalableQueueConsumer} can own a single DLQ producer (instead 
of v4's
+     * one-per-segment design, which would also reject {@code topic://} 
scalable DLQ
+     * targets).
+     */
+    private DeadLetterPolicy dlqPolicy;
 
     QueueConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
         this.client = client;
@@ -88,7 +95,7 @@ final class QueueConsumerBuilderV5<T> implements QueueConsumerBuilder<T> {
         DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
         return dagWatch.start()
                 .thenCompose(initialLayout -> 
ScalableQueueConsumer.createAsync(
-                        client, v5Schema, conf, dagWatch, initialLayout));
+                        client, v5Schema, conf, dagWatch, initialLayout, 
dlqPolicy));
     }
 
     @Override
@@ -190,18 +197,10 @@ final class QueueConsumerBuilderV5<T> implements QueueConsumerBuilder<T> {
 
     @Override
     public QueueConsumerBuilderV5<T> deadLetterPolicy(DeadLetterPolicy policy) 
{
-        var builder = org.apache.pulsar.client.api.DeadLetterPolicy.builder()
-                .maxRedeliverCount(policy.maxRedeliverCount());
-        if (policy.retryLetterTopic() != null) {
-            builder.retryLetterTopic(policy.retryLetterTopic());
-        }
-        if (policy.deadLetterTopic() != null) {
-            builder.deadLetterTopic(policy.deadLetterTopic());
-        }
-        if (policy.initialSubscriptionName() != null) {
-            builder.initialSubscriptionName(policy.initialSubscriptionName());
-        }
-        conf.setDeadLetterPolicy(builder.build());
+        // Don't translate into conf — V5 owns DLQ at the consumer layer (see 
field
+        // javadoc). The v4 per-segment DLQ would (a) duplicate producers, and
+        // (b) reject scalable DLQ topics.
+        this.dlqPolicy = policy;
         return this;
     }
 
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 31cb556b70e..2952318936a 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.client.impl.v5;
 
 import io.github.merlimat.slog.Logger;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,14 +36,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.v5.Message;
 import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.Producer;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.QueueConsumer;
 import org.apache.pulsar.client.api.v5.Transaction;
 import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
+import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
+import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Backoff;
 
 /**
@@ -80,6 +87,18 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
      */
     private final java.util.function.Consumer<MessageV5<T>> messageSink;
 
+    /**
+     * V5-layer DLQ. Owned at the V5 consumer (not per-segment) so a single 
producer
+     * can target a scalable DLQ topic — v4's per-segment DLQ producer rejects
+     * {@code topic://} (scalable) names. {@code null} when DLQ is not 
configured.
+     */
+    private final DeadLetterPolicy dlqPolicy;
+    /** Resolved DLQ topic name (defaulted if the policy didn't provide one). 
*/
+    private final String dlqTopic;
+    /** Lazily created on first send-to-DLQ; shared by all segments. */
+    private volatile CompletableFuture<Producer<byte[]>> dlqProducerFuture;
+    private final Object dlqProducerLock = new Object();
+
     private volatile boolean closed = false;
     private final AsyncQueueConsumerV5<T> asyncView;
 
@@ -96,7 +115,8 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
                                   Schema<T> v5Schema,
                                   ConsumerConfigurationData<T> consumerConf,
                                   DagWatchClient dagWatch,
-                                  java.util.function.Consumer<MessageV5<T>> 
messageSink) {
+                                  java.util.function.Consumer<MessageV5<T>> 
messageSink,
+                                  DeadLetterPolicy dlqPolicy) {
         this.client = client;
         this.v5Schema = v5Schema;
         this.v4Schema = SchemaAdapter.toV4(v5Schema);
@@ -104,6 +124,8 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
         this.dagWatch = dagWatch;
         this.topicName = dagWatch.topicName().toString();
         this.subscriptionName = consumerConf.getSubscriptionName();
+        this.dlqPolicy = dlqPolicy;
+        this.dlqTopic = dlqPolicy == null ? null : resolveDlqTopic(dlqPolicy);
         // Default sink enqueues on the local messageQueue for 
receive()/receive(timeout).
         // Multi-topic mode passes a sink that forwards into the shared mux 
instead — no
         // per-topic pump thread needed.
@@ -112,6 +134,21 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
         this.asyncView = new AsyncQueueConsumerV5<>(this);
     }
 
+    /**
+     * Default DLQ topic name when {@code policy.deadLetterTopic()} is null:
+     * {@code topic://{tenant}/{ns}/{source-local}-DLQ}. The {@code topic://}
+     * domain makes it a scalable topic; the user is expected to pre-create it 
via
+     * the admin API (the V5 client itself doesn't auto-create scalable 
topics).
+     */
+    private String resolveDlqTopic(DeadLetterPolicy policy) {
+        if (policy.deadLetterTopic() != null) {
+            return policy.deadLetterTopic();
+        }
+        TopicName source = TopicName.get(topicName);
+        return "topic://" + source.getTenant() + "/" + 
source.getNamespacePortion() + "/"
+                + source.getLocalName() + 
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+    }
+
     /**
      * Create a fully initialized consumer asynchronously. The returned future 
completes
      * only after every initial segment has been successfully subscribed. If 
any segment
@@ -122,8 +159,9 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
                                                                Schema<T> 
v5Schema,
                                                                
ConsumerConfigurationData<T> consumerConf,
                                                                DagWatchClient 
dagWatch,
-                                                               
ClientSegmentLayout initialLayout) {
-        return createAsyncImpl(client, v5Schema, consumerConf, dagWatch, 
initialLayout, null)
+                                                               
ClientSegmentLayout initialLayout,
+                                                               
DeadLetterPolicy dlqPolicy) {
+        return createAsyncImpl(client, v5Schema, consumerConf, dagWatch, 
initialLayout, null, dlqPolicy)
                 .thenApply(c -> c);
     }
 
@@ -139,9 +177,10 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
             ConsumerConfigurationData<T> consumerConf,
             DagWatchClient dagWatch,
             ClientSegmentLayout initialLayout,
-            java.util.function.Consumer<MessageV5<T>> messageSink) {
+            java.util.function.Consumer<MessageV5<T>> messageSink,
+            DeadLetterPolicy dlqPolicy) {
         ScalableQueueConsumer<T> consumer = new ScalableQueueConsumer<>(
-                client, v5Schema, consumerConf, dagWatch, messageSink);
+                client, v5Schema, consumerConf, dagWatch, messageSink, 
dlqPolicy);
         return consumer.subscribeSegments(initialLayout)
                 .thenApply(__ -> {
                     dagWatch.setListener(consumer);
@@ -262,6 +301,13 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
                     .thenCompose(consumer -> consumer != null ? 
consumer.closeAsync()
                             : CompletableFuture.completedFuture(null)));
         }
+        var dlqFuture = dlqProducerFuture;
+        if (dlqFuture != null) {
+            futures.add(dlqFuture
+                    .handle((p, ex) -> p)
+                    .thenCompose(p -> p != null ? p.async().close()
+                            : CompletableFuture.completedFuture(null)));
+        }
         return 
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
                 .whenComplete((__, ___) -> segmentConsumers.clear());
     }
@@ -390,7 +436,11 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
 
     private void startReceiveLoop(org.apache.pulsar.client.api.Consumer<T> 
v4Consumer, long segmentId) {
         v4Consumer.receiveAsync().thenAccept(v4Msg -> {
-            messageSink.accept(new MessageV5<>(v4Msg, segmentId));
+            if (shouldGoToDlq(v4Msg)) {
+                forwardToDlq(v4Msg, v4Consumer);
+            } else {
+                messageSink.accept(new MessageV5<>(v4Msg, segmentId));
+            }
             if (!closed) {
                 startReceiveLoop(v4Consumer, segmentId);
             }
@@ -422,4 +472,66 @@ final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>, DagWatchCl
             return null;
         });
     }
+
+    // --- DLQ ---
+
+    /**
+     * V5 DLQ handling lives at the consumer level (not per-segment) so a 
single
+     * producer can target a scalable DLQ topic. v4 DLQ semantics: a message is
+     * forwarded to DLQ when {@code redeliveryCount > maxRedeliverCount} 
(strictly
+     * greater) — i.e. after maxRedeliverCount+1 deliveries, the next delivery 
is
+     * intercepted.
+     */
+    private boolean shouldGoToDlq(org.apache.pulsar.client.api.Message<T> 
v4Msg) {
+        return dlqPolicy != null && v4Msg.getRedeliveryCount() > 
dlqPolicy.maxRedeliverCount();
+    }
+
+    private void forwardToDlq(org.apache.pulsar.client.api.Message<T> v4Msg,
+                              org.apache.pulsar.client.api.Consumer<T> 
v4Consumer) {
+        getOrCreateDlqProducer().thenCompose(dlq -> {
+            var msgBuilder = dlq.async().newMessage()
+                    .value(v4Msg.getData());
+            if (v4Msg.hasKey()) {
+                msgBuilder.key(v4Msg.getKey());
+            }
+            if (v4Msg.getEventTime() > 0) {
+                
msgBuilder.eventTime(Instant.ofEpochMilli(v4Msg.getEventTime()));
+            }
+            // Preserve original properties + attach origin metadata so 
consumers of the
+            // DLQ topic can correlate back to the source.
+            Map<String, String> props = new HashMap<>();
+            if (v4Msg.getProperties() != null) {
+                props.putAll(v4Msg.getProperties());
+            }
+            props.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, topicName);
+            props.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION, 
subscriptionName);
+            props.put(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID, 
v4Msg.getMessageId().toString());
+            msgBuilder.properties(props);
+            return msgBuilder.send();
+        }).whenComplete((msgId, ex) -> {
+            if (ex != null) {
+                // Leave the source message un-acked: it'll be redelivered and 
we'll
+                // try again. (Mirrors v4 ConsumerImpl behavior.)
+                log.warn().attr("messageId", 
v4Msg.getMessageId()).exception(ex)
+                        .log("Failed to forward message to DLQ; will retry on 
redelivery");
+            } else {
+                v4Consumer.acknowledgeAsync(v4Msg.getMessageId());
+            }
+        });
+    }
+
+    private CompletableFuture<Producer<byte[]>> getOrCreateDlqProducer() {
+        var existing = dlqProducerFuture;
+        if (existing != null) {
+            return existing;
+        }
+        synchronized (dlqProducerLock) {
+            if (dlqProducerFuture == null) {
+                dlqProducerFuture = client.newProducer(Schema.bytes())
+                        .topic(dlqTopic)
+                        .createAsync();
+            }
+            return dlqProducerFuture;
+        }
+    }
 }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index fd8ec3b5747..0fe745ff2fa 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.v5.MessageBuilder;
 import org.apache.pulsar.client.api.v5.Producer;
@@ -54,9 +56,38 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
     private final SegmentRouter router;
     private final String topicName;
 
-    // Per-segment v4 producers. Key is segmentId.
-    private final ConcurrentHashMap<Long, 
org.apache.pulsar.client.api.Producer<T>> segmentProducers =
-            new ConcurrentHashMap<>();
+    /**
+     * Per-segment v4 producers. Stored as futures so concurrent 
send-on-cold-segment
+     * calls share a single creation attempt without blocking, and so callers 
running
+     * on a netty IO thread can chain on the future asynchronously instead of 
forcing
+     * a blocking {@code .get()} (which would deadlock against the segment 
producer's
+     * own lookup response, processed on the same IO thread).
+     */
+    private final ConcurrentHashMap<Long, 
CompletableFuture<org.apache.pulsar.client.api.Producer<T>>>
+            segmentProducers = new ConcurrentHashMap<>();
+
+    /**
+     * Per-segment dispatch chain. Each async send appends a link whose sole 
job
+     * is to call {@code v4Producer.sendAsync(...)} (fast, synchronous queue 
insert)
+     * once the previous link completes. This serializes the v4-side dispatch 
in
+     * user-call order, side-stepping JDK CompletableFuture's undefined 
dependent
+     * fire-order — which would otherwise let send N enter the v4 queue before
+     * send N-1 when both are dependents of the same not-yet-ready producer
+     * future. The chain head completes when the producer is ready; subsequent
+     * links complete as soon as their {@code sendAsync} call has returned 
(they
+     * do not wait for broker ack — that's the user-visible future).
+     */
+    private final ConcurrentHashMap<Long, 
CompletableFuture<org.apache.pulsar.client.api.Producer<T>>>
+            dispatchChains = new ConcurrentHashMap<>();
+    private final Object dispatchLock = new Object();
+
+    /**
+     * Currently in-flight async sends. {@link #flushAsync()} snapshots and
+     * awaits these (each user-visible send future completes on broker ack —
+     * exactly the flush guarantee).
+     */
+    private final Set<CompletableFuture<MessageIdV5>> inFlightSends =
+            ConcurrentHashMap.newKeySet();
 
     // Current active segments (volatile for visibility across threads)
     private volatile List<ActiveSegment> activeSegments = List.of();
@@ -108,8 +139,11 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
         // that sets initialSequenceId(N) and immediately reads 
lastSequenceId() sees N.
         long max = producerConf.getInitialSequenceId() == null
                 ? -1L : producerConf.getInitialSequenceId();
-        for (var producer : segmentProducers.values()) {
-            max = Math.max(max, producer.getLastSequenceId());
+        for (var future : segmentProducers.values()) {
+            // Best-effort: only consult producers that have finished 
initializing.
+            if (future.isDone() && !future.isCompletedExceptionally()) {
+                max = Math.max(max, future.join().getLastSequenceId());
+            }
         }
         return max;
     }
@@ -121,23 +155,17 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
 
     @Override
     public void close() throws PulsarClientException {
-        closed = true;
-        dagWatch.close();
-
-        List<Exception> errors = new ArrayList<>();
-        for (var producer : segmentProducers.values()) {
-            try {
-                producer.close();
-            } catch (Exception e) {
-                errors.add(e);
+        try {
+            closeAsync().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Close interrupted", e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof PulsarClientException pce) {
+                throw pce;
             }
-        }
-        segmentProducers.clear();
-
-        if (!errors.isEmpty()) {
-            PulsarClientException ex = new PulsarClientException("Failed to 
close some segment producers");
-            errors.forEach(ex::addSuppressed);
-            throw ex;
+            throw new PulsarClientException(cause);
         }
     }
 
@@ -197,11 +225,16 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
             java.util.List<String> replicationClusters,
             org.apache.pulsar.client.api.v5.Transaction txn) {
 
-        return sendInternalAsyncWithRetry(key, value, properties,
-                eventTime, sequenceId, deliverAfter, deliverAt, 
replicationClusters, txn, 0);
+        CompletableFuture<MessageIdV5> userFuture = new CompletableFuture<>();
+        inFlightSends.add(userFuture);
+        userFuture.whenComplete((__, ___) -> inFlightSends.remove(userFuture));
+        dispatchSendAttempt(userFuture, key, value, properties, eventTime, 
sequenceId,
+                deliverAfter, deliverAt, replicationClusters, txn, 0);
+        return userFuture;
     }
 
-    private CompletableFuture<MessageIdV5> sendInternalAsyncWithRetry(
+    private void dispatchSendAttempt(
+            CompletableFuture<MessageIdV5> userFuture,
             String key, T value, java.util.Map<String, String> properties,
             java.time.Instant eventTime, Long sequenceId,
             java.time.Duration deliverAfter, java.time.Instant deliverAt,
@@ -212,43 +245,70 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
         try {
             segmentId = routeMessage(key);
         } catch (Exception e) {
-            return CompletableFuture.failedFuture(e);
+            userFuture.completeExceptionally(e);
+            return;
         }
+        final long routedSegmentId = segmentId;
+
+        appendToDispatchChain(routedSegmentId, producer -> {
+            var ackFuture = buildV4Message(producer, key, value, properties,
+                    eventTime, sequenceId, deliverAfter, deliverAt, 
replicationClusters, txn)
+                    .sendAsync();
+            ackFuture.whenComplete((v4MsgId, ex) -> {
+                if (ex == null) {
+                    userFuture.complete(new MessageIdV5(v4MsgId, 
routedSegmentId));
+                    return;
+                }
+                Throwable cause = ex instanceof 
java.util.concurrent.CompletionException
+                        ? ex.getCause() : ex;
+                boolean segmentSealed = cause
+                        instanceof 
org.apache.pulsar.client.api.PulsarClientException
+                                .TopicTerminatedException
+                        || cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
+                                .AlreadyClosedException;
+                if (segmentSealed && attempt < 3) {
+                    log.info().attr("segmentId", routedSegmentId)
+                            .attr("attempt", attempt + 1).log("Segment sealed, 
retrying");
+                    segmentProducers.remove(routedSegmentId);
+                    dispatchChains.remove(routedSegmentId);
+                    CompletableFuture.delayedExecutor(
+                                    100L * (attempt + 1),
+                                    java.util.concurrent.TimeUnit.MILLISECONDS)
+                            .execute(() -> dispatchSendAttempt(userFuture, 
key, value, properties,
+                                    eventTime, sequenceId, deliverAfter, 
deliverAt,
+                                    replicationClusters, txn, attempt + 1));
+                } else {
+                    userFuture.completeExceptionally(ex);
+                }
+            });
+        }, userFuture);
+    }
 
-        org.apache.pulsar.client.api.Producer<T> producer;
-        try {
-            producer = getOrCreateSegmentProducer(segmentId);
-        } catch (PulsarClientException e) {
-            return CompletableFuture.failedFuture(e);
+    /**
+     * Append a dispatch step to the per-segment chain. The chain head is the
+     * segment-producer-creation future; subsequent links complete as soon as
+     * their {@code dispatchOp} returns (which calls v4 {@code sendAsync} — a
+     * fast queue insert), so dispatch order strictly mirrors call order.
+     * If the chain itself fails (e.g., segment producer creation failed), the
+     * user-visible future is failed too.
+     */
+    private void appendToDispatchChain(long segmentId,
+                                       
Consumer<org.apache.pulsar.client.api.Producer<T>> dispatchOp,
+                                       CompletableFuture<MessageIdV5> 
userFuture) {
+        synchronized (dispatchLock) {
+            var prev = dispatchChains.computeIfAbsent(segmentId,
+                    id -> getOrCreateSegmentProducerAsync(id));
+            var next = prev.thenApply(producer -> {
+                dispatchOp.accept(producer);
+                return producer;
+            });
+            // If the chain link itself faults (creation failure), surface it.
+            next.exceptionally(ex -> {
+                userFuture.completeExceptionally(ex);
+                return null;
+            });
+            dispatchChains.put(segmentId, next);
         }
-
-        return buildV4Message(producer, key, value, properties,
-                eventTime, sequenceId, deliverAfter, deliverAt, 
replicationClusters, txn)
-                .sendAsync()
-                .thenApply(v4MsgId -> new MessageIdV5(v4MsgId, segmentId))
-                .exceptionallyCompose(ex -> {
-                    Throwable cause = ex instanceof 
java.util.concurrent.CompletionException
-                            ? ex.getCause() : ex;
-                    boolean segmentSealed = cause
-                            instanceof 
org.apache.pulsar.client.api.PulsarClientException
-                                    .TopicTerminatedException
-                            || cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
-                                    .AlreadyClosedException;
-                    if (segmentSealed && attempt < 3) {
-                        log.info().attr("segmentId", segmentId)
-                                .attr("attempt", attempt + 1).log("Segment 
sealed, retrying");
-                        segmentProducers.remove(segmentId);
-                        return CompletableFuture.supplyAsync(() -> null,
-                                CompletableFuture.delayedExecutor(
-                                        100L * (attempt + 1),
-                                        
java.util.concurrent.TimeUnit.MILLISECONDS))
-                                .thenCompose(__ -> sendInternalAsyncWithRetry(
-                                        key, value, properties, eventTime, 
sequenceId,
-                                        deliverAfter, deliverAt, 
replicationClusters,
-                                        txn, attempt + 1));
-                    }
-                    return CompletableFuture.failedFuture(ex);
-                });
     }
 
     private org.apache.pulsar.client.api.TypedMessageBuilder<T> buildV4Message(
@@ -289,14 +349,14 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
     }
 
     /**
-     * Flush all segment producers asynchronously.
+     * Flush all in-flight async sends. Each user-visible send future completes
+     * on broker ack, so awaiting them is exactly the "all sends so far have
+     * landed" guarantee flush() owes the caller. Snapshotting the set means
+     * sends issued *after* this call aren't waited on (matches v4 contract).
      */
     CompletableFuture<Void> flushAsync() {
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (var producer : segmentProducers.values()) {
-            futures.add(producer.flushAsync());
-        }
-        return 
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
+        var pending = inFlightSends.toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(pending);
     }
 
     CompletableFuture<Void> closeAsync() {
@@ -304,11 +364,17 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
         dagWatch.close();
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (var producer : segmentProducers.values()) {
-            futures.add(producer.closeAsync());
+        for (var future : segmentProducers.values()) {
+            // If creation failed, there's nothing to close — swallow so a single bad
+            // segment doesn't fail the overall close.
+            futures.add(future.thenCompose(p -> p.closeAsync())
+                    .exceptionally(__ -> null));
         }
         return 
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
-                .whenComplete((__, ___) -> segmentProducers.clear());
+                .whenComplete((__, ___) -> {
+                    segmentProducers.clear();
+                    dispatchChains.clear();
+                });
     }
 
     // --- Layout change handling ---
@@ -355,13 +421,16 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
             if (!newSegmentIds.contains(entry.getKey())) {
                 log.info().attr("segmentId", entry.getKey())
                         .log("Closing producer for sealed segment");
-                entry.getValue().closeAsync().whenComplete((__, ex) -> {
-                    if (ex != null) {
-                        log.warn().attr("segmentId", entry.getKey())
-                                .exceptionMessage(ex).log("Error closing 
producer for segment");
-                    }
-                });
+                entry.getValue()
+                        .thenCompose(p -> p.closeAsync())
+                        .whenComplete((__, ex) -> {
+                            if (ex != null) {
+                                log.warn().attr("segmentId", entry.getKey())
+                                        .exceptionMessage(ex).log("Error 
closing producer for segment");
+                            }
+                        });
                 segmentProducers.remove(entry.getKey());
+                dispatchChains.remove(entry.getKey());
             }
         }
 
@@ -410,57 +479,66 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
         }
     }
 
-    private org.apache.pulsar.client.api.Producer<T> 
getOrCreateSegmentProducer(long segmentId)
-            throws PulsarClientException {
-        var existing = segmentProducers.get(segmentId);
-        if (existing != null) {
-            return existing;
-        }
-
-        try {
-            return segmentProducers.computeIfAbsent(segmentId, id -> {
-                // Find the segment topic name
-                String segmentTopicName = null;
-                for (var seg : activeSegments) {
-                    if (seg.segmentId() == id) {
-                        segmentTopicName = seg.segmentTopicName();
-                        break;
-                    }
-                }
-                if (segmentTopicName == null) {
-                    throw new RuntimeException("Segment " + id + " not found 
in active segments");
+    /**
+     * Async accessor for the per-segment v4 producer. Returns a shared future so
+     * concurrent send-on-cold-segment callers race-free funnel through a single
+     * creation attempt — and so callers running on a netty IO thread (e.g. the
+     * V5 DLQ dispatch off a v4 receive callback) can chain via {@code thenCompose}
+     * instead of blocking on {@code .get()} (which would deadlock against the
+     * segment producer's own lookup response, processed on the same IO thread).
+     */
+    private CompletableFuture<org.apache.pulsar.client.api.Producer<T>> 
getOrCreateSegmentProducerAsync(
+            long segmentId) {
+        return segmentProducers.computeIfAbsent(segmentId, id -> {
+            // Find the segment topic name
+            String segmentTopicName = null;
+            for (var seg : activeSegments) {
+                if (seg.segmentId() == id) {
+                    segmentTopicName = seg.segmentTopicName();
+                    break;
                 }
+            }
+            if (segmentTopicName == null) {
+                return CompletableFuture.failedFuture(
+                        new PulsarClientException("Segment " + id + " not 
found in active segments"));
+            }
 
-                try {
-                    PulsarClientImpl v4Client = client.v4Client();
-                    // Clone the user-facing producer config so per-segment producers inherit
-                    // every builder knob (compression, batching, chunking, encryption,
-                    // initialSequenceId, accessMode, properties, ...) and not just the few
-                    // fields explicitly carried over.
-                    var segConf = producerConf.clone();
-                    segConf.setTopicName(segmentTopicName);
-                    if (producerConf.getProducerName() != null
-                            && !producerConf.getProducerName().isEmpty()) {
-                        segConf.setProducerName(producerConf.getProducerName() 
+ "-seg-" + id);
-                    }
-                    return v4Client.createSegmentProducerAsync(segConf, 
v4Schema)
-                            .get();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
-        } catch (RuntimeException re) {
-            // computeIfAbsent can't throw checked exceptions; unwrap a v4 PulsarClientException
-            // and rethrow as the V5 type so callers see the contract they expect (and don't
-            // get a misleading bare RuntimeException for a producer-fenced / busy segment).
-            Throwable cause = re.getCause();
-            while (cause instanceof java.util.concurrent.ExecutionException && 
cause.getCause() != null) {
-                cause = cause.getCause();
+            PulsarClientImpl v4Client = client.v4Client();
+            // Clone the user-facing producer config so per-segment producers inherit
+            // every builder knob (compression, batching, chunking, encryption,
+            // initialSequenceId, accessMode, properties, ...) and not just the few
+            // fields explicitly carried over.
+            var segConf = producerConf.clone();
+            segConf.setTopicName(segmentTopicName);
+            if (producerConf.getProducerName() != null
+                    && !producerConf.getProducerName().isEmpty()) {
+                segConf.setProducerName(producerConf.getProducerName() + 
"-seg-" + id);
             }
+            return v4Client.createSegmentProducerAsync(segConf, v4Schema);
+        });
+    }
+
+    /**
+     * Sync wrapper around {@link #getOrCreateSegmentProducerAsync}. Only safe to
+     * call from user threads (never from a netty IO thread) since it blocks until
+     * the segment producer is ready.
+     */
+    private org.apache.pulsar.client.api.Producer<T> 
getOrCreateSegmentProducer(long segmentId)
+            throws PulsarClientException {
+        try {
+            return getOrCreateSegmentProducerAsync(segmentId).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Interrupted while creating 
segment producer", e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
             if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException v4Exc) {
                 throw new PulsarClientException(v4Exc.getMessage(), v4Exc);
             }
-            throw re;
+            if (cause instanceof PulsarClientException v5Exc) {
+                throw v5Exc;
+            }
+            throw new PulsarClientException(cause != null ? cause : e);
         }
     }
 }


Reply via email to