merlimat commented on a change in pull request #13355:
URL: https://github.com/apache/pulsar/pull/13355#discussion_r770761644



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1855,10 +1863,20 @@ private void initDeadLetterProducerIfNeeded() {
             createProducerLock.writeLock().lock();
             try {
                 if (deadLetterProducer == null) {
-                    deadLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
-                            .topic(this.deadLetterPolicy.getDeadLetterTopic())
-                            .blockIfQueueFull(false)
-                            .createAsync();
+                    // We first need to create the initial subscription for this DLQ topic.
+                    // Otherwise, when we do not set the retention, it may lead to data loss.
+                    // The default initial subscription name is the subscription name of the current consumer.
+                    CompletableFuture<Consumer<byte[]>> deadLetterConsumer =
+                            client.newConsumer(Schema.AUTO_PRODUCE_BYTES(schema))

Review comment:
       If the subscription is already there, we shouldn't try to create the 
consumer; otherwise we might get a `ConsumerBusy` error.
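
To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use the Pulsar client at all: `ToyTopic`, the `ConsumerBusyException` class defined here, and `ensureSubscription` are hypothetical stand-ins that only mimic how an Exclusive subscription rejects a second consumer. It shows why an unconditional subscribe can fail and how a "create only if missing" guard avoids that, under those assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these types come from the Pulsar client library.
public class DlqInitSubscriptionSketch {

    /** Stand-in for the broker rejecting a second consumer on an Exclusive subscription. */
    static class ConsumerBusyException extends RuntimeException {
        ConsumerBusyException(String msg) {
            super(msg);
        }
    }

    /** Tracks, per subscription name, whether a consumer is currently attached. */
    static class ToyTopic {
        private final Map<String, Boolean> subscriptions = new HashMap<>();

        boolean hasSubscription(String name) {
            return subscriptions.containsKey(name);
        }

        /** Attaches an exclusive consumer, creating the subscription if needed. */
        void subscribeExclusive(String name) {
            if (Boolean.TRUE.equals(subscriptions.get(name))) {
                throw new ConsumerBusyException("Exclusive consumer already connected: " + name);
            }
            subscriptions.put(name, true);
        }

        /** Detaches the consumer but keeps the subscription itself. */
        void closeConsumer(String name) {
            subscriptions.put(name, false);
        }
    }

    /**
     * Creates the initial subscription only when it does not exist yet.
     * Returns true if it was created, false if the step was skipped.
     */
    static boolean ensureSubscription(ToyTopic topic, String name) {
        if (topic.hasSubscription(name)) {
            return false; // a consumer may be attached; do not risk ConsumerBusy
        }
        topic.subscribeExclusive(name);
        topic.closeConsumer(name); // the helper consumer is closed right away
        return true;
    }

    public static void main(String[] args) {
        ToyTopic dlq = new ToyTopic();
        dlq.subscribeExclusive("my-subscription"); // a DLQ consumer is already attached

        // Unconditional subscribe, as in the diff: rejected in this toy model.
        try {
            dlq.subscribeExclusive("my-subscription");
        } catch (ConsumerBusyException e) {
            System.out.println("unguarded: " + e.getMessage());
        }

        // Guarded version: skips creation because the subscription already exists.
        System.out.println("guarded created: " + ensureSubscription(dlq, "my-subscription"));
    }
}
```

In a real client the existence check and the subscribe would not be atomic, so a race between the two would still need handling; the sketch ignores that on purpose.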

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1855,10 +1863,20 @@ private void initDeadLetterProducerIfNeeded() {
             createProducerLock.writeLock().lock();
             try {
                 if (deadLetterProducer == null) {
-                    deadLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
-                            .topic(this.deadLetterPolicy.getDeadLetterTopic())
-                            .blockIfQueueFull(false)
-                            .createAsync();
+                    // We first need to create the initial subscription for this DLQ topic.
+                    // Otherwise, when we do not set the retention, it may lead to data loss.
+                    // The default initial subscription name is the subscription name of the current consumer.
+                    CompletableFuture<Consumer<byte[]>> deadLetterConsumer =
+                            client.newConsumer(Schema.AUTO_PRODUCE_BYTES(schema))

Review comment:
       `AUTO_PRODUCE_BYTES` is meant to be used when producing; it should not 
be used in a consumer.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
##########
@@ -615,4 +615,71 @@ public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType() throws Ex
 
         checkConsumer.close();
     }
+
+    @Test
+    public void testDeadLetterTopicDefaultInitSubscription() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+
+        final int sendMessages = 100;
+
+        final String subscriptionName = "my-subscription";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
+        assertEquals(admin.topics().getStats(deadLetterTopic).getSubscriptions().get(subscriptionName).getMsgBacklog(),
+                sendMessages);
+
+
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+                .topic(deadLetterTopic)
+                .subscriptionName(subscriptionName)
+                .subscribe();
+
+        int totalInDeadLetter = 0;
+        do {
+            Message<byte[]> message = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
+            assertNotNull(message, "Dead letter consumer can not receive messages.");
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(),
+                    new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+    }

Review comment:
       We can add more test cases for when the subscription is already there, 
with a consumer attached and possibly with a different subscription type.

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
##########
@@ -49,4 +49,15 @@
      */
     private String deadLetterTopic;
 
+    /**
+     * Name of the initial subscription name of the dead letter topic.
+     * The default value is the subscription name of the consumer.
+     */
+    private String initSubscriptionName;
+
+    /**
+     * Name of the initial subscription type of the dead letter topic.
+     * The default value is `Exclusive`.
+     */
+    private SubscriptionType initSubscriptionType;

Review comment:
       I don't think we need to expose this setting. The consumer is 
immediately closed, and whoever consumes next will automatically reset 
the subscription type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
