This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6094879419001af2625d18555e55e267323a2b03
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Feb 25 15:23:40 2022 +0800

    Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433)
    
    (cherry picked from commit 7a58aeba0b439479e1d68fa67c57e120f85687b0)
---
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 86 +++++++++++++---------
 1 file changed, 50 insertions(+), 36 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index cbfc27d..471d4ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -24,9 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
@@ -56,6 +54,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Getter(AccessLevel.PUBLIC)
@@ -117,48 +116,63 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("KeySharedPolicy must 
set with KeyShared subscription"));
         }
-        if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
+        CompletableFuture<Void> applyDLQConfig;
+        if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
             TopicName topicFirst = 
TopicName.get(conf.getTopicNames().iterator().next());
-            String retryLetterTopic = topicFirst + "-" + 
conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String deadLetterTopic = topicFirst + "-" + 
conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-
             //Issue 9327: do compatibility check in case of the default retry 
and dead letter topic name changed
-            String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + 
conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + 
conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            try {
-                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), 
TimeUnit.MILLISECONDS).partitions > 0) {
-                    retryLetterTopic = oldRetryLetterTopic;
-                }
-                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), 
TimeUnit.MILLISECONDS).partitions > 0) {
-                    deadLetterTopic = oldDeadLetterTopic;
-                }
-            } catch (InterruptedException | TimeoutException e) {
-                return FutureUtil.failedFuture(e);
-            } catch (ExecutionException e) {
-                return FutureUtil.failedFuture(e.getCause());
-            }
-
-            if(conf.getDeadLetterPolicy() == null) {
-                conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
+            String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + 
conf.getSubscriptionName()
+                    + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+            String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + 
conf.getSubscriptionName()
+                    + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+            DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
+            if (deadLetterPolicy == null || 
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
+                    || 
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                CompletableFuture<PartitionedTopicMetadata> 
retryLetterTopicMetadata =
+                        
client.getPartitionedTopicMetadata(oldRetryLetterTopic);
+                CompletableFuture<PartitionedTopicMetadata> 
deadLetterTopicMetadata =
+                        client.getPartitionedTopicMetadata(oldDeadLetterTopic);
+                applyDLQConfig = 
CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
+                        .thenAccept(__ -> {
+                            String retryLetterTopic = topicFirst + "-" + 
conf.getSubscriptionName()
+                                    + 
RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+                            String deadLetterTopic = topicFirst + "-" + 
conf.getSubscriptionName()
+                                    + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+                            if (retryLetterTopicMetadata.join().partitions > 
0) {
+                                retryLetterTopic = oldRetryLetterTopic;
+                            }
+                            if (deadLetterTopicMetadata.join().partitions > 0) 
{
+                                deadLetterTopic = oldDeadLetterTopic;
+                            }
+                            if (deadLetterPolicy == null) {
+                                
conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
                                         
.maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
                                         .retryLetterTopic(retryLetterTopic)
                                         .deadLetterTopic(deadLetterTopic)
                                         .build());
+                            } else {
+                                if 
(StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
+                                    
conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
+                                }
+                                if 
(StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                                    
conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
+                                }
+                            }
+                            
conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                        });
             } else {
-                if 
(StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
-                    
conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
-                }
-                if 
(StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
-                    
conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
-                }
+                
conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                applyDLQConfig = CompletableFuture.completedFuture(null);
             }
-            
conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+        } else {
+            applyDLQConfig = CompletableFuture.completedFuture(null);
         }
-        return interceptorList == null || interceptorList.size() == 0 ?
-                client.subscribeAsync(conf, schema, null) :
-                client.subscribeAsync(conf, schema, new 
ConsumerInterceptors<>(interceptorList));
+        return applyDLQConfig.thenCompose(__ -> {
+            if (interceptorList == null || interceptorList.size() == 0) {
+                return client.subscribeAsync(conf, schema, null);
+            } else {
+                return client.subscribeAsync(conf, schema, new 
ConsumerInterceptors<>(interceptorList));
+            }
+        });
     }
 
     @Override
@@ -332,7 +346,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         
conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
         return this;
     }
-    
+
     @Override
     public ConsumerBuilder<T> property(String key, String value) {
         checkArgument(StringUtils.isNotBlank(key) && 
StringUtils.isNotBlank(value),

Reply via email to