This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 969b73b6607 [fix][broker] Branch-2.10 Avoid endless blocking call. 
(#18914)
969b73b6607 is described below

commit 969b73b6607128288daa40547d6fc3b78f8cee9c
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Dec 20 10:14:02 2022 +0800

    [fix][broker] Branch-2.10 Avoid endless blocking call. (#18914)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 73 +++++-----------------
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  2 +-
 .../pulsar/broker/admin/AdminResourceTest.java     |  4 +-
 3 files changed, 19 insertions(+), 60 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index aa873f6433d..305266ce799 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -26,9 +26,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
@@ -130,11 +127,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
      */
 
     public void validatePoliciesReadOnlyAccess() {
-        try {
-            validatePoliciesReadOnlyAccessAsync().join();
-        } catch (CompletionException ce) {
-            throw new RestException(ce.getCause());
-        }
+        sync(this::validatePoliciesReadOnlyAccessAsync);
     }
 
     public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
@@ -251,17 +244,11 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
-    protected void validatePartitionedTopicMetadata(String tenant, String 
namespace, String encodedTopic) {
-        try {
-            PartitionedTopicMetadata partitionedTopicMetadata =
-                    
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
-            if (partitionedTopicMetadata.partitions < 1) {
-                throw new RestException(Status.CONFLICT, "Topic is not 
partitioned topic");
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Failed to validate partitioned topic metadata 
{}://{}/{}/{}",
-                    domain(), tenant, namespace, topicName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic 
partition meta failed.");
+    protected void validatePartitionedTopicMetadata() {
+        PartitionedTopicMetadata partitionedTopicMetadata = sync(()->
+                
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName));
+        if (partitionedTopicMetadata.partitions < 1) {
+            throw new RestException(Status.CONFLICT, "Topic is not partitioned 
topic");
         }
     }
 
@@ -465,28 +452,14 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 });
     }
 
-    protected static PartitionedTopicMetadata 
fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
-        try {
-            return 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
-        } catch (Exception e) {
-            if (e.getCause() instanceof RestException) {
-                throw (RestException) e.getCause();
-            }
-            throw new RestException(e);
-        }
+    protected PartitionedTopicMetadata 
fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
+        return sync(() -> 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName));
     }
 
-    protected static PartitionedTopicMetadata 
fetchPartitionedTopicMetadataCheckAllowAutoCreation(
+    protected PartitionedTopicMetadata 
fetchPartitionedTopicMetadataCheckAllowAutoCreation(
             PulsarService pulsar, TopicName topicName) {
-        try {
-            return 
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)
-                    .get();
-        } catch (Exception e) {
-            if (e.getCause() instanceof RestException) {
-                throw (RestException) e.getCause();
-            }
-            throw new RestException(e);
-        }
+        return sync(() -> pulsar.getBrokerService()
+                
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName));
     }
 
    protected void validateClusterExists(String cluster) {
@@ -523,26 +496,12 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
-        try {
-            return namespaceResources().getPartitionedTopicResources()
-                    .listPartitionedTopicsAsync(namespaceName, topicDomain)
-                    .join();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get partitioned topic list for namespace 
{}", clientAppId(),
-                    namespaceName.toString(), e);
-            throw new RestException(e);
-        }
+        return sync(() -> namespaceResources().getPartitionedTopicResources()
+                .listPartitionedTopicsAsync(namespaceName, topicDomain));
     }
 
-    protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
-        try {
-            return 
getPulsarResources().getTopicResources().getExistingPartitions(topicName)
-                    .get(config().getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
-        } catch (Exception e) {
-            log.error("[{}] Failed to get topic partition list for namespace 
{}", clientAppId(),
-                    namespaceName.toString(), e);
-            throw new RestException(e);
-        }
+    protected List<String> getTopicPartitionList() {
+        return sync(()-> 
getPulsarResources().getTopicResources().getExistingPartitions(topicName));
     }
 
     protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, 
int numPartitions,
@@ -572,7 +531,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
             // new create check
             if (maxTopicsPerNamespace > 0 && 
!pulsar().getBrokerService().isSystemTopic(topicName)) {
-                List<String> partitionedTopics = 
getTopicPartitionList(TopicDomain.persistent);
+                List<String> partitionedTopics = getTopicPartitionList();
                 // exclude created system topic
                 long topicsCount =
                         partitionedTopics.stream().filter(t ->
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a607db8b6aa..afc5575abc2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -810,7 +810,7 @@ public class PersistentTopics extends PersistentTopicsBase {
                     required = true, type = "int", defaultValue = "0")
                     int numPartitions) {
         validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
+        validatePartitionedTopicMetadata();
         internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, 
authoritative, force);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
index 2e468f960b1..0217ef1b4b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -135,11 +135,11 @@ public class AdminResourceTest extends BrokerTestBase {
         resource.setPulsar(pulsar);
         // validate should pass when topic is partitioned topic
         resource.validatePartitionedTopicName(tenant, namespace, 
Codec.encode(partitionedTopic));
-        resource.validatePartitionedTopicMetadata(tenant, namespace, 
Codec.encode(partitionedTopic));
+        resource.validatePartitionedTopicMetadata();
         // validate should failed when topic is non-partitioned topic
         resource.validatePartitionedTopicName(tenant, namespace, 
Codec.encode(nonPartitionedTopic));
         try {
-            resource.validatePartitionedTopicMetadata(tenant, namespace, 
Codec.encode(nonPartitionedTopic));
+            resource.validatePartitionedTopicMetadata();
             fail("Should fail validation on non-partitioned topic");
         } catch (RestException re) {
             assertEquals(Status.CONFLICT.getStatusCode(), 
re.getResponse().getStatus());

Reply via email to