This is an automated email from the ASF dual-hosted git repository.
rgao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 66624713da7 [fix][broker]Non-global topic policies and global topic
policies overwrite each other (#24286)
66624713da7 is described below
commit 66624713da79061dee455f0a1fd82b5fa8e9ff4b
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 21 23:58:21 2025 +0800
[fix][broker]Non-global topic policies and global topic policies overwrite
each other (#24286)
---
.../SystemTopicBasedTopicPoliciesService.java | 61 ++++---
.../broker/service/TopicPoliciesService.java | 37 ++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 186 +++++++++++++++++++++
.../NamespaceEventsSystemTopicServiceTest.java | 4 +-
4 files changed, 261 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 61960311e03..4745decf58d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static java.util.Objects.requireNonNull;
+import static
org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
@@ -61,6 +62,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,7 +177,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName,
ActionType actionType,
- TopicPolicies
policies) {
+ @Nullable
TopicPolicies policies) {
return pulsarService.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(namespacePolicies -> {
@@ -196,10 +198,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
result.completeExceptionally(cause);
} else {
- PulsarEvent event =
getPulsarEvent(topicName, actionType, policies);
- CompletableFuture<MessageId> writeFuture =
ActionType.DELETE.equals(actionType)
- ?
writer.deleteAsync(getEventKey(event), event)
- :
writer.writeAsync(getEventKey(event), event);
+ CompletableFuture<MessageId> writeFuture =
+
sendTopicPolicyEventInternal(topicName, actionType, writer, policies);
writeFuture.whenComplete((messageId, e) ->
{
if (e != null) {
result.completeExceptionally(e);
@@ -218,6 +218,25 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
+ private CompletableFuture<MessageId>
sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType,
+ SystemTopicClient.Writer<PulsarEvent>
writer,
+ @Nullable TopicPolicies policies) {
+ PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
+ if (!ActionType.DELETE.equals(actionType)) {
+ return writer.writeAsync(getEventKey(event, policies != null &&
policies.isGlobalPolicies()), event);
+ }
+ // When a topic is deleting, delete both non-global and global
topic-level policies.
+ CompletableFuture<MessageId> deletePolicies =
writer.deleteAsync(getEventKey(event, true), event)
+ .thenCompose(__ -> {
+ return writer.deleteAsync(getEventKey(event, false), event);
+ });
+ deletePolicies.exceptionally(ex -> {
+ log.error("Failed to delete topic policy [{}] error.", topicName,
ex);
+ return null;
+ });
+ return deletePolicies;
+ }
+
private PulsarEvent getPulsarEvent(TopicName topicName, ActionType
actionType, TopicPolicies policies) {
PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder();
if (policies == null || !policies.isGlobalPolicies()) {
@@ -241,7 +260,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void notifyListener(Message<PulsarEvent> msg) {
// delete policies
if (msg.getValue() == null) {
- TopicName topicName =
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
+ TopicName topicName =
TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey())
+ .getPartitionedTopicName());
if (listeners.get(topicName) != null) {
for (TopicPolicyListener listener : listeners.get(topicName)) {
try {
@@ -552,8 +572,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
// delete policies
if (msg.getValue() == null) {
- TopicName topicName =
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
- if (hasReplicateTo(msg)) {
+ boolean isGlobalPolicy = TopicPoliciesService.isGlobalPolicy(msg);
+ TopicName topicName =
TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey())
+ .getPartitionedTopicName());
+ if (isGlobalPolicy) {
globalPoliciesCache.remove(topicName);
} else {
policiesCache.remove(topicName);
@@ -593,14 +615,15 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
SystemTopicClient<PulsarEvent> systemTopicClient =
getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
- systemTopicClient.newWriterAsync().thenAccept(writer
- -> writer.deleteAsync(getEventKey(topicName),
- getPulsarEvent(topicName,
ActionType.DELETE, null))
- .whenComplete((result, e) ->
writer.closeAsync().whenComplete((res, ex) -> {
+ systemTopicClient.newWriterAsync().thenAccept(writer -> {
+ sendTopicPolicyEventInternal(topicName,
ActionType.DELETE, writer, event.getPolicies())
+ .whenComplete((result, e) -> writer.closeAsync()
+ .whenComplete((res, ex) -> {
if (ex != null) {
log.error("close writer failed ", ex);
}
- })));
+ }));
+ });
break;
case NONE:
break;
@@ -642,19 +665,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
}
- public static String getEventKey(PulsarEvent event) {
- return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
- event.getTopicPoliciesEvent().getTenant(),
- event.getTopicPoliciesEvent().getNamespace(),
- event.getTopicPoliciesEvent().getTopic()).toString();
- }
- public static String getEventKey(TopicName topicName) {
- return TopicName.get(topicName.getDomain().toString(),
- topicName.getTenant(),
- topicName.getNamespace(),
-
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
- }
@VisibleForTesting
long getPoliciesCacheSize() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 4b7ed3765bb..ec5da7995bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.broker.service;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
@@ -34,6 +36,8 @@ import org.apache.pulsar.common.util.FutureUtil;
@InterfaceAudience.LimitedPrivate
public interface TopicPoliciesService extends AutoCloseable {
+ String GLOBAL_POLICIES_MSG_KEY_PREFIX = "__G__";
+
TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
/**
@@ -123,4 +127,37 @@ public interface TopicPoliciesService extends
AutoCloseable {
//No-op
}
}
+
+ static String getEventKey(PulsarEvent event, boolean isGlobal) {
+ return
wrapEventKey(TopicName.get(event.getTopicPoliciesEvent().getDomain(),
+ event.getTopicPoliciesEvent().getTenant(),
+ event.getTopicPoliciesEvent().getNamespace(),
+ event.getTopicPoliciesEvent().getTopic()).toString(), isGlobal);
+ }
+
+ static String getEventKey(TopicName topicName, boolean isGlobal) {
+ return wrapEventKey(TopicName.get(topicName.getDomain().toString(),
+ topicName.getTenant(),
+ topicName.getNamespace(),
+
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString(),
isGlobal);
+ }
+
+ static String wrapEventKey(String originalKey, boolean isGlobalPolicies) {
+ if (!isGlobalPolicies) {
+ return originalKey;
+ }
+ return GLOBAL_POLICIES_MSG_KEY_PREFIX + originalKey;
+ }
+
+ static boolean isGlobalPolicy(Message<PulsarEvent> msg) {
+ return msg.getKey().startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX);
+ }
+
+ static TopicName unwrapEventKey(String originalKey) {
+ String tpName = originalKey;
+ if (originalKey.startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX)) {
+ tpName =
originalKey.substring(GLOBAL_POLICIES_MSG_KEY_PREFIX.length());
+ }
+ return TopicName.get(tpName);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index c7a064e831d..bf2c3e5b2aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
import static
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY;
import static
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.LOCAL_ONLY;
+import static
org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -55,6 +56,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyTestUtils;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -99,6 +101,7 @@ import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.glassfish.jersey.client.JerseyClient;
import org.glassfish.jersey.client.JerseyClientBuilder;
+import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -3339,6 +3342,189 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
}
+ private void triggerAndWaitNewTopicCompaction(String topicName) throws
Exception {
+ PersistentTopic tp =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ // Wait for the old task finish.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<Long> compactionTask =
WhiteboxImpl.getInternalState(tp, "currentCompaction");
+ assertTrue(compactionTask == null || compactionTask.isDone());
+ });
+ // Trigger a new task.
+ tp.triggerCompaction();
+ // Wait for the new task finish.
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<Long> compactionTask =
WhiteboxImpl.getInternalState(tp, "currentCompaction");
+ assertTrue(compactionTask == null || compactionTask.isDone());
+ });
+ }
+
+ /***
+ * It is not a thread safety method, something will go to a wrong pointer
if there is a task is trying to load a
+ * topic policies.
+ */
+ private void clearTopicPoliciesCache() {
+ TopicPoliciesService topicPoliciesService =
pulsar.getTopicPoliciesService();
+ if (topicPoliciesService instanceof
TopicPoliciesService.TopicPoliciesServiceDisabled) {
+ return;
+ }
+ assertTrue(topicPoliciesService instanceof
SystemTopicBasedTopicPoliciesService);
+
+ Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap =
+ WhiteboxImpl.getInternalState(topicPoliciesService,
"policyCacheInitMap");
+ for (CompletableFuture<Void> future : policyCacheInitMap.values()) {
+ future.join();
+ }
+ Map<TopicName, TopicPolicies> policiesCache =
+ WhiteboxImpl.getInternalState(topicPoliciesService,
"policiesCache");
+ Map<TopicName, TopicPolicies> globalPoliciesCache =
+ WhiteboxImpl.getInternalState(topicPoliciesService,
"globalPoliciesCache");
+
+ policyCacheInitMap.clear();
+ policiesCache.clear();
+ globalPoliciesCache.clear();
+ }
+
+ @DataProvider(name = "reloadPolicyTypes")
+ public Object[][] reloadPolicyTypes() {
+ return new Object[][]{
+ {"Clean_Cache"},
+ {"Recreate_Service"}
+ };
+ }
+
+ @Test(dataProvider = "reloadPolicyTypes")
+ public void testTopicPoliciesAfterCompaction(String reloadPolicyType)
throws Exception {
+ final String tpName = BrokerTestUtil.newUniqueName("persistent://" +
myNamespace + "/tp");
+ final String tpNameChangeEvents = "persistent://" + myNamespace + "/"
+ NAMESPACE_EVENTS_LOCAL_NAME;
+ final String subscriptionName = "s1";
+ final int rateMsgLocal = 2000;
+ final int rateMsgGlobal = 1000;
+ admin.topics().createNonPartitionedTopic(tpName);
+ admin.topics().createSubscription(tpName, subscriptionName,
MessageId.earliest);
+
+ // Set global policy and local policy.
+ // Trigger __change_events compaction.
+ // Reload polices into memory.
+ // Verify: policies was affected.
+ DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1,
false, 1);
+ DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal,
1, false, 1);
+ admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal);
+ admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+ triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+ Optional<TopicPolicies> topicPoliciesOptional1 = null;
+ Optional<TopicPolicies> topicPoliciesOptionalGlobal1 = null;
+ if ("Clean_Cache".equals(reloadPolicyType)) {
+ clearTopicPoliciesCache();
+ topicPoliciesOptional1 =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+ LOCAL_ONLY).join();
+ topicPoliciesOptionalGlobal1 =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+ GLOBAL_ONLY).join();
+ } else {
+ SystemTopicBasedTopicPoliciesService newService = new
SystemTopicBasedTopicPoliciesService(pulsar);
+ topicPoliciesOptional1 =
newService.getTopicPoliciesAsync(TopicName.get(tpName), LOCAL_ONLY).join();
+ topicPoliciesOptionalGlobal1 =
newService.getTopicPoliciesAsync(TopicName.get(tpName), GLOBAL_ONLY).join();
+ newService.close();
+ }
+ assertTrue(topicPoliciesOptional1.isPresent());
+
assertEquals(topicPoliciesOptional1.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
rateMsgLocal);
+
assertEquals(topicPoliciesOptionalGlobal1.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
+ rateMsgGlobal);
+
+ // Remove local policy.
+ // Trigger __change_events compaction.
+ // Reload polices into memory.
+ // Verify: policies was affected.
+ admin.topicPolicies(false).removeDispatchRate(tpName);
+ triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+ Optional<TopicPolicies> topicPoliciesOptional2 = null;
+ Optional<TopicPolicies> topicPoliciesOptionalGlobal2 = null;
+ if ("Clean_Cache".equals(reloadPolicyType)) {
+ clearTopicPoliciesCache();
+ topicPoliciesOptional2 =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+ LOCAL_ONLY).join();
+ topicPoliciesOptionalGlobal2 =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+ GLOBAL_ONLY).join();
+ } else {
+ SystemTopicBasedTopicPoliciesService newService = new
SystemTopicBasedTopicPoliciesService(pulsar);
+ topicPoliciesOptional2 =
newService.getTopicPoliciesAsync(TopicName.get(tpName), LOCAL_ONLY).join();
+ topicPoliciesOptionalGlobal2 =
newService.getTopicPoliciesAsync(TopicName.get(tpName), GLOBAL_ONLY).join();
+ newService.close();
+ }
+ assertTrue(topicPoliciesOptional2.isEmpty() ||
topicPoliciesOptional2.get().getDispatchRate() == null);
+ assertTrue(topicPoliciesOptionalGlobal2.isPresent());
+
assertEquals(topicPoliciesOptionalGlobal2.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
+ rateMsgGlobal);
+
+ // Delete topic.
+ // Trigger __change_events compaction.
+ // Reload polices into memory.
+ // Verify: policies was deleted.
+ admin.topics().delete(tpName, false);
+ Awaitility.await().untilAsserted(() -> {
+ // Reload polices into memory.
+ // Verify: policies was affected.
+ Optional<TopicPolicies> topicPoliciesOptional3 = null;
+ Optional<TopicPolicies> topicPoliciesOptionalGlobal3 = null;
+ if ("Clean_Cache".equals(reloadPolicyType)) {
+ clearTopicPoliciesCache();
+ topicPoliciesOptional3 =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+ LOCAL_ONLY).join();
+ topicPoliciesOptionalGlobal3 = pulsar.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(tpName),
GLOBAL_ONLY).join();
+ } else {
+ SystemTopicBasedTopicPoliciesService newService = new
SystemTopicBasedTopicPoliciesService(pulsar);
+ topicPoliciesOptional3 =
newService.getTopicPoliciesAsync(TopicName.get(tpName), LOCAL_ONLY).join();
+ topicPoliciesOptionalGlobal3 =
newService.getTopicPoliciesAsync(TopicName.get(tpName), GLOBAL_ONLY)
+ .join();
+ newService.close();
+ }
+ assertTrue(topicPoliciesOptional3.isEmpty()
+ || topicPoliciesOptional3.get().getDispatchRate() == null);
+ assertTrue(topicPoliciesOptionalGlobal3.isEmpty()
+ || topicPoliciesOptionalGlobal3.get().getDispatchRate() ==
null);
+ });
+ }
+
+ @Test
+ public void testDeleteGlobalPolicy() throws Exception {
+ final String tpName = BrokerTestUtil.newUniqueName("persistent://" +
myNamespace + "/tp");
+ final String tpNameChangeEvents = "persistent://" + myNamespace + "/"
+ NAMESPACE_EVENTS_LOCAL_NAME;
+ final String subscriptionName = "s1";
+ final int rateMsgGlobal = 1000;
+ admin.topics().createNonPartitionedTopic(tpName);
+ admin.topics().createSubscription(tpName, subscriptionName,
MessageId.earliest);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(tpName).get().get();
+
+ // Set global policy.
+ // Verify: policies was affected.
+ DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal,
1, false, 1);
+ admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(persistentTopic.getHierarchyTopicPolicies().getDispatchRate().get(),
dispatchRateGlobal);
+ });
+
+ // Delete global policy.
+ // Verify: policies were deleted.
+ triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+ admin.topicPolicies(true).removeDispatchRate(tpName);
+
+ Awaitility.await().untilAsserted(() -> {
+ Optional<TopicPolicies> topicPoliciesOptional =
pulsar.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(tpName),
LOCAL_ONLY).join();
+ Optional<TopicPolicies> topicPoliciesOptionalGlobal =
pulsar.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(tpName),
GLOBAL_ONLY).join();
+ assertTrue(topicPoliciesOptional.isEmpty()
+ || topicPoliciesOptional.get().getDispatchRate() == null);
+ assertTrue(topicPoliciesOptionalGlobal.isEmpty()
+ || topicPoliciesOptionalGlobal.get().getDispatchRate() ==
null);
+ });
+
+ // cleanup.
+ admin.topics().delete(tpName, false);
+ }
+
@Test
public void testGlobalTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index aaa719515c9..885309f8960 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.systopic;
-import static
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey;
+import static
org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -127,7 +127,7 @@ public class NamespaceEventsSystemTopicServiceTest extends
MockedPulsarServiceBa
.policies(policies)
.build())
.build();
- systemTopicClientForNamespace1.newWriter().write(getEventKey(event),
event);
+ systemTopicClientForNamespace1.newWriter().write(getEventKey(event,
false), event);
SystemTopicClient.Reader reader =
systemTopicClientForNamespace1.newReader();
@Cleanup("release")
Message<PulsarEvent> received = reader.readNext();