This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cb12e2fd87cbbaece3c53cfc3fb93d79610c3e8f Author: Lei Zhiyuan <[email protected]> AuthorDate: Wed Jul 27 14:57:39 2022 +0800 [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception (#16655) (cherry picked from commit f5826203607fca99e57be7db7559f5529089d393) --- .../apache/pulsar/client/api/RetryTopicTest.java | 60 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 8 +-- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 48d20c677c1..00b48afb8bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -20,11 +20,17 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.reflections.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -443,4 +449,58 @@ public class RetryTopicTest extends ProducerConsumerBase { checkConsumer.close(); } + + @Test(timeOut = 30000L) + public void testRetryTopicException() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + final int maxRedeliveryCount = 2; + final int sendMessages = 1; + // subscribe before publish + Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .receiverQueueSize(100) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .build()) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + producer.close(); + + // mock a retry producer exception when reconsumelater is called + MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer; + List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers(); + for (ConsumerImpl<byte[]> c : consumers) { + Set<Field> deadLetterPolicyField = + ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); + + if (deadLetterPolicyField.size() != 0) { + Field field = deadLetterPolicyField.iterator().next(); + field.setAccessible(true); + DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); + deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#"); + } + } + Message<byte[]> message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + try { + consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); + } catch (PulsarClientException.InvalidTopicNameException e) { + assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class); + } catch (Exception e) { + fail("exception should be PulsarClientException.InvalidTopicNameException"); + } + consumer.close(); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a0491f86b96..485ac233527 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -567,6 +567,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } } catch (Exception e) { log.error("Create retry letter producer exception with topic: {}", deadLetterPolicy.getRetryLetterTopic(), e); + return FutureUtil.failedFuture(e); } finally { createProducerLock.writeLock().unlock(); } @@ -630,14 +631,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }); } } catch (Exception e) { - log.error("Send to retry letter topic exception with topic: {}, messageId: {}", retryLetterProducer.getTopic(), messageId, e); - Set<MessageId> messageIds = Collections.singleton(messageId); - unAckedMessageTracker.remove(messageId); - redeliverUnacknowledgedMessages(messageIds); + result.completeExceptionally(e); } } MessageId finalMessageId = messageId; result.exceptionally(ex -> { + log.error("Send to retry letter topic exception with topic: {}, messageId: {}", + retryLetterProducer.getTopic(), finalMessageId, ex); Set<MessageId> messageIds = Collections.singleton(finalMessageId); unAckedMessageTracker.remove(finalMessageId); redeliverUnacknowledgedMessages(messageIds);
