This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f5826203607 [fix][client] Fix ReconsumeLater will hang up if
retryLetterProducer exception (#16655)
f5826203607 is described below
commit f5826203607fca99e57be7db7559f5529089d393
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Wed Jul 27 14:57:39 2022 +0800
[fix][client] Fix ReconsumeLater will hang up if retryLetterProducer
exception (#16655)
---
.../apache/pulsar/client/api/RetryTopicTest.java | 61 +++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 9 ++--
2 files changed, 64 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index cca5b770c31..d8a97b7bdb1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -20,14 +20,19 @@ package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
-
+import java.lang.reflect.Field;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -457,4 +462,58 @@ public class RetryTopicTest extends ProducerConsumerBase {
checkConsumer.close();
}
+
+ @Test(timeOut = 30000L)
+ public void testRetryTopicException() throws Exception {
+ final String topic = "persistent://my-property/my-ns/retry-topic";
+ final int maxRedeliveryCount = 2;
+ final int sendMessages = 1;
+ // subscribe before publish
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .enableRetry(true)
+ .receiverQueueSize(100)
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(maxRedeliveryCount)
+
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+ .build())
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+ for (int i = 0; i < sendMessages; i++) {
+ producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+ }
+ producer.close();
+
+ // mock a retry producer exception when reconsumelater is called
+ MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer =
(MultiTopicsConsumerImpl<byte[]>) consumer;
+ List<ConsumerImpl<byte[]>> consumers =
multiTopicsConsumer.getConsumers();
+ for (ConsumerImpl<byte[]> c : consumers) {
+ Set<Field> deadLetterPolicyField =
+ ReflectionUtils.getAllFields(c.getClass(),
ReflectionUtils.withName("deadLetterPolicy"));
+
+ if (deadLetterPolicyField.size() != 0) {
+ Field field = deadLetterPolicyField.iterator().next();
+ field.setAccessible(true);
+ DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy)
field.get(c);
+
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+ }
+ }
+ Message<byte[]> message = consumer.receive();
+ log.info("consumer received message : {} {}", message.getMessageId(),
new String(message.getData()));
+ try {
+ consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
+ } catch (PulsarClientException.InvalidTopicNameException e) {
+ assertEquals(e.getClass(),
PulsarClientException.InvalidTopicNameException.class);
+ } catch (Exception e) {
+ fail("exception should be
PulsarClientException.InvalidTopicNameException");
+ }
+ consumer.close();
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 16eb49b1af7..2955065f313 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -655,6 +655,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} catch (Exception e) {
log.error("Create retry letter producer exception with topic:
{}",
deadLetterPolicy.getRetryLetterTopic(), e);
+ return FutureUtil.failedFuture(e);
} finally {
createProducerLock.writeLock().unlock();
}
@@ -724,15 +725,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
});
}
} catch (Exception e) {
- log.error("Send to retry letter topic exception with topic:
{}, messageId: {}",
- retryLetterProducer.getTopic(), messageId, e);
- Set<MessageId> messageIds = Collections.singleton(messageId);
- unAckedMessageTracker.remove(messageId);
- redeliverUnacknowledgedMessages(messageIds);
+ result.completeExceptionally(e);
}
}
MessageId finalMessageId = messageId;
result.exceptionally(ex -> {
+ log.error("Send to retry letter topic exception with topic: {},
messageId: {}",
+ retryLetterProducer.getTopic(), finalMessageId, ex);
Set<MessageId> messageIds = Collections.singleton(finalMessageId);
unAckedMessageTracker.remove(finalMessageId);
redeliverUnacknowledgedMessages(messageIds);