This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8652efa4d60 [feat][broker] PIP-469: Legacy-aware topic policies
backend routing and metadata-store topic policies (#25707)
8652efa4d60 is described below
commit 8652efa4d60b791a5f1ee4e52f7ffda6ebbbb256
Author: Yunze Xu <[email protected]>
AuthorDate: Mon May 18 19:16:47 2026 +0800
[feat][broker] PIP-469: Legacy-aware topic policies backend routing and
metadata-store topic policies (#25707)
---
pip/pip-469.md | 37 ++-
.../apache/pulsar/broker/ServiceConfiguration.java | 12 +-
.../org/apache/pulsar/broker/PulsarService.java | 11 +-
.../pulsar/broker/service/AbstractTopic.java | 2 +-
.../service/LegacyAwareTopicPoliciesService.java | 144 +++++++++++
.../service/MetadataStoreTopicPoliciesService.java | 280 +++++++++++++++++++++
.../SystemTopicBasedTopicPoliciesService.java | 2 +-
.../broker/service/TopicPoliciesService.java | 15 +-
.../broker/service/persistent/PersistentTopic.java | 8 +-
.../admin/MetadataStoreTopicPoliciesTest.java | 72 ++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 114 ++++++---
.../LegacyAwareTopicPoliciesServiceTest.java | 190 ++++++++++++++
.../SystemTopicBasedTopicPoliciesServiceTest.java | 4 +-
.../broker/service/TopicPolicyTestUtils.java | 7 +
14 files changed, 852 insertions(+), 46 deletions(-)
diff --git a/pip/pip-469.md b/pip/pip-469.md
index 69036381f0f..4734adc127d 100644
--- a/pip/pip-469.md
+++ b/pip/pip-469.md
@@ -105,8 +105,15 @@ Broker startup validates both backends:
- `SystemTopicBasedTopicPoliciesService` must be instantiable.
- The configured `topicPoliciesServiceClassName` must be instantiable.
-If either backend cannot be instantiated or started, broker startup fails.
There is no per-request fallback from one
-backend to another.
+`LegacyAwareTopicPoliciesService#start` starts only the configured backend. It
intentionally does not call
+`SystemTopicBasedTopicPoliciesService#start`, because that start path
registers a namespace-bundle ownership listener
+whose only purpose is to eagerly create a reader on
`<namespace>/__change_events` when a namespace bundle is loaded.
+Under legacy-aware routing, that eager optimization would be counterproductive
because it can create readers for
+namespaces that do not have topic policies in `__change_events`. For legacy
namespaces, the system-topic reader and
+policy cache are initialized lazily by the routed system-topic backend
operations.
+
+If either backend cannot be instantiated, or if the configured backend cannot
be started, broker startup fails. There is
+no per-request fallback from one backend to another.
### Namespace-scoped service routing
@@ -118,6 +125,10 @@ backend to another.
the system-topic backend when the system topic exists.
- Routing the same operations to the configured backend when the system topic
does not exist.
+Listener registration is routed through
`TopicPoliciesService#registerListenerAsync`. This lets the wrapper resolve the
+namespace backend before registering the listener, and the listener is
registered only on the selected backend instead
+of being registered on both backends.
+
The system-topic existence check can be cached per namespace in memory, but
the routing rule is defined by actual topic
existence rather than by new namespace metadata.
@@ -137,9 +148,13 @@ meaning the system-topic-backed topic-policies state is
gone.
- Topic names are normalized to the partitioned topic name, so all partitions
share the same topic-policies record.
- Global policies are stored in the configuration metadata store path:
- `/admin/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`.
+ `/admin/topic-policies/global/{tenant}/{namespace}/{domain}/{encodedTopic}`.
- Local policies are stored in the local metadata store path:
-
`/admin/local-policies/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`.
+ `/admin/topic-policies/local/{tenant}/{namespace}/{domain}/{encodedTopic}`.
+
+To avoid possible conflicts like the listener registered on the
`/admin/local-policies` path from
+`BrokerService#handleMetadataChanges`, these two paths share the same root
path `/admin/topic-policies`, which is not
+used by any other component.
Each node stores a serialized `TopicPolicies` document. The backend writes and
reads the two scopes independently:
@@ -159,6 +174,11 @@ managed-ledger metadata updates.
### Listener behavior
+`TopicPoliciesService` adds `registerListenerAsync(TopicName,
TopicPolicyListener)` for listener registration. The
+existing synchronous `registerListener(TopicName, TopicPolicyListener)` method
is retained as a deprecated compatibility
+hook for existing custom implementations, and the default async method
delegates to it. Implementations that need async
+routing or initialization, such as `LegacyAwareTopicPoliciesService`, override
`registerListenerAsync` directly.
+
The backend registers watchers on both metadata stores:
- A change on the local path re-reads the local node and notifies listeners
with the latest local `TopicPolicies` or
@@ -173,6 +193,11 @@ append-only replay log; it relies on metadata-store
notifications and read-after
### Public API
+The `TopicPoliciesService` extension point gains a default
+`CompletableFuture<Boolean> registerListenerAsync(TopicName,
TopicPolicyListener)` method. Existing implementations
+remain compatible because `registerListener(TopicName, TopicPolicyListener)`
is retained and used by the default async
+implementation.
+
No new namespace policy field is introduced.
No new namespace admin REST endpoint or Java admin client method is introduced.
@@ -221,6 +246,10 @@ This upgrade rule is intentionally conservative:
This means some namespaces with an empty but already-created `__change_events`
topic may continue using the
system-topic backend. That is acceptable because it avoids missing legacy
state.
+Existing custom `TopicPoliciesService` implementations that only implement the
synchronous `registerListener` method
+continue to work through the default `registerListenerAsync` bridge.
Implementations can override
+`registerListenerAsync` when registration itself needs asynchronous backend
resolution or initialization.
+
## Downgrade / Rollback
Rolling back to a broker version that does not understand legacy-aware routing
returns topic-policies backend
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4c3651bf90b..5707c05af1b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1750,8 +1750,16 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_SERVER,
- doc = "The class name of the topic policies service. The default
config only takes affect when the "
- + "systemTopicEnable config is true"
+ doc = """
+ The class name of the topic policies service. There are 2
built-in implementations:
+ 1.
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService"
(default)
+ It stores a topic's policies in the `__change_events`
topic. If `systemTopicEnabled` is false,
+ the topic policies will just be disabled
+ 2.
"org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService"
+ It stores a topic's policies in the metadata store. If
`systemTopicEnabled` is true and the
+ topic's namespace has a `__change_events` topic, the
policies will still be stored in the
+ `__change_events` topic for backward compatibility.
+ """
)
private String topicPoliciesServiceClassName =
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService";
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4fe6196c05c..b200284797f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -107,6 +107,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.HealthChecker;
+import org.apache.pulsar.broker.service.LegacyAwareTopicPoliciesService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
@@ -2293,8 +2294,16 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return TopicPoliciesService.DISABLED;
}
}
- return (TopicPoliciesService) Reflections.createInstance(className,
+ final var configuredService = (TopicPoliciesService)
Reflections.createInstance(className,
Thread.currentThread().getContextClassLoader());
+ if (!config.isSystemTopicEnabled()) {
+ log.info()
+ .attr("className", className)
+ .log("System topic is disabled, using configured topic
policies service without legacy routing");
+ return configuredService;
+ }
+ return new LegacyAwareTopicPoliciesService(this, new
SystemTopicBasedTopicPoliciesService(this),
+ configuredService);
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4d29c751f03..3ad9eb43810 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -559,7 +559,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
protected void registerTopicPolicyListener() {
brokerService.getPulsar().getTopicPoliciesService()
- .registerListener(TopicName.getPartitionedTopicName(topic),
this);
+
.registerListenerAsync(TopicName.getPartitionedTopicName(topic), this);
}
protected void unregisterTopicPolicyListener() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
new file mode 100644
index 00000000000..20f7b207991
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.jspecify.annotations.NonNull;
+
+/**
+ * Routes topic policy operations to the legacy system-topic backend when a
namespace already has
+ * a topic-policy {@code __change_events} system topic, and otherwise to the
configured backend.
+ */
+@CustomLog
+public class LegacyAwareTopicPoliciesService implements TopicPoliciesService {
+
+ private final AsyncLoadingCache<NamespaceName, Boolean> isLegacyNamespace;
+ @VisibleForTesting
+ final SystemTopicBasedTopicPoliciesService systemTopicService;
+ private final TopicPoliciesService configuredService;
+
+ public LegacyAwareTopicPoliciesService(PulsarService pulsar,
+
SystemTopicBasedTopicPoliciesService systemTopicService,
+ TopicPoliciesService
configuredService) {
+ // Generally, we only need to check if the __change_events topic
exists once because the __change_events topic
+ // should only be created by broker before the upgrade, where
`SystemTopicBasedTopicPoliciesService` is
+ // configured as the topic policies service.
+ this.isLegacyNamespace =
Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(1))
+ .buildAsync(new AsyncCacheLoader<>() {
+ @NonNull
+ @Override
+ public CompletableFuture<? extends Boolean>
asyncLoad(NamespaceName key,
+
@NonNull Executor executor) {
+ return
NamespaceEventsSystemTopicFactory.checkSystemTopicExists(key,
EventType.TOPIC_POLICY,
+ pulsar);
+ }
+ });
+ this.systemTopicService = systemTopicService;
+ this.configuredService = configuredService;
+ if (configuredService instanceof SystemTopicBasedTopicPoliciesService)
{
+ throw new IllegalArgumentException(
+ "configuredService should not be an instance of
SystemTopicBasedTopicPoliciesService");
+ }
+ }
+
+ @Override
+ public void start(PulsarService pulsarService) {
+ // We should not call `systemTopicService.start()`, which just
registers a namespace bundle listener to create
+ // a reader on `<namespace>/__change_events` when the namespace's
bundle is loaded firstly. It's just an
+ // optimization to create the reader before loading any topic.
However, it could create a reader on a namespace
+ // that does not even have the __change_events topic.
+ configuredService.start(pulsarService);
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ configuredService.close();
+ } finally {
+ systemTopicService.close();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(TopicName topicName, GetType type) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service ->
service.getTopicPoliciesAsync(topicName, type));
+ }
+
+ @Override
+ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName
topicName, boolean isGlobalPolicy,
+ boolean
skipUpdateWhenTopicPolicyDoesntExist,
+
Consumer<TopicPolicies> policyUpdater) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service ->
service.updateTopicPoliciesAsync(topicName, isGlobalPolicy,
+ skipUpdateWhenTopicPolicyDoesntExist, policyUpdater));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service ->
service.deleteTopicPoliciesAsync(topicName));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName,
+ boolean
keepGlobalPoliciesAfterDeleting) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service ->
service.deleteTopicPoliciesAsync(topicName,
+ keepGlobalPoliciesAfterDeleting));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> registerListenerAsync(TopicName
topicName, TopicPolicyListener listener) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service ->
service.registerListenerAsync(topicName, listener));
+ }
+
+ @Override
+ public boolean registerListener(TopicName topicName, TopicPolicyListener
listener) {
+ throw new RuntimeException("should not be called");
+ }
+
+ @Override
+ public void unregisterListener(TopicName topicName, TopicPolicyListener
listener) {
+ configuredService.unregisterListener(topicName, listener);
+ systemTopicService.unregisterListener(topicName, listener);
+ }
+
+ @VisibleForTesting
+ CompletableFuture<TopicPoliciesService> resolveService(NamespaceName
namespace) {
+ return isLegacyNamespace.get(namespace)
+ .thenApply(isLegacy -> isLegacy ? systemTopicService :
configuredService);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
new file mode 100644
index 00000000000..56319a44ac3
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Topic policies service backed by Pulsar metadata stores.
+ */
+@CustomLog
+public class MetadataStoreTopicPoliciesService implements TopicPoliciesService
{
+
+ public static final String GLOBAL_POLICIES_ROOT =
"/admin/topic-policies/global";
+ public static final String LOCAL_POLICIES_ROOT =
"/admin/topic-policies/local";
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Map<TopicName, List<TopicPolicyListener>> listeners = new
ConcurrentHashMap<>();
+ private MetadataCache<TopicPolicies> localPoliciesCache;
+ private MetadataCache<TopicPolicies> globalPoliciesCache;
+
+ @Override
+ public void start(PulsarService pulsar) {
+ MetadataStore localStore = pulsar.getLocalMetadataStore();
+ MetadataStore configurationStore =
pulsar.getConfigurationMetadataStore();
+ this.localPoliciesCache =
localStore.getMetadataCache(TopicPolicies.class);
+ this.globalPoliciesCache =
configurationStore.getMetadataCache(TopicPolicies.class);
+ localStore.registerListener(notification ->
handleNotification(notification, false));
+ configurationStore.registerListener(notification ->
handleNotification(notification, true));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName) {
+ return deleteTopicPoliciesAsync(topicName, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName,
+ boolean
keepGlobalPoliciesAfterDeleting) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ if
(NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject()))
{
+ return CompletableFuture.completedFuture(null);
+ }
+ if (closed.get()) {
+ return CompletableFuture.failedFuture(new
BrokerServiceException(getClass().getName() + " is closed."));
+ }
+ CompletableFuture<Void> deleteLocal =
+ deleteIfExists(localPoliciesCache,
pathFor(partitionedTopicName, false));
+ if (keepGlobalPoliciesAfterDeleting) {
+ return deleteLocal;
+ }
+ CompletableFuture<Void> deleteGlobal =
+ deleteIfExists(globalPoliciesCache,
pathFor(partitionedTopicName, true));
+ return CompletableFuture.allOf(deleteLocal, deleteGlobal);
+ }
+
+ @Override
+ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName
topicName, boolean isGlobalPolicy,
+ boolean
skipUpdateWhenTopicPolicyDoesntExist,
+
Consumer<TopicPolicies> policyUpdater) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ if
(NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject()))
{
+ return CompletableFuture.failedFuture(new
BrokerServiceException.NotAllowedException(
+ "Not allowed to update topic policy for the heartbeat
topic"));
+ }
+ if (closed.get()) {
+ return CompletableFuture.failedFuture(new
BrokerServiceException(getClass().getName() + " is closed."));
+ }
+ MetadataCache<TopicPolicies> cache = cache(isGlobalPolicy);
+ String path = pathFor(partitionedTopicName, isGlobalPolicy);
+ CompletableFuture<TopicPolicies> updateFuture;
+ if (skipUpdateWhenTopicPolicyDoesntExist) {
+ updateFuture = cache.readModifyUpdate(path,
+ current -> updatePolicies(Optional.of(current),
isGlobalPolicy, policyUpdater));
+ } else {
+ updateFuture = cache.readModifyUpdateOrCreate(path,
+ current -> updatePolicies(current, isGlobalPolicy,
policyUpdater));
+ }
+ return updateFuture.thenAccept(__ -> { }).exceptionally(error -> {
+ if (skipUpdateWhenTopicPolicyDoesntExist
+ && FutureUtil.unwrapCompletionException(error) instanceof
NotFoundException) {
+ return null;
+ }
+ throw FutureUtil.wrapToCompletionException(error);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(TopicName topicName, GetType type) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ if
(NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject()))
{
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ if (closed.get()) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ boolean global = type == GetType.GLOBAL_ONLY;
+ return cache(global).get(pathFor(partitionedTopicName, global))
+ .thenApply(policies -> policies.map(policy ->
cloneWithScope(policy, global)));
+ }
+
+ @Override
+ public boolean registerListener(TopicName topicName, TopicPolicyListener
listener) {
+ listeners.compute(normalizeTopicName(topicName), (__, topicListeners)
-> {
+ if (topicListeners == null) {
+ topicListeners = new CopyOnWriteArrayList<>();
+ }
+ topicListeners.add(listener);
+ return topicListeners;
+ });
+ return true;
+ }
+
+ @Override
+ public void unregisterListener(TopicName topicName, TopicPolicyListener
listener) {
+ listeners.computeIfPresent(normalizeTopicName(topicName), (__,
topicListeners) -> {
+ topicListeners.remove(listener);
+ return topicListeners.isEmpty() ? null : topicListeners;
+ });
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ listeners.clear();
+ if (localPoliciesCache != null) {
+ localPoliciesCache.invalidateAll();
+ }
+ if (globalPoliciesCache != null) {
+ globalPoliciesCache.invalidateAll();
+ }
+ }
+ }
+
+ private MetadataCache<TopicPolicies> cache(boolean isGlobalPolicy) {
+ return isGlobalPolicy ? globalPoliciesCache : localPoliciesCache;
+ }
+
+ private CompletableFuture<Void>
deleteIfExists(MetadataCache<TopicPolicies> cache, String path) {
+ return cache.delete(path).handle((__, error) -> {
+ cache.invalidate(path);
+ if (error == null || FutureUtil.unwrapCompletionException(error)
instanceof NotFoundException) {
+ return null;
+ }
+ throw FutureUtil.wrapToCompletionException(error);
+ });
+ }
+
+ private static TopicPolicies updatePolicies(Optional<TopicPolicies>
currentPolicies,
+ boolean isGlobalPolicy,
+ Consumer<TopicPolicies>
policyUpdater) {
+ TopicPolicies policies =
currentPolicies.map(TopicPolicies::clone).orElseGet(TopicPolicies::new);
+ policies.setIsGlobal(isGlobalPolicy);
+ policyUpdater.accept(policies);
+ return policies;
+ }
+
+ private void handleNotification(Notification notification, boolean
isGlobalPolicy) {
+ if (closed.get()
+ || (notification.getType() != NotificationType.Created
+ && notification.getType() != NotificationType.Modified
+ && notification.getType() != NotificationType.Deleted)) {
+ return;
+ }
+ String path = notification.getPath();
+ String root = isGlobalPolicy ? GLOBAL_POLICIES_ROOT :
LOCAL_POLICIES_ROOT;
+ Optional<TopicName> topicName = topicNameFromPath(root, path);
+ if (topicName.isEmpty()) {
+ return;
+ }
+ MetadataCache<TopicPolicies> cache = cache(isGlobalPolicy);
+ cache.invalidate(path);
+ if (notification.getType() == NotificationType.Deleted) {
+ notifyListeners(topicName.get(), null);
+ return;
+ }
+ cache.get(path).whenComplete((policies, error) -> {
+ if (error != null) {
+ log.warn()
+ .attr("path", path)
+ .exception(error)
+ .log("Failed to refresh topic policies after metadata
notification");
+ return;
+ }
+ notifyListeners(topicName.get(),
+ policies.map(policy -> cloneWithScope(policy,
isGlobalPolicy)).orElse(null));
+ });
+ }
+
+ private void notifyListeners(TopicName topicName, @Nullable TopicPolicies
policies) {
+ List<TopicPolicyListener> topicListeners = listeners.get(topicName);
+ if (topicListeners == null) {
+ return;
+ }
+ for (TopicPolicyListener listener : topicListeners) {
+ try {
+ listener.onUpdate(policies == null ? null : policies.clone());
+ } catch (Throwable error) {
+ log.error().attr("topic",
topicName).exception(error).log("Call topic policy listener error");
+ }
+ }
+ }
+
+ private static TopicName normalizeTopicName(TopicName topicName) {
+ return TopicName.get(topicName.getPartitionedTopicName());
+ }
+
+ private static TopicPolicies cloneWithScope(TopicPolicies policies,
boolean isGlobalPolicy) {
+ TopicPolicies cloned = policies.clone();
+ cloned.setIsGlobal(isGlobalPolicy);
+ return cloned;
+ }
+
+ @VisibleForTesting
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesDirectFromStore(TopicName topicName,
+
boolean isGlobal) {
+ String path = pathFor(topicName, isGlobal);
+ MetadataCache<TopicPolicies> c = cache(isGlobal);
+ c.invalidate(path);
+ return c.get(path).thenApply(opt -> opt.map(p -> cloneWithScope(p,
isGlobal)));
+ }
+
+ @VisibleForTesting
+ static String pathFor(TopicName topicName, boolean isGlobalPolicy) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ return (isGlobalPolicy ? GLOBAL_POLICIES_ROOT : LOCAL_POLICIES_ROOT)
+ + "/" + partitionedTopicName.getTenant()
+ + "/" + partitionedTopicName.getNamespacePortion()
+ + "/" + partitionedTopicName.getDomain()
+ + "/" + partitionedTopicName.getEncodedLocalName();
+ }
+
+ @VisibleForTesting
+ private static Optional<TopicName> topicNameFromPath(String root, String
path) {
+ if (!path.startsWith(root + "/")) {
+ return Optional.empty();
+ }
+ String[] parts = path.substring(root.length() + 1).split("/", 4);
+ if (parts.length != 4) {
+ return Optional.empty();
+ }
+ return Optional.of(TopicName.get(parts[2], parts[0], parts[1],
Codec.decode(parts[3])));
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 48653883d1a..a8f37d0c389 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -667,7 +667,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return systemTopicClient.newReaderAsync();
}
- private void removeOwnedNamespaceBundleAsync(NamespaceBundle
namespaceBundle) {
+ void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.isHeartbeatNamespace(namespace)) {
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 803a18da72d..5b5fe157230 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -96,6 +96,15 @@ public interface TopicPoliciesService extends AutoCloseable {
default void close() throws Exception {
}
+
+ /**
+ * @implNote This method is never called unless by the default
implementation of
+ * {@link TopicPoliciesService#registerListenerAsync(TopicName,
TopicPolicyListener)}, which is actually called
+ * internally. This method is only retained for backward compatibility on
custom implementations.
+ */
+ @Deprecated
+ boolean registerListener(TopicName topicName, TopicPolicyListener
listener);
+
/**
* Registers a listener for topic policies updates.
*
@@ -106,10 +115,10 @@ public interface TopicPoliciesService extends
AutoCloseable {
* guaranteed to be received by the listener.
* In summary, the listener is guaranteed to receive only the latest value.
* </p>
- *
- * @return true if the listener is registered successfully
*/
- boolean registerListener(TopicName topicName, TopicPolicyListener
listener);
+ default CompletableFuture<Boolean> registerListenerAsync(TopicName
topicName, TopicPolicyListener listener) {
+ return CompletableFuture.completedFuture(registerListener(topicName,
listener));
+ }
/**
* Unregister the topic policies listener.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a4ef9a564fc..c9bcad341fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4904,7 +4904,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
protected CompletableFuture<Void> initTopicPolicy() {
final var topicPoliciesService =
brokerService.pulsar().getTopicPoliciesService();
final var partitionedTopicName =
TopicName.getPartitionedTopicName(topic);
- if (topicPoliciesService.registerListener(partitionedTopicName, this))
{
+ return
topicPoliciesService.registerListenerAsync(partitionedTopicName,
this).thenCompose(registered -> {
+ if (!registered) {
+ return CompletableFuture.completedFuture(null);
+ }
if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}
@@ -4916,8 +4919,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
TopicPoliciesService.GetType.LOCAL_ONLY))
.thenAcceptAsync(optionalPolicies ->
optionalPolicies.ifPresent(this::onUpdate),
brokerService.getTopicOrderedExecutor());
- }
- return CompletableFuture.completedFuture(null);
+ });
}
@VisibleForTesting
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
new file mode 100644
index 00000000000..e7fefa16497
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class MetadataStoreTopicPoliciesTest extends TopicPoliciesTest {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+
conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName());
+ super.setup();
+ }
+
+ @Override
+ protected void clearTopicPoliciesCache() {
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws
Exception {
+ // This test is specific to SystemTopicBasedTopicPoliciesService (uses
getPoliciesCacheInit).
+ // Not applicable to MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testSystemTopicShouldBeCompacted() throws Exception {
+ // Relies on __change_events system topic, which does not exist with
MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testPoliciesCanBeDeletedWithTopic() throws Exception {
+ // Directly accesses __change_events PersistentTopic for compaction.
+ // Not applicable to MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testProduceChangesWithEncryptionRequired() throws Exception {
+ // Checks __change_events LAC, which does not exist with
MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testTopicPoliciesAfterCompaction(String reloadPolicyType)
throws Exception {
+ // The "Recreate_Service" variant creates a new
SystemTopicBasedTopicPoliciesService,
+ // which is not applicable to MetadataStoreTopicPoliciesService.
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index d94aa51b73b..5ba979534ef 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -119,7 +119,9 @@ import org.glassfish.jersey.client.JerseyClient;
import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -144,10 +146,11 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
private final int testTopicPartitions = 2;
- @BeforeMethod
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
this.conf.setDefaultNumberOfNamespaceBundles(1);
+ this.conf.setForceDeleteNamespaceAllowed(true);
super.internalSetup();
admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
@@ -156,17 +159,48 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace(testTenant + "/" + testNamespace,
Set.of("test"));
admin.namespaces().createNamespace(myNamespaceV1);
admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
- Producer<?> producer =
pulsarClient.newProducer().topic(testTopic).create();
- producer.close();
- waitForZooKeeperWatchers();
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}
+ @BeforeMethod
+ void setupTestTopic() throws Exception {
+ // Recreate namespace to clear any policies set by previous tests
+ try {
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ } catch (PulsarAdminException.NotFoundException e) {
+ // topic may already be deleted
+ }
+ try {
+ admin.namespaces().deleteNamespace(myNamespace, true);
+ } catch (PulsarAdminException.NotFoundException e) {
+ // namespace may already be deleted
+ }
+ try {
+ admin.namespaces().deleteNamespace(myNamespaceV1, true);
+ } catch (PulsarAdminException.NotFoundException e) {
+ // namespace may already be deleted
+ }
+ admin.namespaces().createNamespace(testTenant + "/" + testNamespace,
Set.of("test"));
+ admin.namespaces().createNamespace(myNamespaceV1);
+ admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
+ // Acquire namespace bundle ownership so tests that call
getOrCreateTopic() directly succeed.
+ // Without this, services that don't create a __change_events reader
(e.g. MetadataStoreTopicPoliciesService)
+ // leave the bundle unowned after namespace recreation and the first
broker-side topic load fails.
+ admin.lookups().lookupTopic(testTopic + "-partition-0");
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void afterMethodCleanup() throws Exception{
+
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages",
"0");
+
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes",
"0");
+ clearTopicPoliciesCache();
+ }
+
@Test
public void updatePropertiesForAutoCreatedTopicTest() throws Exception {
TopicName topicName = TopicName.get(
@@ -519,8 +553,8 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test(dataProvider = "clientRequestType")
public void testPriorityOfGlobalPolicies(String clientRequestType) throws
Exception {
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
final JerseyClient httpClient = JerseyClientBuilder.createClient();
// create topic and load it up.
final String namespace = myNamespace;
@@ -600,8 +634,8 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test(dataProvider = "clientRequestType")
public void testPriorityOfGlobalPolicies2(String clientRequestType) throws
Exception {
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
final JerseyClient httpClient = JerseyClientBuilder.createClient();
// create topic and load it up.
final String namespace = myNamespace;
@@ -687,8 +721,8 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
final TopicName topicName = TopicName.get(topic);
admin.topics().createNonPartitionedTopic(topic);
pulsarClient.newProducer().topic(topic).create().close();
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
// Set non-global policy of the limitation of max consumers.
// Set global policy of the limitation of max producers.
@@ -729,8 +763,8 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
final TopicName topicName = TopicName.get(topic);
admin.topics().createNonPartitionedTopic(topic);
pulsarClient.newProducer().topic(topic).create().close();
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
// Set non-global policy of the limitation of max consumers.
// Set global policy of the persistence policies.
@@ -2756,10 +2790,8 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testPublishRateInDifferentLevelPolicy() throws Exception {
- cleanup();
- conf.setMaxPublishRatePerTopicInMessages(5);
- conf.setMaxPublishRatePerTopicInBytes(50L);
- setup();
+
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages",
"5");
+
admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes",
"50");
final String topicName = "persistent://" + myNamespace + "/test-" +
UUID.randomUUID();
pulsarClient.newProducer().topic(topicName).create().close();
@@ -3050,9 +3082,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testMaxUnackedMessagesOnSubscriptionPriority() throws
Exception {
- cleanup();
- conf.setMaxUnackedMessagesPerSubscription(30);
- setup();
+ restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(30));
final String topic = "persistent://" + myNamespace + "/test-" +
UUID.randomUUID();
// init cache
@Cleanup
@@ -3115,6 +3145,9 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
&&
admin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic) == null);
messages = getMsgReceived(consumer1, Integer.MAX_VALUE);
assertEquals(messages.size(), defaultMaxUnackedMsgOnBroker);
+
+ // restore default config
+ restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(4 *
50000));
}
private void produceMsg(Producer<byte[]> producer, int msgNum) throws
Exception{
@@ -3299,14 +3332,16 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 30000)
public void testAutoCreationDisabled() throws Exception {
- cleanup();
- conf.setAllowAutoTopicCreation(false);
- setup();
+ admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation",
"false");
+
final String topic = testTopic + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
pulsarClient.newProducer().topic(topic).create().close();
//should not fail
assertNull(admin.topicPolicies().getMessageTTL(topic));
+
+ // restore default
+ admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation",
"true");
}
@SuppressWarnings("deprecation")
@@ -3431,6 +3466,12 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test")
.subscribe().close();
+
+ // restore dynamic broker config and conf object
+ pulsar.getConfiguration().setSubscriptionTypesEnabled(
+ Set.of("Exclusive", "Shared", "Failover", "Key_Shared"));
+ admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled",
+ "Exclusive,Shared,Failover,Key_Shared");
}
@Test(timeOut = 20000)
@@ -3765,7 +3806,8 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
@Test
- public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
+ public void testDoNotCreateSystemTopicForHeartbeatNamespace() throws
Exception {
+ initEventsTopicAndPartitions();
assertTrue(pulsar.getBrokerService().getTopics().size() > 0);
pulsar.getBrokerService().getTopics().forEach((k, v) -> {
TopicName topicName = TopicName.get(k);
@@ -3826,8 +3868,13 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
private void triggerAndWaitNewTopicCompaction(String topicName) throws
Exception {
- PersistentTopic tp =
- (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ Optional<Topic> topicOpt =
+ pulsar.getBrokerService().getTopic(topicName, false).join();
+ if (topicOpt.isEmpty()) {
+ // Topic doesn't exist (e.g., when not using system-topic-based
policies service), nothing to compact.
+ return;
+ }
+ PersistentTopic tp = (PersistentTopic) topicOpt.get();
// Wait for the old task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Long> compactionTask =
WhiteboxImpl.getInternalState(tp, "currentCompaction");
@@ -3846,7 +3893,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
* It is not a thread safety method, something will go to a wrong pointer
if there is a task is trying to load a
* topic policies.
*/
- private void clearTopicPoliciesCache() {
+ protected void clearTopicPoliciesCache() {
TopicPoliciesService topicPoliciesService =
pulsar.getTopicPoliciesService();
if (topicPoliciesService instanceof
TopicPoliciesService.TopicPoliciesServiceDisabled) {
return;
@@ -4076,8 +4123,8 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
.isNull());
admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(1,
2));
- SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+ TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
// check global topic policies can be added correctly.
Awaitility.await().untilAsserted(() -> assertNotNull(
@@ -4121,6 +4168,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testMaxMessageSizeWithChunking() throws Exception {
+ final var maxMessageSize = this.conf.getMaxMessageSize();
this.conf.setMaxMessageSize(1000);
@Cleanup
@@ -4149,6 +4197,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
// chunk message send success
producer.send(new byte[2000]);
+ this.conf.setMaxMessageSize(maxMessageSize);
}
@Test(timeOut = 30000)
@@ -4202,6 +4251,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testProduceChangesWithEncryptionRequired() throws Exception {
+ initEventsTopicAndPartitions();
final String beforeLac =
admin.topics().getInternalStats(topicPolicyEventsTopic).lastConfirmedEntry;
admin.namespaces().setEncryptionRequiredStatus(myNamespace, true);
// just an update to trigger writes on __change_events
@@ -4657,4 +4707,10 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long)
(1024 * 1024 * 10L),
"Should inherit offload threshold from legacy namespace
policy");
}
+
+ private void initEventsTopicAndPartitions() throws Exception {
+ try (Producer<?> producer =
pulsarClient.newProducer().topic(testTopic).create()) {
+ // No-op. Creating the producer initializes the events topic and
partitions.
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
new file mode 100644
index 00000000000..47a7de0528d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test order: testUpgrade() -> other tests (with
MetadataStoreTopicPoliciesService configured) -> testDowngrade().
+ */
+@Test(groups = "broker")
+public class LegacyAwareTopicPoliciesServiceTest extends
MockedPulsarServiceBaseTest {
+
+ private static final String metaNamespace = "public/meta-ns";
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.setupDefaultTenantAndNamespace();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(priority = -1)
+ public void testUpgrade() throws Exception {
+ final var topic = "test-upgrade";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topicPolicies().setCompactionThreshold(topic, 100);
+ waitUntilAssert(() ->
assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100));
+
+ restartBroker(conf -> {
+ conf.setSystemTopicEnabled(false);
+
conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName());
+ });
+ // The policies will be lost because when system topic is disabled, it
will not try to read policies from the
+ // __change_events topic
+ assertNull(admin.topicPolicies().getCompactionThreshold(topic));
+
+ restartBroker(conf -> conf.setSystemTopicEnabled(true));
+ // The default namespace still read policies from the __change_events
topic if it exists
+ assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100);
+
assertFalse(pulsar.getLocalMetadataStore().exists(MetadataStoreTopicPoliciesService.LOCAL_POLICIES_ROOT).get());
+
+ // The global policies are still stored in the __change_events topic
+ admin.topicPolicies(true).setCompactionThreshold(topic, 200);
+ waitUntilAssert(() ->
assertEquals(admin.topicPolicies(true).getCompactionThreshold(topic), 200));
+ assertFalse(pulsar.getConfigurationMetadataStore()
+
.exists(MetadataStoreTopicPoliciesService.GLOBAL_POLICIES_ROOT).get());
+
+ admin.topicPolicies().deleteTopicPolicies(topic);
+ waitUntilAssert(() ->
assertNull(admin.topicPolicies().getCompactionThreshold(topic)));
+
+ admin.namespaces().createNamespace(metaNamespace);
+ }
+
+ @Test(priority = 1)
+ public void testDowngrade() throws Exception {
+ final var topic1 = "downgrade"; // in default namespace
+ admin.topics().createNonPartitionedTopic(topic1);
+ admin.topicPolicies().setCompactionThreshold(topic1, 1);
+ waitUntilAssert(() ->
assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1));
+
+ final var topic2 = metaNamespace + "/downgrade";
+ admin.topics().createNonPartitionedTopic(topic2);
+ admin.topicPolicies().setCompactionThreshold(topic2, 2);
+ waitUntilAssert(() ->
assertEquals(admin.topicPolicies().getCompactionThreshold(topic2), 2));
+
+ restartBroker(conf ->
+
conf.setTopicPoliciesServiceClassName(SystemTopicBasedTopicPoliciesService.class.getName()));
+ assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1);
+ // The policies will be lost because they are not stored in the
__change_events topic
+ assertNull(admin.topicPolicies().getCompactionThreshold(topic2));
+ }
+
+ @DataProvider
+ public Object[][] namespaces() {
+ return new Object[][] {
+ { "public/default" },
+ { metaNamespace }
+ };
+ }
+
+ @Test(dataProvider = "namespaces")
+ public void testPoliciesOperations(String namespace) throws Exception {
+ final var topicName = TopicName.get(namespace +
"/test-policies-operations");
+ final var topic = topicName.toString();
+ admin.topics().createNonPartitionedTopic(topic);
+
+ final var compactionThreshold = new AtomicLong(0);
+ // Verify the exception thrown from one listener does not affect other
listeners
+ pulsar.getTopicPoliciesService().registerListenerAsync(topicName, __
-> {
+ throw new RuntimeException("injected failure");
+ }).get();
+ pulsar.getTopicPoliciesService().registerListenerAsync(topicName,
policies ->
+
Optional.ofNullable(policies).map(TopicPolicies::getCompactionThreshold).ifPresentOrElse(
+ compactionThreshold::set, () ->
compactionThreshold.set(-1))).get();
+
+ // Verify Created events are handled
+ admin.topicPolicies(false).setCompactionThreshold(topic, 100);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 100));
+ final var localStore = pulsar.getLocalMetadataStore();
+ final var configurationStore = pulsar.getConfigurationMetadataStore();
+
+ if (namespace.equals(metaNamespace)) {
+
assertTrue(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
false)).get());
+
assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
true)).get());
+ }
+
+ admin.topicPolicies(true).setCompactionThreshold(topic, 200);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 200));
+ if (namespace.equals(metaNamespace)) {
+
assertTrue(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
true)).get());
+ }
+
+ // Verify Modified events are handled
+ admin.topicPolicies(false).setCompactionThreshold(topic, 300);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 300));
+
+ admin.topicPolicies(true).setCompactionThreshold(topic, 400);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 400));
+
+ final var readerNamespaces = ((LegacyAwareTopicPoliciesService)
pulsar.getTopicPoliciesService())
+ .systemTopicService.getReaderCaches().keySet();
+
assertFalse(readerNamespaces.contains(NamespaceName.get(metaNamespace)));
+
+ // Verify Deleted events are handled
+ admin.topicPolicies(false).deleteTopicPolicies(topic);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), -1));
+ if (namespace.equals(metaNamespace)) {
+
assertFalse(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
false)).get());
+
assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName,
true)).get());
+ }
+ }
+
+ @Test
+ public void testUserCreatedEventsTopicAreIgnored() throws Exception {
+ final var topic = TopicName.get(metaNamespace + "/" +
System.currentTimeMillis()).toString();
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topicPolicies().setCompactionThreshold(topic, 1);
+ waitUntilAssert(() ->
assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1));
+
+ final var eventsTopic = metaNamespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+ admin.topics().createNonPartitionedTopic(eventsTopic);
+ // Even if the __change_events topic is created, since it has detected
the namespace didn't have the events
+ // topic before, it will be ignored and the policies are still read
from metadata store.
+ waitUntilAssert(() ->
assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1));
+ admin.topics().delete(eventsTopic);
+ }
+
+ private static void waitUntilAssert(ThrowingRunnable assertion) {
+
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(assertion);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index d02154c8178..2e8ca8cebec 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -109,7 +109,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
CompletableFuture<Void> f =
CompletableFuture.completedFuture(null).thenRunAsync(() -> {
for (int i = 0; i < 100; i++) {
TopicPolicyListener listener = new TopicPolicyListenerImpl();
-
systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+
systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener);
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size()
>= 1);
systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
@@ -118,7 +118,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
for (int i = 0; i < 100; i++) {
TopicPolicyListener listener = new TopicPolicyListenerImpl();
- systemTopicBasedTopicPoliciesService.registerListener(topicName,
listener);
+
systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener);
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size()
>= 1);
systemTopicBasedTopicPoliciesService.unregisterListener(topicName,
listener);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index 6b9735d59b2..7e9c697fb5d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -72,6 +72,13 @@ public class TopicPolicyTestUtils {
public static Optional<TopicPolicies>
getTopicPoliciesBypassCache(TopicPoliciesService topicPoliciesService,
TopicName topicName, boolean isGlobal)
throws Exception {
+ if (topicPoliciesService instanceof LegacyAwareTopicPoliciesService
legacyService) {
+ TopicPoliciesService resolved =
legacyService.resolveService(topicName.getNamespaceObject()).get();
+ return getTopicPoliciesBypassCache(resolved, topicName, isGlobal);
+ }
+ if (topicPoliciesService instanceof MetadataStoreTopicPoliciesService
metadataStoreService) {
+ return
metadataStoreService.getTopicPoliciesDirectFromStore(topicName, isGlobal).get();
+ }
@Cleanup final var reader = ((SystemTopicBasedTopicPoliciesService)
topicPoliciesService)
.getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject())