This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5cfd804fb5c [fix][broker]Non-global topic policies and global topic policies overwrite each other (#24286) 5cfd804fb5c is described below commit 5cfd804fb5c1fa9cb9da659751aeae2780f42d71 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Wed May 21 23:58:21 2025 +0800 [fix][broker]Non-global topic policies and global topic policies overwrite each other (#24286) (cherry picked from commit 66624713da79061dee455f0a1fd82b5fa8e9ff4b) --- .../SystemTopicBasedTopicPoliciesService.java | 60 ++++--- .../broker/service/TopicPoliciesService.java | 37 +++++ .../pulsar/broker/admin/TopicPoliciesTest.java | 185 +++++++++++++++++++++ .../NamespaceEventsSystemTopicServiceTest.java | 4 +- 4 files changed, 258 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 50eba440747..d433278215a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static java.util.Objects.requireNonNull; +import static org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; @@ -196,10 +197,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic writerCaches.synchronous().invalidate(topicName.getNamespaceObject()); result.completeExceptionally(cause); } else { - PulsarEvent event = getPulsarEvent(topicName, actionType, policies); - CompletableFuture<MessageId> writeFuture = ActionType.DELETE.equals(actionType) - ? writer.deleteAsync(getEventKey(event), event) - : writer.writeAsync(getEventKey(event), event); + CompletableFuture<MessageId> writeFuture = + sendTopicPolicyEventInternal(topicName, actionType, writer, policies); writeFuture.whenComplete((messageId, e) -> { if (e != null) { result.completeExceptionally(e); @@ -218,6 +217,25 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } + private CompletableFuture<MessageId> sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType, + SystemTopicClient.Writer<PulsarEvent> writer, + TopicPolicies policies) { + PulsarEvent event = getPulsarEvent(topicName, actionType, policies); + if (!ActionType.DELETE.equals(actionType)) { + return writer.writeAsync(getEventKey(event, policies != null && policies.isGlobalPolicies()), event); + } + // When a topic is deleting, delete both non-global and global topic-level policies. + CompletableFuture<MessageId> deletePolicies = writer.deleteAsync(getEventKey(event, true), event) + .thenCompose(__ -> { + return writer.deleteAsync(getEventKey(event, false), event); + }); + deletePolicies.exceptionally(ex -> { + log.error("Failed to delete topic policy [{}] error.", topicName, ex); + return null; + }); + return deletePolicies; + } + private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) { PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder(); if (policies == null || !policies.isGlobalPolicies()) { @@ -241,7 +259,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private void notifyListener(Message<PulsarEvent> msg) { // delete policies if (msg.getValue() == null) { - TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()); + TopicName topicName = TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey()) + .getPartitionedTopicName()); if (listeners.get(topicName) != null) { for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) { try { @@ -589,8 +608,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) { // delete policies if (msg.getValue() == null) { - TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()); - if (hasReplicateTo(msg)) { + boolean isGlobalPolicy = TopicPoliciesService.isGlobalPolicy(msg); + TopicName topicName = TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey()) + .getPartitionedTopicName()); + if (isGlobalPolicy) { globalPoliciesCache.remove(topicName); } else { policiesCache.remove(topicName); @@ -630,14 +651,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); - systemTopicClient.newWriterAsync().thenAccept(writer - -> writer.deleteAsync(getEventKey(topicName), - getPulsarEvent(topicName, ActionType.DELETE, null)) - .whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> { + systemTopicClient.newWriterAsync().thenAccept(writer -> { + sendTopicPolicyEventInternal(topicName, ActionType.DELETE, writer, event.getPolicies()) + .whenComplete((result, e) -> writer.closeAsync() + .whenComplete((res, ex) -> { if (ex != null) { log.error("close writer failed ", ex); } - }))); + })); + }); break; case NONE: break; @@ -730,20 +752,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } - public static String getEventKey(PulsarEvent event) { - return TopicName.get(event.getTopicPoliciesEvent().getDomain(), - event.getTopicPoliciesEvent().getTenant(), - event.getTopicPoliciesEvent().getNamespace(), - event.getTopicPoliciesEvent().getTopic()).toString(); - } - - public static String getEventKey(TopicName topicName) { - return TopicName.get(topicName.getDomain().toString(), - topicName.getTenant(), - topicName.getNamespace(), - TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString(); - } - @VisibleForTesting long getPoliciesCacheSize() { return policiesCache.size(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 3b6d468a5ec..a14089f354e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -27,6 +27,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.Backoff; @@ -39,6 +41,8 @@ import org.apache.pulsar.common.util.FutureUtil; @InterfaceStability.Evolving public interface TopicPoliciesService extends AutoCloseable { + String GLOBAL_POLICIES_MSG_KEY_PREFIX = "__G__"; + TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled(); long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000; @@ -244,4 +248,37 @@ public interface TopicPoliciesService extends AutoCloseable { //No-op } } + + static String getEventKey(PulsarEvent event, boolean isGlobal) { + return wrapEventKey(TopicName.get(event.getTopicPoliciesEvent().getDomain(), + event.getTopicPoliciesEvent().getTenant(), + event.getTopicPoliciesEvent().getNamespace(), + event.getTopicPoliciesEvent().getTopic()).toString(), isGlobal); + } + + static String getEventKey(TopicName topicName, boolean isGlobal) { + return wrapEventKey(TopicName.get(topicName.getDomain().toString(), + topicName.getTenant(), + topicName.getNamespace(), + TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString(), isGlobal); + } + + static String wrapEventKey(String originalKey, boolean isGlobalPolicies) { + if (!isGlobalPolicies) { + return originalKey; + } + return GLOBAL_POLICIES_MSG_KEY_PREFIX + originalKey; + } + + static boolean isGlobalPolicy(Message<PulsarEvent> msg) { + return msg.getKey().startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX); + } + + static TopicName unwrapEventKey(String originalKey) { + String tpName = originalKey; + if (originalKey.startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX)) { + tpName = originalKey.substring(GLOBAL_POLICIES_MSG_KEY_PREFIX.length()); + } + return TopicName.get(tpName); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index ade80f9b2ab..a0e05a9e9c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -53,6 +54,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; +import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -3341,6 +3343,189 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } } + private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception { + PersistentTopic tp = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + // Wait for the old task finish. + Awaitility.await().untilAsserted(() -> { + CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction"); + assertTrue(compactionTask == null || compactionTask.isDone()); + }); + // Trigger a new task. + tp.triggerCompaction(); + // Wait for the new task finish. + Awaitility.await().untilAsserted(() -> { + CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction"); + assertTrue(compactionTask == null || compactionTask.isDone()); + }); + } + + /*** + * It is not a thread safety method, something will go to a wrong pointer if there is a task is trying to load a + * topic policies. + */ + private void clearTopicPoliciesCache() { + TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService(); + if (topicPoliciesService instanceof TopicPoliciesService.TopicPoliciesServiceDisabled) { + return; + } + assertTrue(topicPoliciesService instanceof SystemTopicBasedTopicPoliciesService); + + Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = + WhiteboxImpl.getInternalState(topicPoliciesService, "policyCacheInitMap"); + for (CompletableFuture<Void> future : policyCacheInitMap.values()) { + future.join(); + } + Map<TopicName, TopicPolicies> policiesCache = + WhiteboxImpl.getInternalState(topicPoliciesService, "policiesCache"); + Map<TopicName, TopicPolicies> globalPoliciesCache = + WhiteboxImpl.getInternalState(topicPoliciesService, "globalPoliciesCache"); + + policyCacheInitMap.clear(); + policiesCache.clear(); + globalPoliciesCache.clear(); + } + + @DataProvider(name = "reloadPolicyTypes") + public Object[][] reloadPolicyTypes() { + return new Object[][]{ + {"Clean_Cache"}, + {"Recreate_Service"} + }; + } + + @Test(dataProvider = "reloadPolicyTypes") + public void testTopicPoliciesAfterCompaction(String reloadPolicyType) throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp"); + final String tpNameChangeEvents = "persistent://" + myNamespace + "/" + NAMESPACE_EVENTS_LOCAL_NAME; + final String subscriptionName = "s1"; + final int rateMsgLocal = 2000; + final int rateMsgGlobal = 1000; + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest); + + // Set global policy and local policy. + // Trigger __change_events compaction. + // Reload polices into memory. + // Verify: policies was affected. + DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, false, 1); + DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 1, false, 1); + admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal); + admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal); + triggerAndWaitNewTopicCompaction(tpNameChangeEvents); + Optional<TopicPolicies> topicPoliciesOptional1 = null; + Optional<TopicPolicies> topicPoliciesOptionalGlobal1 = null; + if ("Clean_Cache".equals(reloadPolicyType)) { + clearTopicPoliciesCache(); + topicPoliciesOptional1 = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName), + false).join(); + topicPoliciesOptionalGlobal1 = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName), + true).join(); + } else { + SystemTopicBasedTopicPoliciesService newService = new SystemTopicBasedTopicPoliciesService(pulsar); + topicPoliciesOptional1 = newService.getTopicPoliciesAsync(TopicName.get(tpName), false).join(); + topicPoliciesOptionalGlobal1 = newService.getTopicPoliciesAsync(TopicName.get(tpName), true).join(); + newService.close(); + } + assertTrue(topicPoliciesOptional1.isPresent()); + assertEquals(topicPoliciesOptional1.get().getDispatchRate().getDispatchThrottlingRateInMsg(), rateMsgLocal); + assertEquals(topicPoliciesOptionalGlobal1.get().getDispatchRate().getDispatchThrottlingRateInMsg(), + rateMsgGlobal); + + // Remove local policy. + // Trigger __change_events compaction. + // Reload polices into memory. + // Verify: policies was affected. + admin.topicPolicies(false).removeDispatchRate(tpName); + triggerAndWaitNewTopicCompaction(tpNameChangeEvents); + Optional<TopicPolicies> topicPoliciesOptional2 = null; + Optional<TopicPolicies> topicPoliciesOptionalGlobal2 = null; + if ("Clean_Cache".equals(reloadPolicyType)) { + clearTopicPoliciesCache(); + topicPoliciesOptional2 = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName), + false).join(); + topicPoliciesOptionalGlobal2 = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName), + true).join(); + } else { + SystemTopicBasedTopicPoliciesService newService = new SystemTopicBasedTopicPoliciesService(pulsar); + topicPoliciesOptional2 = newService.getTopicPoliciesAsync(TopicName.get(tpName), false).join(); + topicPoliciesOptionalGlobal2 = newService.getTopicPoliciesAsync(TopicName.get(tpName), true).join(); + newService.close(); + } + assertTrue(topicPoliciesOptional2.isEmpty() || topicPoliciesOptional2.get().getDispatchRate() == null); + assertTrue(topicPoliciesOptionalGlobal2.isPresent()); + assertEquals(topicPoliciesOptionalGlobal2.get().getDispatchRate().getDispatchThrottlingRateInMsg(), + rateMsgGlobal); + + // Delete topic. + // Trigger __change_events compaction. + // Reload polices into memory. + // Verify: policies was deleted. + admin.topics().delete(tpName, false); + Awaitility.await().untilAsserted(() -> { + // Reload polices into memory. + // Verify: policies was affected. + Optional<TopicPolicies> topicPoliciesOptional3 = null; + Optional<TopicPolicies> topicPoliciesOptionalGlobal3 = null; + if ("Clean_Cache".equals(reloadPolicyType)) { + clearTopicPoliciesCache(); + topicPoliciesOptional3 = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName), + false).join(); + topicPoliciesOptionalGlobal3 = pulsar.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(tpName), true).join(); + } else { + SystemTopicBasedTopicPoliciesService newService = new SystemTopicBasedTopicPoliciesService(pulsar); + topicPoliciesOptional3 = newService.getTopicPoliciesAsync(TopicName.get(tpName), false).join(); + topicPoliciesOptionalGlobal3 = newService.getTopicPoliciesAsync(TopicName.get(tpName), true) + .join(); + newService.close(); + } + assertTrue(topicPoliciesOptional3.isEmpty() + || topicPoliciesOptional3.get().getDispatchRate() == null); + assertTrue(topicPoliciesOptionalGlobal3.isEmpty() + || topicPoliciesOptionalGlobal3.get().getDispatchRate() == null); + }); + } + + @Test + public void testDeleteGlobalPolicy() throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp"); + final String tpNameChangeEvents = "persistent://" + myNamespace + "/" + NAMESPACE_EVENTS_LOCAL_NAME; + final String subscriptionName = "s1"; + final int rateMsgGlobal = 1000; + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(tpName).get().get(); + + // Set global policy. + // Verify: policies was affected. + DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 1, false, 1); + admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal); + Awaitility.await().untilAsserted(() -> { + assertEquals(persistentTopic.getHierarchyTopicPolicies().getDispatchRate().get(), dispatchRateGlobal); + }); + + // Delete global policy. + // Verify: policies were deleted. + triggerAndWaitNewTopicCompaction(tpNameChangeEvents); + admin.topicPolicies(true).removeDispatchRate(tpName); + + Awaitility.await().untilAsserted(() -> { + Optional<TopicPolicies> topicPoliciesOptional = pulsar.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(tpName), false).join(); + Optional<TopicPolicies> topicPoliciesOptionalGlobal = pulsar.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(tpName), true).join(); + assertTrue(topicPoliciesOptional.isEmpty() + || topicPoliciesOptional.get().getDispatchRate() == null); + assertTrue(topicPoliciesOptionalGlobal.isEmpty() + || topicPoliciesOptionalGlobal.get().getDispatchRate() == null); + }); + + // cleanup. + admin.topics().delete(tpName, false); + } + @Test public void testGlobalTopicPolicies() throws Exception { final String topic = testTopic + UUID.randomUUID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index e66140efb32..f7847352dfe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.systopic; -import static org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey; +import static org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -127,7 +127,7 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa .policies(policies) .build()) .build(); - systemTopicClientForNamespace1.newWriter().write(getEventKey(event), event); + systemTopicClientForNamespace1.newWriter().write(getEventKey(event, false), event); SystemTopicClient.Reader reader = systemTopicClientForNamespace1.newReader(); Message<PulsarEvent> received = reader.readNext(); log.info("Receive pulsar event from system topic : {}", received.getValue());