mattisonchao commented on a change in pull request #13904:
URL: https://github.com/apache/pulsar/pull/13904#discussion_r801237356
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String
methodName, Throwable thr, Asyn
}
protected void internalTruncateNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- Topic topic;
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- topic = getTopicReference(topicName);
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- CompletableFuture<Void> future = topic.truncate();
- future.thenAccept(a -> {
- asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
- Response.Status.NO_CONTENT.getReasonPhrase()));
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.truncate())
+ .thenAccept(__ -> {
+ asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
Review comment:
Why ``resume`` Exception ?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String
tenant) {
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+
AuthenticationDataSource authenticationData) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ Throwable cause = e.getCause();
Review comment:
``InterruptedException``does not have ``getCause()``.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String
methodName, Throwable thr, Asyn
}
protected void internalTruncateNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- Topic topic;
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- topic = getTopicReference(topicName);
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- CompletableFuture<Void> future = topic.truncate();
- future.thenAccept(a -> {
- asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
- Response.Status.NO_CONTENT.getReasonPhrase()));
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.truncate())
+ .thenAccept(__ -> {
+ asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+ Response.Status.NO_CONTENT.getReasonPhrase()));
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ if (cause instanceof WebApplicationException
+ && ((WebApplicationException)
cause).getResponse().getStatus()
+ ==
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to truncate non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, cause);
+ }
+ } else {
+ log.error("[{}] Failed to truncate non-partitioned
topic {}", clientAppId(), topicName, cause);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
}
protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean
authoritative) {
-
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative,
false).whenComplete((meta, t) -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics()
- .truncateAsync(topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicName, exception);
- asyncResponse.resume(new
RestException(exception));
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .whenComplete((meta, t) -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ for (int i = 0; i < meta.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics()
+
.truncateAsync(topicNamePartition.toString()));
+ } catch (Exception e) {
Review comment:
```suggestion
} catch (PulsarServerException e) {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String
methodName, Throwable thr, Asyn
}
protected void internalTruncateNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- Topic topic;
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- topic = getTopicReference(topicName);
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- CompletableFuture<Void> future = topic.truncate();
- future.thenAccept(a -> {
- asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
- Response.Status.NO_CONTENT.getReasonPhrase()));
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.truncate())
+ .thenAccept(__ -> {
+ asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+ Response.Status.NO_CONTENT.getReasonPhrase()));
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ if (cause instanceof WebApplicationException
+ && ((WebApplicationException)
cause).getResponse().getStatus()
+ ==
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to truncate non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, cause);
+ }
+ } else {
+ log.error("[{}] Failed to truncate non-partitioned
topic {}", clientAppId(), topicName, cause);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
}
protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean
authoritative) {
-
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative,
false).whenComplete((meta, t) -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics()
- .truncateAsync(topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicName, exception);
- asyncResponse.resume(new
RestException(exception));
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .whenComplete((meta, t) -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ for (int i = 0; i < meta.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics()
+
.truncateAsync(topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to truncate topic
{}", clientAppId(), topicNamePartition, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
}
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
Review comment:
Use ``whenComplete`` instead of ``handle`` or return this future to the
upper layer future?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String
tenant) {
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+
AuthenticationDataSource authenticationData) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData).get();
Review comment:
``get()`` need timeout?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String
tenant) {
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+
AuthenticationDataSource authenticationData) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException){
+ throw (WebApplicationException) cause;
+ } else {
+ throw new RestException(cause);
+ }
+ }
+ }
+
+ /**
+ * Checks that the http client role has admin access to the specified
tenant async.
+ *
+ * @param tenant the tenant id
+ */
+ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String
tenant) {
+ return validateAdminAccessForTenantAsync(pulsar(), clientAppId(),
originalPrincipal(), tenant,
+ clientAuthData());
+ }
+
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
Review comment:
Could we use ``combine`` or another method to avoid using global future
that complete at a lot of methods? it's hard to read the code. I think.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String
tenant) {
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+
AuthenticationDataSource authenticationData) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException){
+ throw (WebApplicationException) cause;
+ } else {
+ throw new RestException(cause);
+ }
+ }
+ }
+
+ /**
+ * Checks that the http client role has admin access to the specified
tenant async.
+ *
+ * @param tenant the tenant id
+ */
+ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String
tenant) {
+ return validateAdminAccessForTenantAsync(pulsar(), clientAppId(),
originalPrincipal(), tenant,
+ clientAuthData());
+ }
+
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} --
role: {}", tenant,
(isClientAuthenticated(clientAppId)), clientAppId);
}
- TenantInfo tenantInfo =
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant
does not exist"));
+ pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
+ }
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ }
- if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration().isAuthorizationEnabled()) {
- if (!isClientAuthenticated(clientAppId)) {
- throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
- }
+
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId,
+ originalPrincipal);
-
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId, originalPrincipal);
+ if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+
pulsar.getBrokerService().getAuthorizationService();
+ CompletableFuture<Boolean> isProxySuperUserFuture =
+
authorizationService.isSuperUser(clientAppId, authenticationData);
+ CompletableFuture<Boolean>
isOriginalPrincipalSuperUserFuture =
+
authorizationService.isSuperUser(originalPrincipal, authenticationData);
- if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
- CompletableFuture<Boolean> isProxySuperUserFuture;
- CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
- try {
- AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
- isProxySuperUserFuture =
authorizationService.isSuperUser(clientAppId, authenticationData);
-
- isOriginalPrincipalSuperUserFuture =
-
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
- boolean proxyAuthorized = isProxySuperUserFuture.get()
- || authorizationService.isTenantAdmin(tenant,
clientAppId,
- tenantInfo, authenticationData).get();
- boolean originalPrincipalAuthorized =
- isOriginalPrincipalSuperUserFuture.get() ||
authorizationService.isTenantAdmin(tenant,
- originalPrincipal, tenantInfo,
authenticationData).get();
- if (!proxyAuthorized || !originalPrincipalAuthorized) {
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Proxy not authorized to access
resource (proxy:%s,original:%s)",
- clientAppId, originalPrincipal));
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
- log.debug("Successfully authorized {} (proxied by {}) on
tenant {}",
- originalPrincipal, clientAppId, tenant);
- } else {
- if (!pulsar.getBrokerService()
- .getAuthorizationService()
- .isSuperUser(clientAppId, authenticationData)
- .join()) {
- if (!pulsar.getBrokerService().getAuthorizationService()
- .isTenantAdmin(tenant, clientAppId, tenantInfo,
authenticationData).get()) {
- throw new RestException(Status.UNAUTHORIZED,
- "Don't have permission to administrate
resources on this tenant");
- }
- }
+
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+ if (isProxySuperUser) {
+ return
CompletableFuture.completedFuture(true);
+ } else {
+ return
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+ authenticationData);
+ }
+ }).thenCompose(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to
access resource (proxy:%s,original:%s)",
+ clientAppId,
originalPrincipal));
+ }
+ return null;
+ }).exceptionally(ex -> {
+ future.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ ex.getMessage()));
+ return null;
+ });
- log.debug("Successfully authorized {} on tenant {}",
clientAppId, tenant);
- }
- }
+
isOriginalPrincipalSuperUserFuture.thenCompose(isOriginalPrincipalSuperUser -> {
+ if (isOriginalPrincipalSuperUser) {
+ return
CompletableFuture.completedFuture(true);
+ } else {
+ return
authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo,
+ authenticationData);
+ }
+ }).thenCompose(originalPrincipalAuthorized -> {
+ if (!originalPrincipalAuthorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to
access resource (proxy:%s,original:%s)",
+ clientAppId,
originalPrincipal));
+ } else {
+ log.debug("Successfully authorized {}
(proxied by {}) on tenant {}",
+ originalPrincipal, clientAppId,
tenant);
+ future.complete(null);
+ }
+ return null;
+ }).exceptionally(ex -> {
+ future.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ ex.getMessage()));
+ return null;
+ });
+ } else {
+ pulsar.getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(clientAppId,
authenticationData)
+ .thenCompose(isSuperUser -> {
+ if (!isSuperUser) {
+ return
pulsar.getBrokerService().getAuthorizationService()
+ .isTenantAdmin(tenant,
clientAppId, tenantInfo, authenticationData);
+ } else {
+ return
CompletableFuture.completedFuture(true);
+ }
+ }).thenCompose(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ "Don't have permission to
administrate resources on this tenant");
+ } else {
+ log.debug("Successfully authorized
{} on tenant {}", clientAppId, tenant);
+ future.complete(null);
+ }
+ return null;
+ }).exceptionally(ex -> {
+ future.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ ex.getMessage()));
+ return null;
+ });
+ }
+ }
+ return null;
Review comment:
NPE?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String
tenant) {
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+
AuthenticationDataSource authenticationData) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException){
+ throw (WebApplicationException) cause;
+ } else {
+ throw new RestException(cause);
+ }
+ }
+ }
+
+ /**
+ * Checks that the http client role has admin access to the specified
tenant async.
+ *
+ * @param tenant the tenant id
+ */
+ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String
tenant) {
+ return validateAdminAccessForTenantAsync(pulsar(), clientAppId(),
originalPrincipal(), tenant,
+ clientAuthData());
+ }
+
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} --
role: {}", tenant,
(isClientAuthenticated(clientAppId)), clientAppId);
}
- TenantInfo tenantInfo =
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant
does not exist"));
+ pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
+ }
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ }
- if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration().isAuthorizationEnabled()) {
- if (!isClientAuthenticated(clientAppId)) {
- throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
- }
+
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId,
+ originalPrincipal);
-
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId, originalPrincipal);
+ if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+
pulsar.getBrokerService().getAuthorizationService();
+ CompletableFuture<Boolean> isProxySuperUserFuture =
+
authorizationService.isSuperUser(clientAppId, authenticationData);
+ CompletableFuture<Boolean>
isOriginalPrincipalSuperUserFuture =
+
authorizationService.isSuperUser(originalPrincipal, authenticationData);
- if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
- CompletableFuture<Boolean> isProxySuperUserFuture;
- CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
- try {
- AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
- isProxySuperUserFuture =
authorizationService.isSuperUser(clientAppId, authenticationData);
-
- isOriginalPrincipalSuperUserFuture =
-
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
- boolean proxyAuthorized = isProxySuperUserFuture.get()
- || authorizationService.isTenantAdmin(tenant,
clientAppId,
- tenantInfo, authenticationData).get();
- boolean originalPrincipalAuthorized =
- isOriginalPrincipalSuperUserFuture.get() ||
authorizationService.isTenantAdmin(tenant,
- originalPrincipal, tenantInfo,
authenticationData).get();
- if (!proxyAuthorized || !originalPrincipalAuthorized) {
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Proxy not authorized to access
resource (proxy:%s,original:%s)",
- clientAppId, originalPrincipal));
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
- log.debug("Successfully authorized {} (proxied by {}) on
tenant {}",
- originalPrincipal, clientAppId, tenant);
- } else {
- if (!pulsar.getBrokerService()
- .getAuthorizationService()
- .isSuperUser(clientAppId, authenticationData)
- .join()) {
- if (!pulsar.getBrokerService().getAuthorizationService()
- .isTenantAdmin(tenant, clientAppId, tenantInfo,
authenticationData).get()) {
- throw new RestException(Status.UNAUTHORIZED,
- "Don't have permission to administrate
resources on this tenant");
- }
- }
+
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+ if (isProxySuperUser) {
+ return
CompletableFuture.completedFuture(true);
+ } else {
+ return
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+ authenticationData);
+ }
+ }).thenCompose(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to
access resource (proxy:%s,original:%s)",
+ clientAppId,
originalPrincipal));
+ }
+ return null;
Review comment:
NPE?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String
methodName, Throwable thr, Asyn
}
protected void internalTruncateNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- Topic topic;
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- topic = getTopicReference(topicName);
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- CompletableFuture<Void> future = topic.truncate();
- future.thenAccept(a -> {
- asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
- Response.Status.NO_CONTENT.getReasonPhrase()));
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.truncate())
+ .thenAccept(__ -> {
+ asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+ Response.Status.NO_CONTENT.getReasonPhrase()));
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ if (cause instanceof WebApplicationException
+ && ((WebApplicationException)
cause).getResponse().getStatus()
+ ==
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to truncate non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, cause);
+ }
+ } else {
+ log.error("[{}] Failed to truncate non-partitioned
topic {}", clientAppId(), topicName, cause);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
}
protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean
authoritative) {
-
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative,
false).whenComplete((meta, t) -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics()
- .truncateAsync(topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicName, exception);
- asyncResponse.resume(new
RestException(exception));
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .whenComplete((meta, t) -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ for (int i = 0; i < meta.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics()
+
.truncateAsync(topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to truncate topic
{}", clientAppId(), topicNamePartition, e);
Review comment:
```suggestion
log.error("[{}] Failed to truncate
topic, while getting admin client {}", clientAppId(), topicNamePartition, e);
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String
tenant) {
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+
AuthenticationDataSource authenticationData) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException){
+ throw (WebApplicationException) cause;
+ } else {
+ throw new RestException(cause);
+ }
+ }
+ }
+
+ /**
+ * Checks that the http client role has admin access to the specified
tenant async.
+ *
+ * @param tenant the tenant id
+ */
+ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String
tenant) {
+ return validateAdminAccessForTenantAsync(pulsar(), clientAppId(),
originalPrincipal(), tenant,
+ clientAuthData());
+ }
+
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} --
role: {}", tenant,
(isClientAuthenticated(clientAppId)), clientAppId);
}
- TenantInfo tenantInfo =
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant
does not exist"));
+ pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
+ }
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ }
- if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration().isAuthorizationEnabled()) {
- if (!isClientAuthenticated(clientAppId)) {
- throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
- }
+
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId,
+ originalPrincipal);
-
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId, originalPrincipal);
+ if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+
pulsar.getBrokerService().getAuthorizationService();
+ CompletableFuture<Boolean> isProxySuperUserFuture =
+
authorizationService.isSuperUser(clientAppId, authenticationData);
+ CompletableFuture<Boolean>
isOriginalPrincipalSuperUserFuture =
+
authorizationService.isSuperUser(originalPrincipal, authenticationData);
- if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
- CompletableFuture<Boolean> isProxySuperUserFuture;
- CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
- try {
- AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
- isProxySuperUserFuture =
authorizationService.isSuperUser(clientAppId, authenticationData);
-
- isOriginalPrincipalSuperUserFuture =
-
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
- boolean proxyAuthorized = isProxySuperUserFuture.get()
- || authorizationService.isTenantAdmin(tenant,
clientAppId,
- tenantInfo, authenticationData).get();
- boolean originalPrincipalAuthorized =
- isOriginalPrincipalSuperUserFuture.get() ||
authorizationService.isTenantAdmin(tenant,
- originalPrincipal, tenantInfo,
authenticationData).get();
- if (!proxyAuthorized || !originalPrincipalAuthorized) {
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Proxy not authorized to access
resource (proxy:%s,original:%s)",
- clientAppId, originalPrincipal));
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
- log.debug("Successfully authorized {} (proxied by {}) on
tenant {}",
- originalPrincipal, clientAppId, tenant);
- } else {
- if (!pulsar.getBrokerService()
- .getAuthorizationService()
- .isSuperUser(clientAppId, authenticationData)
- .join()) {
- if (!pulsar.getBrokerService().getAuthorizationService()
- .isTenantAdmin(tenant, clientAppId, tenantInfo,
authenticationData).get()) {
- throw new RestException(Status.UNAUTHORIZED,
- "Don't have permission to administrate
resources on this tenant");
- }
- }
+
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+ if (isProxySuperUser) {
+ return
CompletableFuture.completedFuture(true);
+ } else {
+ return
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+ authenticationData);
+ }
+ }).thenCompose(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to
access resource (proxy:%s,original:%s)",
+ clientAppId,
originalPrincipal));
+ }
+ return null;
+ }).exceptionally(ex -> {
+ future.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ ex.getMessage()));
+ return null;
+ });
- log.debug("Successfully authorized {} on tenant {}",
clientAppId, tenant);
- }
- }
+
isOriginalPrincipalSuperUserFuture.thenCompose(isOriginalPrincipalSuperUser -> {
+ if (isOriginalPrincipalSuperUser) {
+ return
CompletableFuture.completedFuture(true);
+ } else {
+ return
authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo,
+ authenticationData);
+ }
+ }).thenCompose(originalPrincipalAuthorized -> {
+ if (!originalPrincipalAuthorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to
access resource (proxy:%s,original:%s)",
+ clientAppId,
originalPrincipal));
+ } else {
+ log.debug("Successfully authorized {}
(proxied by {}) on tenant {}",
+ originalPrincipal, clientAppId,
tenant);
+ future.complete(null);
+ }
+ return null;
+ }).exceptionally(ex -> {
+ future.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ ex.getMessage()));
+ return null;
+ });
+ } else {
+ pulsar.getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(clientAppId,
authenticationData)
+ .thenCompose(isSuperUser -> {
+ if (!isSuperUser) {
+ return
pulsar.getBrokerService().getAuthorizationService()
+ .isTenantAdmin(tenant,
clientAppId, tenantInfo, authenticationData);
+ } else {
+ return
CompletableFuture.completedFuture(true);
+ }
+ }).thenCompose(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ "Don't have permission to
administrate resources on this tenant");
+ } else {
+ log.debug("Successfully authorized
{} on tenant {}", clientAppId, tenant);
+ future.complete(null);
+ }
+ return null;
Review comment:
NPE?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String
tenant) {
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+
AuthenticationDataSource authenticationData) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData).get();
+ } catch (InterruptedException | ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException){
+ throw (WebApplicationException) cause;
+ } else {
+ throw new RestException(cause);
+ }
+ }
+ }
+
+ /**
+ * Checks that the http client role has admin access to the specified
tenant async.
+ *
+ * @param tenant the tenant id
+ */
+ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String
tenant) {
+ return validateAdminAccessForTenantAsync(pulsar(), clientAppId(),
originalPrincipal(), tenant,
+ clientAuthData());
+ }
+
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} --
role: {}", tenant,
(isClientAuthenticated(clientAppId)), clientAppId);
}
- TenantInfo tenantInfo =
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant
does not exist"));
+ pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
+ }
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ }
- if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration().isAuthorizationEnabled()) {
- if (!isClientAuthenticated(clientAppId)) {
- throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
- }
+
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId,
+ originalPrincipal);
-
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId, originalPrincipal);
+ if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+
pulsar.getBrokerService().getAuthorizationService();
+ CompletableFuture<Boolean> isProxySuperUserFuture =
+
authorizationService.isSuperUser(clientAppId, authenticationData);
+ CompletableFuture<Boolean>
isOriginalPrincipalSuperUserFuture =
+
authorizationService.isSuperUser(originalPrincipal, authenticationData);
- if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
- CompletableFuture<Boolean> isProxySuperUserFuture;
- CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
- try {
- AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
- isProxySuperUserFuture =
authorizationService.isSuperUser(clientAppId, authenticationData);
-
- isOriginalPrincipalSuperUserFuture =
-
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
- boolean proxyAuthorized = isProxySuperUserFuture.get()
- || authorizationService.isTenantAdmin(tenant,
clientAppId,
- tenantInfo, authenticationData).get();
- boolean originalPrincipalAuthorized =
- isOriginalPrincipalSuperUserFuture.get() ||
authorizationService.isTenantAdmin(tenant,
- originalPrincipal, tenantInfo,
authenticationData).get();
- if (!proxyAuthorized || !originalPrincipalAuthorized) {
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Proxy not authorized to access
resource (proxy:%s,original:%s)",
- clientAppId, originalPrincipal));
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
- log.debug("Successfully authorized {} (proxied by {}) on
tenant {}",
- originalPrincipal, clientAppId, tenant);
- } else {
- if (!pulsar.getBrokerService()
- .getAuthorizationService()
- .isSuperUser(clientAppId, authenticationData)
- .join()) {
- if (!pulsar.getBrokerService().getAuthorizationService()
- .isTenantAdmin(tenant, clientAppId, tenantInfo,
authenticationData).get()) {
- throw new RestException(Status.UNAUTHORIZED,
- "Don't have permission to administrate
resources on this tenant");
- }
- }
+
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+ if (isProxySuperUser) {
+ return
CompletableFuture.completedFuture(true);
+ } else {
+ return
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+ authenticationData);
+ }
+ }).thenCompose(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to
access resource (proxy:%s,original:%s)",
+ clientAppId,
originalPrincipal));
+ }
+ return null;
+ }).exceptionally(ex -> {
+ future.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ ex.getMessage()));
+ return null;
+ });
- log.debug("Successfully authorized {} on tenant {}",
clientAppId, tenant);
- }
- }
+
isOriginalPrincipalSuperUserFuture.thenCompose(isOriginalPrincipalSuperUser -> {
+ if (isOriginalPrincipalSuperUser) {
+ return
CompletableFuture.completedFuture(true);
+ } else {
+ return
authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo,
+ authenticationData);
+ }
+ }).thenCompose(originalPrincipalAuthorized -> {
+ if (!originalPrincipalAuthorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ String.format(
+ "Proxy not authorized to
access resource (proxy:%s,original:%s)",
+ clientAppId,
originalPrincipal));
+ } else {
+ log.debug("Successfully authorized {}
(proxied by {}) on tenant {}",
+ originalPrincipal, clientAppId,
tenant);
+ future.complete(null);
+ }
+ return null;
Review comment:
NPE?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String
methodName, Throwable thr, Asyn
}
protected void internalTruncateNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- Topic topic;
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- topic = getTopicReference(topicName);
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- CompletableFuture<Void> future = topic.truncate();
- future.thenAccept(a -> {
- asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
- Response.Status.NO_CONTENT.getReasonPhrase()));
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.truncate())
+ .thenAccept(__ -> {
+ asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+ Response.Status.NO_CONTENT.getReasonPhrase()));
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ if (cause instanceof WebApplicationException
+ && ((WebApplicationException)
cause).getResponse().getStatus()
+ ==
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to truncate non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, cause);
+ }
+ } else {
+ log.error("[{}] Failed to truncate non-partitioned
topic {}", clientAppId(), topicName, cause);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
}
protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean
authoritative) {
-
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative,
false).whenComplete((meta, t) -> {
- if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < meta.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics()
- .truncateAsync(topicNamePartition.toString()));
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicName, exception);
- asyncResponse.resume(new
RestException(exception));
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .whenComplete((meta, t) -> {
+ if (meta.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
Review comment:
```suggestion
final List<CompletableFuture<Void>> futures =
Lists.newArrayList(meta.partitions);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]