This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8ca01cd42ed [improve][admin] Align the auth and check it at the first
place for topic related API (#22507)
8ca01cd42ed is described below
commit 8ca01cd42edfd4efd986f752f6f8538ea5bf4f94
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Apr 17 18:46:22 2024 +0800
[improve][admin] Align the auth and check it at the first place for topic
related API (#22507)
---
.../broker/admin/impl/PersistentTopicsBase.java | 419 ++++++++++-----------
.../pulsar/broker/admin/v2/PersistentTopics.java | 44 ++-
.../apache/pulsar/broker/admin/TopicAuthZTest.java | 257 +++++++++++--
3 files changed, 447 insertions(+), 273 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ab74b1e2bcc..1f8d0657190 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -128,8 +128,6 @@ import
org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -2727,14 +2725,14 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<MessageId>
internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
return future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
@@ -2748,7 +2746,6 @@ public class PersistentTopicsBase extends AdminResource {
});
}
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic)) {
@@ -3158,65 +3155,56 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalGetBacklogSizeByMessageId(AsyncResponse
asyncResponse,
MessageIdImpl messageId,
boolean authoritative) {
- CompletableFuture<Void> ret;
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (!topicName.isPartitioned()) {
- ret = getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
- .thenCompose(topicMetadata -> {
- if (topicMetadata.partitions > 0) {
- log.warn("[{}] Not supported calculate backlog
size operation on partitioned-topic {}",
- clientAppId(), topicName);
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
- "calculate backlog size is not allowed for
partitioned-topic"));
- }
- return CompletableFuture.completedFuture(null);
- });
- } else {
- ret = CompletableFuture.completedFuture(null);
- }
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = ret.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName));
- } else {
- future = ret;
- }
- future.thenAccept(__ -> validateTopicOwnershipAsync(topicName,
authoritative)
- .thenCompose(unused -> validateTopicOperationAsync(topicName,
- TopicOperation.GET_BACKLOG_SIZE))
- .thenCompose(unused -> getTopicReferenceAsync(topicName))
- .thenAccept(t -> {
- PersistentTopic topic = (PersistentTopic) t;
- PositionImpl pos = new
PositionImpl(messageId.getLedgerId(),
- messageId.getEntryId());
- if (topic == null) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
-
getTopicNotFoundErrorMessage(topicName.toString())));
- return;
- }
- ManagedLedgerImpl managedLedger =
- (ManagedLedgerImpl) topic.getManagedLedger();
- if (messageId.getLedgerId() == -1) {
- asyncResponse.resume(managedLedger.getTotalSize());
- } else {
-
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
- }
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to
log it.
- if (isNot307And404Exception(ex)) {
- log.error("[{}] Failed to get backlog size for topic
{}", clientAppId(),
- topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- })).exceptionally(ex -> {
- // If the exception is not redirect exception we need
to log it.
- if (isNot307And404Exception(ex)) {
- log.error("[{}] Failed to validate global
namespace ownership "
- + "to get backlog size for topic {}",
clientAppId(), topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
+ CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.GET_BACKLOG_SIZE);
+ ret.thenCompose(__ -> {
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (!topicName.isPartitioned()) {
+ return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported calculate backlog
size operation on partitioned-topic {}",
+ clientAppId(), topicName);
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "calculate backlog size is not allowed
for partitioned-topic"));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(unused -> getTopicReferenceAsync(topicName))
+ .thenAccept(t -> {
+ PersistentTopic topic = (PersistentTopic) t;
+ PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
+ messageId.getEntryId());
+ if (topic == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
+ return;
+ }
+ ManagedLedgerImpl managedLedger =
+ (ManagedLedgerImpl) topic.getManagedLedger();
+ if (messageId.getLedgerId() == -1) {
+ asyncResponse.resume(managedLedger.getTotalSize());
+ } else {
+
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+ }
+ }).exceptionally(ex -> {
+ // If the exception is not redirect exception we need to log it.
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to validate global namespace ownership "
+ + "to get backlog size for topic {}", clientAppId(),
topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
protected CompletableFuture<Void>
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
@@ -3224,8 +3212,7 @@ public class PersistentTopicsBase extends AdminResource {
BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType
== null
? BacklogQuota.BacklogQuotaType.destination_storage :
backlogQuotaType;
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.BACKLOG, PolicyOperation.WRITE)
- .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal))
.thenCompose(op -> {
TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
@@ -3266,9 +3253,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void>
internalSetReplicationClusters(List<String> clusterIds) {
-
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.REPLICATION, PolicyOperation.WRITE)
- .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> {
if (CollectionUtils.isEmpty(clusterIds)) {
throw new RestException(Status.PRECONDITION_FAILED,
"ClusterIds should not be null or empty");
@@ -3306,22 +3291,21 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<Void> internalRemoveReplicationClusters() {
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.REPLICATION, PolicyOperation.WRITE)
- .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
- .thenCompose(__ ->
getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
- TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
- topicPolicies.setReplicationClusters(null);
- return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
- .thenRun(() -> {
- log.info("[{}] Successfully set
replication clusters for namespace={}, "
- + "topic={},
clusters={}",
- clientAppId(),
- namespaceName,
- topicName.getLocalName(),
-
topicPolicies.getReplicationClusters());
- });
- })
- );
+ return validatePoliciesReadOnlyAccessAsync()
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+ topicPolicies.setReplicationClusters(null);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
+ .thenRun(() -> {
+ log.info("[{}] Successfully set replication
clusters for namespace={}, "
+ + "topic={}, clusters={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName(),
+
topicPolicies.getReplicationClusters());
+ });
+ });
}
protected CompletableFuture<Boolean> internalGetDeduplication(boolean
applied, boolean isGlobal) {
@@ -3683,29 +3667,29 @@ public class PersistentTopicsBase extends AdminResource
{
"Termination of a system topic is not allowed"));
}
- CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- ret = CompletableFuture.completedFuture(null);
- }
- return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.TERMINATE))
- .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
- .thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Termination of a partitioned topic is not
allowed");
- }
- })
- .thenCompose(__ -> getTopicReferenceAsync(topicName))
- .thenCompose(topic -> {
- if (!(topic instanceof PersistentTopic)) {
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Termination of a non-persistent topic is not
allowed");
- }
- return ((PersistentTopic) topic).terminate();
- });
+ CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.TERMINATE);
+ return ret.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
+ .thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Termination of a partitioned topic is not allowed");
+ }
+ })
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic)) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Termination of a non-persistent topic is not
allowed");
+ }
+ return ((PersistentTopic) topic).terminate();
+ });
}
protected void internalTerminatePartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
@@ -3716,73 +3700,63 @@ public class PersistentTopicsBase extends AdminResource
{
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
msg));
return;
}
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.TERMINATE);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
+ .thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ String msg = "Termination of a non-partitioned topic is not
allowed using partitioned-terminate"
+ + ", please use terminate commands";
+ log.error("[{}] [{}] {}", clientAppId(), topicName, msg);
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, msg));
+ return;
+ }
+ if (partitionMetadata.partitions > 0) {
+ Map<Integer, MessageId> messageIds = new
ConcurrentHashMap<>(partitionMetadata.partitions);
+ final List<CompletableFuture<MessageId>> futures =
+ new ArrayList<>(partitionMetadata.partitions);
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.TERMINATE)
- .thenCompose(unused ->
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
- .thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- String msg = "Termination of a non-partitioned topic
is not allowed using partitioned-terminate"
- + ", please use terminate commands";
- log.error("[{}] [{}] {}", clientAppId(), topicName,
msg);
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, msg));
- return;
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ int finalI = i;
+ futures.add(pulsar().getAdminClient().topics()
+
.terminateTopicAsync(topicNamePartition.toString())
+ .whenComplete((messageId, throwable) -> {
+ if (throwable != null) {
+ log.error("[{}] Failed to terminate
topic {}", clientAppId(),
+ topicNamePartition, throwable);
+ asyncResponse.resume(new
RestException(throwable));
+ }
+ messageIds.put(finalI, messageId);
+ }));
+ } catch (Exception e) {
+ log.error("[{}] Failed to terminate topic {}",
clientAppId(), topicNamePartition,
+ e);
+ throw new RestException(e);
}
- if (partitionMetadata.partitions > 0) {
- Map<Integer, MessageId> messageIds = new
ConcurrentHashMap<>(partitionMetadata.partitions);
- final List<CompletableFuture<MessageId>> futures =
- new ArrayList<>(partitionMetadata.partitions);
-
- for (int i = 0; i < partitionMetadata.partitions; i++)
{
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- int finalI = i;
- futures.add(pulsar().getAdminClient().topics()
-
.terminateTopicAsync(topicNamePartition.toString())
- .whenComplete((messageId, throwable)
-> {
- if (throwable != null) {
- log.error("[{}] Failed to
terminate topic {}", clientAppId(),
- topicNamePartition,
throwable);
- asyncResponse.resume(new
RestException(throwable));
- }
- messageIds.put(finalI, messageId);
- }));
- } catch (Exception e) {
- log.error("[{}] Failed to terminate topic {}",
clientAppId(), topicNamePartition,
- e);
- throw new RestException(e);
- }
+ }
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+
getTopicNotFoundErrorMessage(topicName.toString())));
+ } else {
+ log.error("[{}] Failed to terminate topic {}",
clientAppId(), topicName, t);
+ asyncResponse.resume(new RestException(t));
}
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
-
getTopicNotFoundErrorMessage(topicName.toString())));
- } else {
- log.error("[{}] Failed to terminate topic
{}", clientAppId(), topicName, t);
- asyncResponse.resume(new RestException(t));
- }
- }
- asyncResponse.resume(messageIds);
- return null;
- });
}
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to
log it.
- if (isNot307And404Exception(ex)) {
- log.error("[{}] Failed to terminate topic {}",
clientAppId(), topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
+ asyncResponse.resume(messageIds);
return null;
- })
- ).exceptionally(ex -> {
+ });
+ }
+ }).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminate topic {}", clientAppId(),
topicName, ex);
@@ -4186,16 +4160,16 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<LongRunningProcessStatus>
internalCompactionStatusAsync(boolean authoritative) {
- return validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.COMPACT))
+ return validateTopicOperationAsync(topicName, TopicOperation.COMPACT)
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenApply(topic -> ((PersistentTopic)
topic).compactionStatus());
}
protected void internalTriggerOffload(AsyncResponse asyncResponse,
boolean authoritative, MessageIdImpl
messageId) {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.OFFLOAD))
+ validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD)
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
try {
@@ -4221,8 +4195,8 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean
authoritative) {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.OFFLOAD))
+ validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD)
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
OffloadProcessStatus offloadProcessStatus =
((PersistentTopic) topic).offloadStatus();
@@ -4482,8 +4456,8 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalGetLastMessageId(AsyncResponse asyncResponse,
boolean authoritative) {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES))
+ validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
@@ -5207,33 +5181,27 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<SchemaCompatibilityStrategy>
internalGetSchemaCompatibilityStrategy(boolean applied) {
- CompletableFuture<Void> future =
validateTopicPolicyOperationAsync(topicName,
- PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
if (applied) {
- return future.thenCompose(__ ->
getSchemaCompatibilityStrategyAsync());
+ return getSchemaCompatibilityStrategyAsync();
}
- return future
- .thenCompose(n ->
getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
+ return getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
if (!op.isPresent()) {
return null;
}
SchemaCompatibilityStrategy strategy =
op.get().getSchemaCompatibilityStrategy();
return SchemaCompatibilityStrategy.isUndefined(strategy) ?
null : strategy;
- }));
+ });
}
protected CompletableFuture<Void>
internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
- return validateTopicPolicyOperationAsync(topicName,
- PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
- PolicyOperation.WRITE)
- .thenCompose((__) -> getTopicPoliciesAsyncWithRetry(topicName)
+ return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
topicPolicies.setSchemaCompatibilityStrategy(
strategy ==
SchemaCompatibilityStrategy.UNDEFINED ? null : strategy);
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName,
topicPolicies);
- }));
+ });
}
protected CompletableFuture<Boolean>
internalGetSchemaValidationEnforced(boolean applied) {
@@ -5257,54 +5225,47 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean
applied, boolean isGlobal) {
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
- .thenCompose(__ -> {
- if (!applied) {
- return getTopicPoliciesAsyncWithRetry(topicName,
isGlobal)
- .thenApply(op ->
op.map(TopicPolicies::getEntryFilters).orElse(null));
- }
- if
(!pulsar().getConfiguration().isAllowOverrideEntryFilters()) {
- return CompletableFuture.completedFuture(new
EntryFilters(String.join(",",
-
pulsar().getConfiguration().getEntryFilterNames())));
+ if (!applied) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenApply(op ->
op.map(TopicPolicies::getEntryFilters).orElse(null));
+ }
+ if (!pulsar().getConfiguration().isAllowOverrideEntryFilters()) {
+ return CompletableFuture.completedFuture(new
EntryFilters(String.join(",",
+ pulsar().getConfiguration().getEntryFilterNames())));
+ }
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenApply(op -> op.map(TopicPolicies::getEntryFilters))
+ .thenCompose(policyEntryFilters -> {
+ if (policyEntryFilters.isPresent()) {
+ return
CompletableFuture.completedFuture(policyEntryFilters.get());
}
- return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
- .thenApply(op ->
op.map(TopicPolicies::getEntryFilters))
- .thenCompose(policyEntryFilters -> {
- if (policyEntryFilters.isPresent()) {
- return
CompletableFuture.completedFuture(policyEntryFilters.get());
+ return getNamespacePoliciesAsync(namespaceName)
+ .thenApply(policies -> policies.entryFilters)
+ .thenCompose(nsEntryFilters -> {
+ if (nsEntryFilters != null) {
+ return
CompletableFuture.completedFuture(nsEntryFilters);
}
- return getNamespacePoliciesAsync(namespaceName)
- .thenApply(policies ->
policies.entryFilters)
- .thenCompose(nsEntryFilters -> {
- if (nsEntryFilters != null) {
- return
CompletableFuture.completedFuture(nsEntryFilters);
- }
- return
CompletableFuture.completedFuture(new EntryFilters(String.join(",",
-
pulsar().getConfiguration().getEntryFilterNames())));
- });
+ return CompletableFuture.completedFuture(new
EntryFilters(String.join(",",
+
pulsar().getConfiguration().getEntryFilterNames())));
});
});
}
protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters
entryFilters,
boolean
isGlobal) {
-
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
- .thenAccept(__ -> validateEntryFilters(entryFilters))
- .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal)
+ validateEntryFilters(entryFilters);
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
topicPolicies.setEntryFilters(entryFilters);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName,
topicPolicies);
- }));
+ });
}
protected CompletableFuture<Void> internalRemoveEntryFilters(boolean
isGlobal) {
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
- .thenCompose(__ ->
- getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -5312,7 +5273,7 @@ public class PersistentTopicsBase extends AdminResource {
op.get().setEntryFilters(null);
op.get().setIsGlobal(isGlobal);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
op.get());
- }));
+ });
}
protected CompletableFuture<Void> validateShadowTopics(List<String>
shadowTopics) {
@@ -5348,8 +5309,7 @@ public class PersistentTopicsBase extends AdminResource {
return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
"Cannot specify empty shadow topics, please use remove
command instead."));
}
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
- .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> validateShadowTopics(shadowTopics))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenCompose(op -> {
@@ -5361,8 +5321,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalDeleteShadowTopics() {
- return validateTopicPolicyOperationAsync(topicName,
PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
- .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ return validatePoliciesReadOnlyAccessAsync()
.thenCompose(shadowTopicName ->
getTopicPoliciesAsyncWithRetry(topicName))
.thenCompose(op -> {
TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 90f0208c81c..7e138442ae2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2149,7 +2149,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
@ApiParam(value = "backlog quota policies for the specified
topic") BacklogQuotaImpl backlogQuota) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType,
backlogQuota, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2174,7 +2175,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, null,
isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2237,7 +2239,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "List of replication clusters", required = true)
List<String> clusterIds) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicationClusters(clusterIds))
.thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2260,7 +2263,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveReplicationClusters())
.thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -4405,8 +4409,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
-
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__->
internalGetSchemaCompatibilityStrategy(applied))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -4436,8 +4440,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Strategy used to check the compatibility of new
schema")
SchemaCompatibilityStrategy strategy) {
validateTopicName(tenant, namespace, encodedTopic);
-
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetSchemaCompatibilityStrategy(strategy))
.thenRun(() -> {
log.info(
@@ -4476,8 +4480,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Strategy used to check the compatibility of new
schema")
SchemaCompatibilityStrategy strategy) {
validateTopicName(tenant, namespace, encodedTopic);
-
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ ->
internalSetSchemaCompatibilityStrategy(null))
.thenRun(() -> {
log.info(
@@ -4568,7 +4572,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
+ "broker. For internal use.")
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS,
PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetEntryFilters(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -4596,7 +4601,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Entry filters
for the specified topic")
EntryFilters entryFilters) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS,
PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetEntryFilters(entryFilters,
isGlobal))
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -4622,7 +4628,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
+ "call to this broker. For
internal use.")
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveEntryFilters(isGlobal))
.thenRun(() -> {
log.info(
@@ -4655,9 +4662,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
- .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC,
- PolicyOperation.READ))
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenAccept(op ->
asyncResponse.resume(op.map(TopicPolicies::getShadowTopics).orElse(null)))
.exceptionally(ex -> {
@@ -4684,7 +4690,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "List of shadow topics", required = true)
List<String> shadowTopics) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetShadowTopic(shadowTopics))
.thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -4710,7 +4717,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalDeleteShadowTopics())
.thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index e6ff0ce2bb4..3c0596d531f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -19,48 +19,54 @@
package org.apache.pulsar.broker.admin;
+import com.google.common.collect.Lists;
import io.jsonwebtoken.Jwts;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
+import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.security.MockedPulsarStandalone;
import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
@Test(groups = "broker-admin")
public class TopicAuthZTest extends MockedPulsarStandalone {
@@ -1105,15 +1111,15 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
deleteTopic(topic, false);
}
- @Test(dataProvider = "partitioned", groups = "flaky")
+ @Test
@SneakyThrows
- public void testExpireMessage(boolean partitioned) {
+ public void testExpireMessage() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
- createTopic(topic, partitioned);
+ createTopic(topic, false);
@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
@@ -1153,7 +1159,7 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
}
superUserAdmin.topics().revokePermissions(topic, subject);
}
- deleteTopic(topic, partitioned);
+ deleteTopic(topic, false);
}
@Test
@@ -1373,6 +1379,37 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
};
}
+ @Test
+ @SneakyThrows
+ public void testSchemaCompatibility() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ createTopic(topic, false);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, true);
+
+ // test tenant manager
+ tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, true);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ deleteTopic(topic, false);
+ }
+
@Test(dataProvider = "authFunction")
public void
testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin>
adminConsumer, OperationAuthType topicOpType)
throws Exception {
@@ -1380,6 +1417,7 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
+
@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
@@ -1419,8 +1457,8 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
if (execFlag != null) {
Assert.assertTrue(execFlag.get());
}
- }
+ }
private boolean authActionMatchOperation(OperationAuthType
operationAuthType, AuthAction action) {
switch (operationAuthType) {
@@ -1449,6 +1487,175 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
return false;
}
+ @Test
+ @SneakyThrows
+ public void testGetEntryFilter() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ createTopic(topic, false);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ //
+ superUserAdmin.topicPolicies().getEntryFiltersPerTopic(topic, true);
+
+ // test tenant manager
+ tenantManagerAdmin.topicPolicies().getEntryFiltersPerTopic(topic, true);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, false));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, false));
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ deleteTopic(topic, false);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testSetEntryFilter() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ createTopic(topic, false);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ //
+ final EntryFilterProvider oldEntryFilterProvider = getPulsarService().getBrokerService().getEntryFilterProvider();
+ @Cleanup
+ final MockEntryFilterProvider testEntryFilterProvider =
+ new MockEntryFilterProvider(getServiceConfiguration());
+
+ testEntryFilterProvider
+ .setMockEntryFilters(new EntryFilterDefinition(
+ "test",
+ null,
+ EntryFilterTest.class.getName()
+ ));
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
+ "entryFilterProvider", testEntryFilterProvider, true);
+ final EntryFilters entryFilter = new EntryFilters("test");
+ superUserAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter);
+
+ // test tenant manager
+ tenantManagerAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter));
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ deleteTopic(topic, false);
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
+ "entryFilterProvider", oldEntryFilterProvider, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testRemoveEntryFilter() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ createTopic(topic, false);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ final EntryFilterProvider oldEntryFilterProvider = getPulsarService().getBrokerService().getEntryFilterProvider();
+ @Cleanup
+ final MockEntryFilterProvider testEntryFilterProvider =
+ new MockEntryFilterProvider(getServiceConfiguration());
+
+ testEntryFilterProvider
+ .setMockEntryFilters(new EntryFilterDefinition(
+ "test",
+ null,
+ EntryFilterTest.class.getName()
+ ));
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
+ "entryFilterProvider", testEntryFilterProvider, true);
+ final EntryFilters entryFilter = new EntryFilters("test");
+ superUserAdmin.topicPolicies().removeEntryFiltersPerTopic(topic);
+ // test tenant manager
+ tenantManagerAdmin.topicPolicies().removeEntryFiltersPerTopic(topic);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().removeEntryFiltersPerTopic(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topicPolicies().removeEntryFiltersPerTopic(topic));
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ deleteTopic(topic, false);
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
+ "entryFilterProvider", oldEntryFilterProvider, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testShadowTopic() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ createTopic(topic, false);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ String shadowTopic = topic + "-shadow-topic";
+ superUserAdmin.topics().createShadowTopic(shadowTopic, topic);
+ superUserAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic));
+ superUserAdmin.topics().getShadowTopics(topic);
+ superUserAdmin.topics().removeShadowTopics(topic);
+
+
+ // test tenant manager
+ tenantManagerAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic));
+ tenantManagerAdmin.topics().getShadowTopics(topic);
+ tenantManagerAdmin.topics().removeShadowTopics(topic);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic)));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getShadowTopics(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic)));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getShadowTopics(topic));
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ deleteTopic(topic, false);
+ }
+
private void createTopic(String topic, boolean partitioned) throws
Exception {
if (partitioned) {
superUserAdmin.topics().createPartitionedTopic(topic, 2);