This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 273446b7f77 [improve][broker] Make some methods of `ClusterBase` pure
async. (#15318)
273446b7f77 is described below
commit 273446b7f7738828d8daf15a5d4c305d7e310e6b
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu May 5 04:13:24 2022 +0800
[improve][broker] Make some methods of `ClusterBase` pure async. (#15318)
---
.../pulsar/broker/resources/ClusterResources.java | 11 +-
.../apache/pulsar/broker/admin/AdminResource.java | 8 +-
.../pulsar/broker/admin/impl/ClustersBase.java | 151 ++++++++++-----------
.../org/apache/pulsar/broker/admin/AdminTest.java | 61 +++++----
4 files changed, 123 insertions(+), 108 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 4429afe5939..d3acd7bbe10 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -43,7 +43,7 @@ public class ClusterResources extends
BaseResources<ClusterData> {
}
public CompletableFuture<Set<String>> listAsync() {
- return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new
HashSet<>(list));
+ return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(HashSet::new);
}
public Set<String> list() throws MetadataStoreException {
@@ -66,6 +66,15 @@ public class ClusterResources extends
BaseResources<ClusterData> {
create(joinPath(BASE_CLUSTERS_PATH, clusterName), clusterData);
}
+ public CompletableFuture<Void> createClusterAsync(String clusterName,
ClusterData clusterData) {
+ return createAsync(joinPath(BASE_CLUSTERS_PATH, clusterName),
clusterData);
+ }
+
+ public CompletableFuture<Void> updateClusterAsync(String clusterName,
+ Function<ClusterData,
ClusterData> modifyFunction) {
+ return setAsync(joinPath(BASE_CLUSTERS_PATH, clusterName),
modifyFunction);
+ }
+
public void updateCluster(String clusterName, Function<ClusterData,
ClusterData> modifyFunction)
throws MetadataStoreException {
set(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 03e103428f7..ef167829f8d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -141,11 +141,15 @@ public abstract class AdminResource extends
PulsarWebResource {
return
pulsar().getPulsarResources().getNamespaceResources().getPoliciesReadOnlyAsync()
.thenAccept(arePoliciesReadOnly -> {
if (arePoliciesReadOnly) {
- log.debug("Policies are read-only. Broker cannot do
read-write operations");
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot
do read-write operations");
+ }
throw new RestException(Status.FORBIDDEN, "Broker is
forbidden to do read-write operations");
} else {
// Do nothing, just log the message.
- log.debug("Broker is allowed to make read-write
operations");
+ if (log.isDebugEnabled()) {
+ log.debug("Broker is allowed to make read-write
operations");
+ }
}
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 17bf1301e45..6b984930ed7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -47,8 +47,8 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.pulsar.broker.admin.AdminResource;
import
org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
-import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.common.naming.Constants;
@@ -64,11 +64,12 @@ import
org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ClustersBase extends PulsarWebResource {
+public class ClustersBase extends AdminResource {
@GET
@ApiOperation(
@@ -79,16 +80,18 @@ public class ClustersBase extends PulsarWebResource {
@ApiResponse(code = 200, message = "Return a list of clusters."),
@ApiResponse(code = 500, message = "Internal server error.")
})
- public Set<String> getClusters() throws Exception {
- try {
- // Remove "global" cluster from returned list
- Set<String> clusters = clusterResources().list().stream()
- .filter(cluster ->
!Constants.GLOBAL_CLUSTER.equals(cluster)).collect(Collectors.toSet());
- return clusters;
- } catch (Exception e) {
- log.error("[{}] Failed to get clusters list", clientAppId(), e);
- throw new RestException(e);
- }
+ public void getClusters(@Suspended AsyncResponse asyncResponse) {
+ clusterResources().listAsync()
+ .thenApply(clusters -> clusters.stream()
+ // Remove "global" cluster from returned list
+ .filter(cluster ->
!Constants.GLOBAL_CLUSTER.equals(cluster))
+ .collect(Collectors.toSet()))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get clusters {}", clientAppId(),
ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -104,26 +107,19 @@ public class ClustersBase extends PulsarWebResource {
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
- public ClusterData getCluster(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
- @PathParam("cluster") String cluster
- ) {
- validateSuperUserAccess();
-
- try {
- return clusterResources().getCluster(cluster)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster does not exist"));
- } catch (Exception e) {
- log.error("[{}] Failed to get cluster {}", clientAppId(), cluster,
e);
- if (e instanceof RestException) {
- throw (RestException) e;
- } else {
- throw new RestException(e);
- }
- }
+ public void getCluster(@Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required =
true)
+ @PathParam("cluster") String cluster) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> clusterResources().getClusterAsync(cluster))
+ .thenAccept(clusterData -> {
+ asyncResponse.resume(clusterData
+ .orElseThrow(() -> new
RestException(Status.NOT_FOUND, "Cluster does not exist")));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get cluster {}", clientAppId(),
cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -140,10 +136,8 @@ public class ClustersBase extends PulsarWebResource {
@ApiResponse(code = 500, message = "Internal server error.")
})
public void createCluster(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The cluster data",
@@ -158,27 +152,31 @@ public class ClustersBase extends PulsarWebResource {
+ "}"
)
)
- )
- ClusterDataImpl clusterData
- ) {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
-
- try {
- NamedEntity.checkName(cluster);
- if (clusterResources().getCluster(cluster).isPresent()) {
- log.warn("[{}] Failed to create already existing cluster {}",
clientAppId(), cluster);
- throw new RestException(Status.CONFLICT, "Cluster already
exists");
- }
- clusterResources().createCluster(cluster, clusterData);
- log.info("[{}] Created cluster {}", clientAppId(), cluster);
- } catch (IllegalArgumentException e) {
- log.warn("[{}] Failed to create cluster with invalid name {}",
clientAppId(), cluster, e);
- throw new RestException(Status.PRECONDITION_FAILED, "Cluster name
is not valid");
- } catch (Exception e) {
- log.error("[{}] Failed to create cluster {}", clientAppId(),
cluster, e);
- throw new RestException(e);
- }
+ ) ClusterDataImpl clusterData) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> {
+ NamedEntity.checkName(cluster);
+ return clusterResources().getClusterAsync(cluster);
+ }).thenCompose(clusterOpt -> {
+ if (clusterOpt.isPresent()) {
+ throw new RestException(Status.CONFLICT, "Cluster
already exists");
+ }
+ return clusterResources().createClusterAsync(cluster,
clusterData);
+ }).thenAccept(__ -> {
+ log.info("[{}] Created cluster {}", clientAppId(),
cluster);
+ asyncResponse.resume(Response.ok().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create cluster {}",
clientAppId(), cluster, ex);
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof IllegalArgumentException) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Cluster name is not valid"));
+ return null;
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -193,10 +191,8 @@ public class ClustersBase extends PulsarWebResource {
@ApiResponse(code = 500, message = "Internal server error.")
})
public void updateCluster(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The cluster data",
@@ -211,22 +207,23 @@ public class ClustersBase extends PulsarWebResource {
+ "}"
)
)
- )
- ClusterDataImpl clusterData
- ) {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
-
- try {
- clusterResources().updateCluster(cluster, old -> clusterData);
- log.info("[{}] Updated cluster {}", clientAppId(), cluster);
- } catch (NotFoundException e) {
- log.warn("[{}] Failed to update cluster {}: Does not exist",
clientAppId(), cluster);
- throw new RestException(Status.NOT_FOUND, "Cluster does not
exist");
- } catch (Exception e) {
- log.error("[{}] Failed to update cluster {}", clientAppId(),
cluster, e);
- throw new RestException(e);
- }
+ ) ClusterDataImpl clusterData) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ ->
clusterResources().updateClusterAsync(cluster, old -> clusterData))
+ .thenAccept(__ -> {
+ log.info("[{}] Updated cluster {}", clientAppId(),
cluster);
+ asyncResponse.resume(Response.ok().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to update cluster {}",
clientAppId(), cluster, ex);
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Cluster does not exist"));
+ return null;
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 21ccff80c17..d39ad3fbb97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -203,18 +203,19 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
@Test
public void clusters() throws Exception {
- assertEquals(clusters.getClusters(), Lists.newArrayList());
- verify(clusters, never()).validateSuperUserAccess();
+ assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet());
+ verify(clusters, never()).validateSuperUserAccessAsync();
- clusters.createCluster("use",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
- verify(clusters, times(1)).validateSuperUserAccess();
+ asynRequests(ctx -> clusters.createCluster(ctx,
+ "use",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()));
// ensure to read from ZooKeeper directly
//clusters.clustersListCache().clear();
- assertEquals(clusters.getClusters(), Lists.newArrayList("use"));
+ assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet("use"));
// Check creating existing cluster
try {
- clusters.createCluster("use",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
+ asynRequests(ctx -> clusters.createCluster(ctx, "use",
+
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
@@ -228,14 +229,14 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
}
- assertEquals(clusters.getCluster("use"),
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
- verify(clusters, times(4)).validateSuperUserAccess();
+ assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")),
+
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
- clusters.updateCluster("use",
ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
- verify(clusters, times(5)).validateSuperUserAccess();
+ asynRequests(ctx -> clusters.updateCluster(ctx, "use",
+
ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build()));
- assertEquals(clusters.getCluster("use"),
ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
- verify(clusters, times(6)).validateSuperUserAccess();
+ assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")),
+
ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
try {
clusters.getNamespaceIsolationPolicies("use");
@@ -271,18 +272,17 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty());
clusters.deleteCluster("use");
- verify(clusters, times(13)).validateSuperUserAccess();
- assertEquals(clusters.getClusters(), Lists.newArrayList());
+ assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet());
try {
- clusters.getCluster("use");
+ asynRequests(ctx -> clusters.getCluster(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
}
try {
- clusters.updateCluster("use", ClusterDataImpl.builder().build());
+ asynRequests(ctx -> clusters.updateCluster(ctx, "use",
ClusterDataImpl.builder().build()));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
@@ -309,7 +309,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
clusterCache.invalidateAll();
store.invalidateAll();
try {
- clusters.getClusters();
+ asynRequests(ctx -> clusters.getClusters(ctx));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -320,7 +320,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
&& path.equals("/admin/clusters/test");
});
try {
- clusters.createCluster("test",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build());
+ asynRequests(ctx -> clusters.createCluster(ctx, "test",
+
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build()));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -333,7 +334,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
clusterCache.invalidateAll();
store.invalidateAll();
try {
- clusters.updateCluster("test",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build());
+ asynRequests(ctx -> clusters.updateCluster(ctx, "test",
+
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build()));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -345,7 +347,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
});
try {
- clusters.getCluster("test");
+ asynRequests(ctx -> clusters.getCluster(ctx, "test"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -379,7 +381,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check name validations
try {
- clusters.createCluster("bf@",
ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build());
+ asynRequests(ctx -> clusters.createCluster(ctx, "bf@",
+
ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build()));
fail("should have filed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -387,7 +390,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check authentication and listener name
try {
- clusters.createCluster("auth", ClusterDataImpl.builder()
+ asynRequests(ctx -> clusters.createCluster(ctx, "auth",
ClusterDataImpl.builder()
.serviceUrl("http://dummy.web.example.com")
.serviceUrlTls("")
.brokerServiceUrl("http://dummy.messaging.example.com")
@@ -395,14 +398,16 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
.authenticationPlugin("authenticationPlugin")
.authenticationParameters("authenticationParameters")
.listenerName("listenerName")
- .build());
- ClusterData cluster = clusters.getCluster("auth");
+ .build()));
+ ClusterData cluster = (ClusterData) asynRequests(ctx ->
clusters.getCluster(ctx, "auth"));
assertEquals(cluster.getAuthenticationPlugin(),
"authenticationPlugin");
assertEquals(cluster.getAuthenticationParameters(),
"authenticationParameters");
assertEquals(cluster.getListenerName(), "listenerName");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
+ verify(clusters, times(13)).validateSuperUserAccessAsync();
+ verify(clusters, times(11)).validateSuperUserAccess();
}
Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception
{
@@ -421,7 +426,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
verify(properties, times(1)).validateSuperUserAccess();
// create local cluster
- clusters.createCluster(configClusterName,
ClusterDataImpl.builder().build());
+ asynRequests(ctx -> clusters.createCluster(ctx, configClusterName,
ClusterDataImpl.builder().build()));
Set<String> allowedClusters = Sets.newHashSet();
allowedClusters.add(configClusterName);
@@ -635,10 +640,10 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
@Test
@SuppressWarnings("unchecked")
public void brokers() throws Exception {
- clusters.createCluster("use", ClusterDataImpl.builder()
+ asynRequests(ctx -> clusters.createCluster(ctx, "use",
ClusterDataImpl.builder()
.serviceUrl("http://broker.messaging.use.example.com")
.serviceUrlTls("https://broker.messaging.use.example.com:4443")
- .build());
+ .build()));
URI requestUri = new URI(
"http://broker.messaging.use.example.com:8080/admin/brokers/use");
@@ -700,7 +705,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.allowedClusters(Collections.singleton(cluster))
.build();
ClusterDataImpl clusterData =
ClusterDataImpl.builder().serviceUrl(cluster).build();
- clusters.createCluster(cluster, clusterData );
+ asynRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData
));
asynRequests(ctx -> properties.createTenant(ctx, property, admin));
// customized bandwidth for this namespace