This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6094879419001af2625d18555e55e267323a2b03 Author: Jiwei Guo <[email protected]> AuthorDate: Fri Feb 25 15:23:40 2022 +0800 Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433) (cherry picked from commit 7a58aeba0b439479e1d68fa67c57e120f85687b0) --- .../pulsar/client/impl/ConsumerBuilderImpl.java | 86 +++++++++++++--------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index cbfc27d..471d4ba 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -24,9 +24,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.AccessLevel; @@ -56,6 +54,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.FutureUtil; @Getter(AccessLevel.PUBLIC) @@ -117,48 +116,63 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { return FutureUtil.failedFuture( new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription")); } - if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) { + CompletableFuture<Void> applyDLQConfig; + if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) { TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next()); - String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; - String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; - //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed - String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; - String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; - try { - if (client.getPartitionedTopicMetadata(oldRetryLetterTopic) - .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) { - retryLetterTopic = oldRetryLetterTopic; - } - if (client.getPartitionedTopicMetadata(oldDeadLetterTopic) - .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) { - deadLetterTopic = oldDeadLetterTopic; - } - } catch (InterruptedException | TimeoutException e) { - return FutureUtil.failedFuture(e); - } catch (ExecutionException e) { - return FutureUtil.failedFuture(e.getCause()); - } - - if(conf.getDeadLetterPolicy() == null) { - conf.setDeadLetterPolicy(DeadLetterPolicy.builder() + String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; + String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); + if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) + || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { + CompletableFuture<PartitionedTopicMetadata> retryLetterTopicMetadata = + client.getPartitionedTopicMetadata(oldRetryLetterTopic); + CompletableFuture<PartitionedTopicMetadata> deadLetterTopicMetadata = + client.getPartitionedTopicMetadata(oldDeadLetterTopic); + applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) + .thenAccept(__ -> { + String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; + String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + if (retryLetterTopicMetadata.join().partitions > 0) { + retryLetterTopic = oldRetryLetterTopic; + } + if (deadLetterTopicMetadata.join().partitions > 0) { + deadLetterTopic = oldDeadLetterTopic; + } + if (deadLetterPolicy == null) { + conf.setDeadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES) .retryLetterTopic(retryLetterTopic) .deadLetterTopic(deadLetterTopic) .build()); + } else { + if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) { + conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic); + } + if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { + conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic); + } + } + conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic()); + }); } else { - if (StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) { - conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic); - } - if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) { - conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic); - } + conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic()); + applyDLQConfig = CompletableFuture.completedFuture(null); } - conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic()); + } else { + applyDLQConfig = CompletableFuture.completedFuture(null); } - return interceptorList == null || interceptorList.size() == 0 ? - client.subscribeAsync(conf, schema, null) : - client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList)); + return applyDLQConfig.thenCompose(__ -> { + if (interceptorList == null || interceptorList.size() == 0) { + return client.subscribeAsync(conf, schema, null); + } else { + return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList)); + } + }); } @Override @@ -332,7 +346,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull); return this; } - + @Override public ConsumerBuilder<T> property(String key, String value) { checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
