This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 83d6da90a0f [improve][broker] reduce code duplication to avoid endless wait ``CompletableFuture``  (#14853)
83d6da90a0f is described below

commit 83d6da90a0ffd14c1f1024941c68ce5b0c6ab4ac
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Mar 28 20:46:11 2022 +0800

    [improve][broker] reduce code duplication to avoid endless wait ``CompletableFuture``  (#14853)
    
    (cherry picked from commit 1631bf1a7cdbcb476f06264bcbfa2f9cd56e7f8c)
---
 .../pulsar/broker/web/PulsarWebResource.java       | 125 ++++-----------------
 1 file changed, 23 insertions(+), 102 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index cbae50f8739..39211aca7ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -33,14 +33,13 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -237,16 +236,7 @@ public abstract class PulsarWebResource {
      *             if not authorized
      */
     public void validateSuperUserAccess() {
-        try {
-            
validateSuperUserAccessAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
 SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            Throwable realCause = FutureUtil.unwrapCompletionException(e);
-            if (realCause instanceof WebApplicationException){
-                throw (WebApplicationException) realCause;
-            } else {
-                throw new RestException(realCause);
-            }
-        }
+        sync(this::validateSuperUserAccessAsync);
     }
 
     /**
@@ -452,16 +442,7 @@ public abstract class PulsarWebResource {
      * @throws Exception In case the redirect happens
      */
     protected void validateClusterOwnership(String cluster) throws 
WebApplicationException {
-        try {
-            
validateClusterOwnershipAsync(cluster).get(config().getMetadataStoreOperationTimeoutSeconds(),
 SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException 
ex) {
-            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
-            if (realCause instanceof WebApplicationException){
-                throw (WebApplicationException) realCause;
-            } else {
-                throw new RestException(realCause);
-            }
-        }
+        sync(()-> validateClusterOwnershipAsync(cluster));
     }
 
     private URI getRedirectionUrl(ClusterData differentClusterData) throws 
MalformedURLException {
@@ -669,15 +650,7 @@ public abstract class PulsarWebResource {
      * @param authoritative
      */
     protected void validateTopicOwnership(TopicName topicName, boolean 
authoritative) {
-        try {
-            validateTopicOwnershipAsync(topicName, authoritative).join();
-        } catch (CompletionException ce) {
-            if (ce.getCause() instanceof WebApplicationException) {
-                throw (WebApplicationException) ce.getCause();
-            } else {
-                throw new RestException(ce.getCause());
-            }
-        }
+        sync(()-> validateTopicOwnershipAsync(topicName, authoritative));
     }
 
     protected CompletableFuture<Void> validateTopicOwnershipAsync(TopicName 
topicName, boolean authoritative) {
@@ -936,19 +909,7 @@ public abstract class PulsarWebResource {
     }
 
     public void validateTenantOperation(String tenant, TenantOperation 
operation) {
-        try {
-            int timeout = 
pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
-            validateTenantOperationAsync(tenant, operation).get(timeout, 
SECONDS);
-        } catch (InterruptedException | TimeoutException e) {
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof WebApplicationException){
-                throw (WebApplicationException) cause;
-            } else {
-                throw new RestException(cause);
-            }
-        }
+        sync(()-> validateTenantOperationAsync(tenant, operation));
     }
 
     public CompletableFuture<Void> validateTenantOperationAsync(String tenant, 
TenantOperation operation) {
@@ -975,19 +936,7 @@ public abstract class PulsarWebResource {
     }
 
     public void validateNamespaceOperation(NamespaceName namespaceName, 
NamespaceOperation operation) {
-        try {
-            int timeout = 
pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
-            validateNamespaceOperationAsync(namespaceName, 
operation).get(timeout, SECONDS);
-        } catch (InterruptedException | TimeoutException e) {
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof WebApplicationException){
-                throw (WebApplicationException) cause;
-            } else {
-                throw new RestException(cause);
-            }
-        }
+        sync(()-> validateNamespaceOperationAsync(namespaceName, operation));
     }
 
 
@@ -1014,22 +963,9 @@ public abstract class PulsarWebResource {
         return CompletableFuture.completedFuture(null);
     }
 
-    public void validateNamespacePolicyOperation(NamespaceName namespaceName,
-                                                 PolicyName policy,
+    public void validateNamespacePolicyOperation(NamespaceName namespaceName, 
PolicyName policy,
                                                  PolicyOperation operation) {
-        try {
-            int timeout = 
pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds();
-            validateNamespacePolicyOperationAsync(namespaceName, policy, 
operation).get(timeout, SECONDS);
-        } catch (InterruptedException | TimeoutException e) {
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof WebApplicationException){
-                throw (WebApplicationException) cause;
-            } else {
-                throw new RestException(cause);
-            }
-        }
+        sync(()-> validateNamespacePolicyOperationAsync(namespaceName, policy, 
operation));
     }
 
     public CompletableFuture<Void> 
validateNamespacePolicyOperationAsync(NamespaceName namespaceName,
@@ -1177,19 +1113,7 @@ public abstract class PulsarWebResource {
     }
 
     public void validateTopicPolicyOperation(TopicName topicName, PolicyName 
policy, PolicyOperation operation) {
-        try {
-            int timeout = 
pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
-            validateTopicPolicyOperationAsync(topicName, policy, 
operation).get(timeout, SECONDS);
-        } catch (InterruptedException | TimeoutException e) {
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof WebApplicationException){
-                throw (WebApplicationException) cause;
-            } else {
-                throw new RestException(cause);
-            }
-        }
+        sync(()-> validateTopicPolicyOperationAsync(topicName, policy, 
operation));
     }
 
     public CompletableFuture<Void> validateTopicPolicyOperationAsync(TopicName 
topicName,
@@ -1219,16 +1143,7 @@ public abstract class PulsarWebResource {
     }
 
     public void validateTopicOperation(TopicName topicName, TopicOperation 
operation, String subscription) {
-        try {
-            validateTopicOperationAsync(topicName, operation, 
subscription).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Throwable cause = FutureUtil.unwrapCompletionException(e);
-            if (cause instanceof WebApplicationException){
-                throw (WebApplicationException) cause;
-            } else {
-                throw new RestException(cause);
-            }
-        }
+        sync(()-> validateTopicOperationAsync(topicName, operation, 
subscription));
     }
 
     public CompletableFuture<Void> validateTopicOperationAsync(TopicName 
topicName, TopicOperation operation) {
@@ -1259,13 +1174,19 @@ public abstract class PulsarWebResource {
         }
     }
 
-    protected Void handleCommonRestAsyncException(AsyncResponse asyncResponse, 
Throwable ex) {
-        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
-        if (realCause instanceof WebApplicationException) {
-            asyncResponse.resume(realCause);
-        } else {
-            asyncResponse.resume(new RestException(realCause));
+    public <T> T sync(Supplier<CompletableFuture<T>> supplier) {
+        try {
+            return 
supplier.get().get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+        } catch (ExecutionException | TimeoutException ex) {
+            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+            if (realCause instanceof WebApplicationException) {
+                throw (WebApplicationException) realCause;
+            } else {
+                throw new RestException(realCause);
+            }
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            throw new RestException(ex);
         }
-        return null;
     }
 }

Reply via email to