This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fbfc6bdbad6e65e840e4ae41b2d3f23d1b21c873
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Wed Jul 27 14:57:39 2022 +0800

    [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception (#16655)
    
    (cherry picked from commit f5826203607fca99e57be7db7559f5529089d393)
---
 .../apache/pulsar/client/api/RetryTopicTest.java   | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 ++--
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 48d20c677c1..00b48afb8bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -20,11 +20,17 @@ package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.reflections.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -443,4 +449,58 @@ public class RetryTopicTest extends ProducerConsumerBase {
         checkConsumer.close();
     }
 
+
+    @Test(timeOut = 30000L)
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumeLater is called
+        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = 
(MultiTopicsConsumerImpl<byte[]>) consumer;
+        List<ConsumerImpl<byte[]>> consumers = 
multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl<byte[]> c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), 
ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) 
field.get(c);
+                
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }
+        Message<byte[]> message = consumer.receive();
+        log.info("consumer received message : {} {}", message.getMessageId(), 
new String(message.getData()));
+        try {
+            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
+        } catch (PulsarClientException.InvalidTopicNameException e) {
+            assertEquals(e.getClass(), 
PulsarClientException.InvalidTopicNameException.class);
+        } catch (Exception e) {
+            fail("exception should be 
PulsarClientException.InvalidTopicNameException");
+        }
+        consumer.close();
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 46a6264ae73..ef06a2e71aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -574,7 +574,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             .create();
                 }
             } catch (Exception e) {
-                log.error("Create retry letter producer exception with topic: 
{}", deadLetterPolicy.getRetryLetterTopic(), e);
+                log.error("Create retry letter producer exception with topic: 
{}",
+                        deadLetterPolicy.getRetryLetterTopic(), e);
+                return FutureUtil.failedFuture(e);
             } finally {
                 createProducerLock.writeLock().unlock();
             }
@@ -638,14 +640,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             });
                 }
             } catch (Exception e) {
-                log.error("Send to retry letter topic exception with topic: 
{}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);
-                Set<MessageId> messageIds = Collections.singleton(messageId);
-                unAckedMessageTracker.remove(messageId);
-                redeliverUnacknowledgedMessages(messageIds);
+                result.completeExceptionally(e);
             }
         }
         MessageId finalMessageId = messageId;
         result.exceptionally(ex -> {
+            log.error("Send to retry letter topic exception with topic: {}, 
messageId: {}",
+                    retryLetterProducer.getTopic(), finalMessageId, ex);
             Set<MessageId> messageIds = Collections.singleton(finalMessageId);
             unAckedMessageTracker.remove(finalMessageId);
             redeliverUnacknowledgedMessages(messageIds);

Reply via email to