This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5cfd804fb5c [fix][broker]Non-global topic policies and global topic 
policies overwrite each other (#24286)
5cfd804fb5c is described below

commit 5cfd804fb5c1fa9cb9da659751aeae2780f42d71
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Wed May 21 23:58:21 2025 +0800

    [fix][broker]Non-global topic policies and global topic policies overwrite 
each other (#24286)
    
    (cherry picked from commit 66624713da79061dee455f0a1fd82b5fa8e9ff4b)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  60 ++++---
 .../broker/service/TopicPoliciesService.java       |  37 +++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 185 +++++++++++++++++++++
 .../NamespaceEventsSystemTopicServiceTest.java     |   4 +-
 4 files changed, 258 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 50eba440747..d433278215a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
@@ -196,10 +197,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                     
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
                                     result.completeExceptionally(cause);
                                 } else {
-                                    PulsarEvent event = 
getPulsarEvent(topicName, actionType, policies);
-                                    CompletableFuture<MessageId> writeFuture = 
ActionType.DELETE.equals(actionType)
-                                                    ? 
writer.deleteAsync(getEventKey(event), event)
-                                                    : 
writer.writeAsync(getEventKey(event), event);
+                                    CompletableFuture<MessageId> writeFuture =
+                                            
sendTopicPolicyEventInternal(topicName, actionType, writer, policies);
                                     writeFuture.whenComplete((messageId, e) -> 
{
                                         if (e != null) {
                                             result.completeExceptionally(e);
@@ -218,6 +217,25 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 });
     }
 
+    private CompletableFuture<MessageId> 
sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType,
+                                      SystemTopicClient.Writer<PulsarEvent> 
writer,
+                                      TopicPolicies policies) {
+        PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
+        if (!ActionType.DELETE.equals(actionType)) {
+            return writer.writeAsync(getEventKey(event, policies != null && 
policies.isGlobalPolicies()), event);
+        }
+        // When a topic is deleting, delete both non-global and global 
topic-level policies.
+        CompletableFuture<MessageId> deletePolicies = 
writer.deleteAsync(getEventKey(event, true), event)
+            .thenCompose(__ -> {
+                return writer.deleteAsync(getEventKey(event, false), event);
+            });
+        deletePolicies.exceptionally(ex -> {
+            log.error("Failed to delete topic policy [{}] error.", topicName, 
ex);
+            return null;
+        });
+        return deletePolicies;
+    }
+
     private PulsarEvent getPulsarEvent(TopicName topicName, ActionType 
actionType, TopicPolicies policies) {
         PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder();
         if (policies == null || !policies.isGlobalPolicies()) {
@@ -241,7 +259,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void notifyListener(Message<PulsarEvent> msg) {
         // delete policies
         if (msg.getValue() == null) {
-            TopicName topicName =  
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
+            TopicName topicName = 
TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey())
+                    .getPartitionedTopicName());
             if (listeners.get(topicName) != null) {
                 for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
                     try {
@@ -589,8 +608,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
         // delete policies
         if (msg.getValue() == null) {
-            TopicName topicName = 
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
-            if (hasReplicateTo(msg)) {
+            boolean isGlobalPolicy = TopicPoliciesService.isGlobalPolicy(msg);
+            TopicName topicName = 
TopicName.get(TopicPoliciesService.unwrapEventKey(msg.getKey())
+                    .getPartitionedTopicName());
+            if (isGlobalPolicy) {
                 globalPoliciesCache.remove(topicName);
             } else {
                 policiesCache.remove(topicName);
@@ -630,14 +651,15 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     }
                     SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                             
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
-                    systemTopicClient.newWriterAsync().thenAccept(writer
-                            -> writer.deleteAsync(getEventKey(topicName),
-                                    getPulsarEvent(topicName, 
ActionType.DELETE, null))
-                            .whenComplete((result, e) -> 
writer.closeAsync().whenComplete((res, ex) -> {
+                    systemTopicClient.newWriterAsync().thenAccept(writer -> {
+                        sendTopicPolicyEventInternal(topicName, 
ActionType.DELETE, writer, event.getPolicies())
+                            .whenComplete((result, e) -> writer.closeAsync()
+                            .whenComplete((res, ex) -> {
                                 if (ex != null) {
                                     log.error("close writer failed ", ex);
                                 }
-                            })));
+                            }));
+                    });
                     break;
                 case NONE:
                     break;
@@ -730,20 +752,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
-    public static String getEventKey(PulsarEvent event) {
-        return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
-                event.getTopicPoliciesEvent().getTenant(),
-                event.getTopicPoliciesEvent().getNamespace(),
-                event.getTopicPoliciesEvent().getTopic()).toString();
-    }
-
-    public static String getEventKey(TopicName topicName) {
-        return TopicName.get(topicName.getDomain().toString(),
-                topicName.getTenant(),
-                topicName.getNamespace(),
-                
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
-    }
-
     @VisibleForTesting
     long getPoliciesCacheSize() {
         return policiesCache.size();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 3b6d468a5ec..a14089f354e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -27,6 +27,8 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach
 import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Backoff;
@@ -39,6 +41,8 @@ import org.apache.pulsar.common.util.FutureUtil;
 @InterfaceStability.Evolving
 public interface TopicPoliciesService extends AutoCloseable {
 
+    String GLOBAL_POLICIES_MSG_KEY_PREFIX = "__G__";
+
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
     long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
 
@@ -244,4 +248,37 @@ public interface TopicPoliciesService extends 
AutoCloseable {
             //No-op
         }
     }
+
+    static String getEventKey(PulsarEvent event, boolean isGlobal) {
+        return 
wrapEventKey(TopicName.get(event.getTopicPoliciesEvent().getDomain(),
+            event.getTopicPoliciesEvent().getTenant(),
+            event.getTopicPoliciesEvent().getNamespace(),
+            event.getTopicPoliciesEvent().getTopic()).toString(), isGlobal);
+    }
+
+    static String getEventKey(TopicName topicName, boolean isGlobal) {
+        return wrapEventKey(TopicName.get(topicName.getDomain().toString(),
+            topicName.getTenant(),
+            topicName.getNamespace(),
+            
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString(), 
isGlobal);
+    }
+
+    static String wrapEventKey(String originalKey, boolean isGlobalPolicies) {
+        if (!isGlobalPolicies) {
+            return originalKey;
+        }
+        return GLOBAL_POLICIES_MSG_KEY_PREFIX + originalKey;
+    }
+
+    static boolean isGlobalPolicy(Message<PulsarEvent> msg) {
+        return msg.getKey().startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX);
+    }
+
+    static TopicName unwrapEventKey(String originalKey) {
+        String tpName = originalKey;
+        if (originalKey.startsWith(GLOBAL_POLICIES_MSG_KEY_PREFIX)) {
+            tpName = 
originalKey.substring(GLOBAL_POLICIES_MSG_KEY_PREFIX.length());
+        }
+        return TopicName.get(tpName);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ade80f9b2ab..a0e05a9e9c9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -53,6 +54,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -3341,6 +3343,189 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    private void triggerAndWaitNewTopicCompaction(String topicName) throws 
Exception {
+        PersistentTopic tp =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        // Wait for the old task finish.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Long> compactionTask = 
WhiteboxImpl.getInternalState(tp, "currentCompaction");
+            assertTrue(compactionTask == null || compactionTask.isDone());
+        });
+        // Trigger a new task.
+        tp.triggerCompaction();
+        // Wait for the new task finish.
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Long> compactionTask = 
WhiteboxImpl.getInternalState(tp, "currentCompaction");
+            assertTrue(compactionTask == null || compactionTask.isDone());
+        });
+    }
+
+    /***
+     * It is not a thread safety method, something will go to a wrong pointer 
if there is a task is trying to load a
+     * topic policies.
+     */
+    private void clearTopicPoliciesCache() {
+        TopicPoliciesService topicPoliciesService = 
pulsar.getTopicPoliciesService();
+        if (topicPoliciesService instanceof 
TopicPoliciesService.TopicPoliciesServiceDisabled) {
+            return;
+        }
+        assertTrue(topicPoliciesService instanceof 
SystemTopicBasedTopicPoliciesService);
+
+        Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap =
+                WhiteboxImpl.getInternalState(topicPoliciesService, 
"policyCacheInitMap");
+        for (CompletableFuture<Void> future : policyCacheInitMap.values()) {
+            future.join();
+        }
+        Map<TopicName, TopicPolicies> policiesCache =
+                WhiteboxImpl.getInternalState(topicPoliciesService, 
"policiesCache");
+        Map<TopicName, TopicPolicies> globalPoliciesCache =
+                WhiteboxImpl.getInternalState(topicPoliciesService, 
"globalPoliciesCache");
+
+        policyCacheInitMap.clear();
+        policiesCache.clear();
+        globalPoliciesCache.clear();
+    }
+
+    @DataProvider(name = "reloadPolicyTypes")
+    public Object[][] reloadPolicyTypes() {
+        return new Object[][]{
+            {"Clean_Cache"},
+            {"Recreate_Service"}
+        };
+    }
+
+    @Test(dataProvider = "reloadPolicyTypes")
+    public void testTopicPoliciesAfterCompaction(String reloadPolicyType) 
throws Exception {
+        final String tpName = BrokerTestUtil.newUniqueName("persistent://" + 
myNamespace + "/tp");
+        final String tpNameChangeEvents = "persistent://" + myNamespace + "/" 
+ NAMESPACE_EVENTS_LOCAL_NAME;
+        final String subscriptionName = "s1";
+        final int rateMsgLocal = 2000;
+        final int rateMsgGlobal = 1000;
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subscriptionName, 
MessageId.earliest);
+
+        // Set global policy and local policy.
+        // Trigger __change_events compaction.
+        // Reload polices into memory.
+        // Verify: policies was affected.
+        DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, 
false, 1);
+        DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 
1, false, 1);
+        admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal);
+        admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+        triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+        Optional<TopicPolicies> topicPoliciesOptional1 = null;
+        Optional<TopicPolicies> topicPoliciesOptionalGlobal1 = null;
+        if ("Clean_Cache".equals(reloadPolicyType)) {
+            clearTopicPoliciesCache();
+            topicPoliciesOptional1 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                            false).join();
+            topicPoliciesOptionalGlobal1 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                    true).join();
+        } else {
+            SystemTopicBasedTopicPoliciesService newService = new 
SystemTopicBasedTopicPoliciesService(pulsar);
+            topicPoliciesOptional1 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), false).join();
+            topicPoliciesOptionalGlobal1 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), true).join();
+            newService.close();
+        }
+        assertTrue(topicPoliciesOptional1.isPresent());
+        
assertEquals(topicPoliciesOptional1.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
 rateMsgLocal);
+        
assertEquals(topicPoliciesOptionalGlobal1.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
+                rateMsgGlobal);
+
+        // Remove local policy.
+        // Trigger __change_events compaction.
+        // Reload polices into memory.
+        // Verify: policies was affected.
+        admin.topicPolicies(false).removeDispatchRate(tpName);
+        triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+        Optional<TopicPolicies> topicPoliciesOptional2 = null;
+        Optional<TopicPolicies> topicPoliciesOptionalGlobal2 = null;
+        if ("Clean_Cache".equals(reloadPolicyType)) {
+            clearTopicPoliciesCache();
+            topicPoliciesOptional2 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                    false).join();
+            topicPoliciesOptionalGlobal2 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                    true).join();
+        } else {
+            SystemTopicBasedTopicPoliciesService newService = new 
SystemTopicBasedTopicPoliciesService(pulsar);
+            topicPoliciesOptional2 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), false).join();
+            topicPoliciesOptionalGlobal2 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), true).join();
+            newService.close();
+        }
+        assertTrue(topicPoliciesOptional2.isEmpty() || 
topicPoliciesOptional2.get().getDispatchRate() == null);
+        assertTrue(topicPoliciesOptionalGlobal2.isPresent());
+        
assertEquals(topicPoliciesOptionalGlobal2.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
+                rateMsgGlobal);
+
+        // Delete topic.
+        // Trigger __change_events compaction.
+        // Reload polices into memory.
+        // Verify: policies was deleted.
+        admin.topics().delete(tpName, false);
+        Awaitility.await().untilAsserted(() -> {
+            // Reload polices into memory.
+            // Verify: policies was affected.
+            Optional<TopicPolicies> topicPoliciesOptional3 = null;
+            Optional<TopicPolicies> topicPoliciesOptionalGlobal3 = null;
+            if ("Clean_Cache".equals(reloadPolicyType)) {
+                clearTopicPoliciesCache();
+                topicPoliciesOptional3 = 
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName),
+                        false).join();
+                topicPoliciesOptionalGlobal3 = pulsar.getTopicPoliciesService()
+                        .getTopicPoliciesAsync(TopicName.get(tpName), 
true).join();
+            } else {
+                SystemTopicBasedTopicPoliciesService newService = new 
SystemTopicBasedTopicPoliciesService(pulsar);
+                topicPoliciesOptional3 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), false).join();
+                topicPoliciesOptionalGlobal3 = 
newService.getTopicPoliciesAsync(TopicName.get(tpName), true)
+                        .join();
+                newService.close();
+            }
+            assertTrue(topicPoliciesOptional3.isEmpty()
+                    || topicPoliciesOptional3.get().getDispatchRate() == null);
+            assertTrue(topicPoliciesOptionalGlobal3.isEmpty()
+                    || topicPoliciesOptionalGlobal3.get().getDispatchRate() == 
null);
+        });
+    }
+
+    @Test
+    public void testDeleteGlobalPolicy() throws Exception {
+        final String tpName = BrokerTestUtil.newUniqueName("persistent://" + 
myNamespace + "/tp");
+        final String tpNameChangeEvents = "persistent://" + myNamespace + "/" 
+ NAMESPACE_EVENTS_LOCAL_NAME;
+        final String subscriptionName = "s1";
+        final int rateMsgGlobal = 1000;
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subscriptionName, 
MessageId.earliest);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(tpName).get().get();
+
+        // Set global policy.
+        // Verify: policies was affected.
+        DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 
1, false, 1);
+        admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(persistentTopic.getHierarchyTopicPolicies().getDispatchRate().get(),
 dispatchRateGlobal);
+        });
+
+        // Delete global policy.
+        // Verify: policies were deleted.
+        triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
+        admin.topicPolicies(true).removeDispatchRate(tpName);
+
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> topicPoliciesOptional = 
pulsar.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(tpName), 
false).join();
+            Optional<TopicPolicies> topicPoliciesOptionalGlobal = 
pulsar.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(tpName), true).join();
+            assertTrue(topicPoliciesOptional.isEmpty()
+                    || topicPoliciesOptional.get().getDispatchRate() == null);
+            assertTrue(topicPoliciesOptionalGlobal.isEmpty()
+                    || topicPoliciesOptionalGlobal.get().getDispatchRate() == 
null);
+        });
+
+        // cleanup.
+        admin.topics().delete(tpName, false);
+    }
+
     @Test
     public void testGlobalTopicPolicies() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index e66140efb32..f7847352dfe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.systopic;
 
-import static 
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey;
+import static 
org.apache.pulsar.broker.service.TopicPoliciesService.getEventKey;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -127,7 +127,7 @@ public class NamespaceEventsSystemTopicServiceTest extends 
MockedPulsarServiceBa
                 .policies(policies)
                 .build())
             .build();
-        systemTopicClientForNamespace1.newWriter().write(getEventKey(event), 
event);
+        systemTopicClientForNamespace1.newWriter().write(getEventKey(event, 
false), event);
         SystemTopicClient.Reader reader = 
systemTopicClientForNamespace1.newReader();
         Message<PulsarEvent> received = reader.readNext();
         log.info("Receive pulsar event from system topic : {}", 
received.getValue());

Reply via email to