This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 83d6da90a0f [improve][broker] reduce code duplication to avoid endless
wait ``CompletableFuture`` (#14853)
83d6da90a0f is described below
commit 83d6da90a0ffd14c1f1024941c68ce5b0c6ab4ac
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Mar 28 20:46:11 2022 +0800
[improve][broker] reduce code duplication to avoid endless wait
``CompletableFuture`` (#14853)
(cherry picked from commit 1631bf1a7cdbcb476f06264bcbfa2f9cd56e7f8c)
---
.../pulsar/broker/web/PulsarWebResource.java | 125 ++++-----------------
1 file changed, 23 insertions(+), 102 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index cbae50f8739..39211aca7ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -33,14 +33,13 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -237,16 +236,7 @@ public abstract class PulsarWebResource {
* if not authorized
*/
public void validateSuperUserAccess() {
- try {
-
validateSuperUserAccessAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- Throwable realCause = FutureUtil.unwrapCompletionException(e);
- if (realCause instanceof WebApplicationException){
- throw (WebApplicationException) realCause;
- } else {
- throw new RestException(realCause);
- }
- }
+ sync(this::validateSuperUserAccessAsync);
}
/**
@@ -452,16 +442,7 @@ public abstract class PulsarWebResource {
* @throws Exception In case the redirect happens
*/
protected void validateClusterOwnership(String cluster) throws
WebApplicationException {
- try {
-
validateClusterOwnershipAsync(cluster).get(config().getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException
ex) {
- Throwable realCause = FutureUtil.unwrapCompletionException(ex);
- if (realCause instanceof WebApplicationException){
- throw (WebApplicationException) realCause;
- } else {
- throw new RestException(realCause);
- }
- }
+ sync(()-> validateClusterOwnershipAsync(cluster));
}
private URI getRedirectionUrl(ClusterData differentClusterData) throws
MalformedURLException {
@@ -669,15 +650,7 @@ public abstract class PulsarWebResource {
* @param authoritative
*/
protected void validateTopicOwnership(TopicName topicName, boolean
authoritative) {
- try {
- validateTopicOwnershipAsync(topicName, authoritative).join();
- } catch (CompletionException ce) {
- if (ce.getCause() instanceof WebApplicationException) {
- throw (WebApplicationException) ce.getCause();
- } else {
- throw new RestException(ce.getCause());
- }
- }
+ sync(()-> validateTopicOwnershipAsync(topicName, authoritative));
}
protected CompletableFuture<Void> validateTopicOwnershipAsync(TopicName
topicName, boolean authoritative) {
@@ -936,19 +909,7 @@ public abstract class PulsarWebResource {
}
public void validateTenantOperation(String tenant, TenantOperation
operation) {
- try {
- int timeout =
pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
- validateTenantOperationAsync(tenant, operation).get(timeout,
SECONDS);
- } catch (InterruptedException | TimeoutException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof WebApplicationException){
- throw (WebApplicationException) cause;
- } else {
- throw new RestException(cause);
- }
- }
+ sync(()-> validateTenantOperationAsync(tenant, operation));
}
public CompletableFuture<Void> validateTenantOperationAsync(String tenant,
TenantOperation operation) {
@@ -975,19 +936,7 @@ public abstract class PulsarWebResource {
}
public void validateNamespaceOperation(NamespaceName namespaceName,
NamespaceOperation operation) {
- try {
- int timeout =
pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
- validateNamespaceOperationAsync(namespaceName,
operation).get(timeout, SECONDS);
- } catch (InterruptedException | TimeoutException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof WebApplicationException){
- throw (WebApplicationException) cause;
- } else {
- throw new RestException(cause);
- }
- }
+ sync(()-> validateNamespaceOperationAsync(namespaceName, operation));
}
@@ -1014,22 +963,9 @@ public abstract class PulsarWebResource {
return CompletableFuture.completedFuture(null);
}
- public void validateNamespacePolicyOperation(NamespaceName namespaceName,
- PolicyName policy,
+ public void validateNamespacePolicyOperation(NamespaceName namespaceName,
PolicyName policy,
PolicyOperation operation) {
- try {
- int timeout =
pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds();
- validateNamespacePolicyOperationAsync(namespaceName, policy,
operation).get(timeout, SECONDS);
- } catch (InterruptedException | TimeoutException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof WebApplicationException){
- throw (WebApplicationException) cause;
- } else {
- throw new RestException(cause);
- }
- }
+ sync(()-> validateNamespacePolicyOperationAsync(namespaceName, policy,
operation));
}
public CompletableFuture<Void>
validateNamespacePolicyOperationAsync(NamespaceName namespaceName,
@@ -1177,19 +1113,7 @@ public abstract class PulsarWebResource {
}
public void validateTopicPolicyOperation(TopicName topicName, PolicyName
policy, PolicyOperation operation) {
- try {
- int timeout =
pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
- validateTopicPolicyOperationAsync(topicName, policy,
operation).get(timeout, SECONDS);
- } catch (InterruptedException | TimeoutException e) {
- throw new RestException(e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof WebApplicationException){
- throw (WebApplicationException) cause;
- } else {
- throw new RestException(cause);
- }
- }
+ sync(()-> validateTopicPolicyOperationAsync(topicName, policy,
operation));
}
public CompletableFuture<Void> validateTopicPolicyOperationAsync(TopicName
topicName,
@@ -1219,16 +1143,7 @@ public abstract class PulsarWebResource {
}
public void validateTopicOperation(TopicName topicName, TopicOperation
operation, String subscription) {
- try {
- validateTopicOperationAsync(topicName, operation,
subscription).get();
- } catch (InterruptedException | ExecutionException e) {
- Throwable cause = FutureUtil.unwrapCompletionException(e);
- if (cause instanceof WebApplicationException){
- throw (WebApplicationException) cause;
- } else {
- throw new RestException(cause);
- }
- }
+ sync(()-> validateTopicOperationAsync(topicName, operation,
subscription));
}
public CompletableFuture<Void> validateTopicOperationAsync(TopicName
topicName, TopicOperation operation) {
@@ -1259,13 +1174,19 @@ public abstract class PulsarWebResource {
}
}
- protected Void handleCommonRestAsyncException(AsyncResponse asyncResponse,
Throwable ex) {
- Throwable realCause = FutureUtil.unwrapCompletionException(ex);
- if (realCause instanceof WebApplicationException) {
- asyncResponse.resume(realCause);
- } else {
- asyncResponse.resume(new RestException(realCause));
+ public <T> T sync(Supplier<CompletableFuture<T>> supplier) {
+ try {
+ return
supplier.get().get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ } catch (ExecutionException | TimeoutException ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
+ throw (WebApplicationException) realCause;
+ } else {
+ throw new RestException(realCause);
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new RestException(ex);
}
- return null;
}
}