This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a38f74dfefe [PIP-136] Sync Pulsar metadata across multiple clouds 
(#16425)
a38f74dfefe is described below

commit a38f74dfefec8620a16b384da428c6c4fedbdd70
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Jul 31 16:39:27 2022 -0700

    [PIP-136] Sync Pulsar metadata across multiple clouds (#16425)
    
    * [PIP-136] Sync Pulsar policies across multiple clouds
    
    add unit test
    
    add findbug
    
    * address comments
---
 conf/broker.conf                                   |   6 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  17 ++
 .../org/apache/pulsar/broker/PulsarService.java    |  35 +++-
 .../service/PulsarMetadataEventSynchronizer.java   | 199 +++++++++++++++++++++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  30 +++-
 .../OwnerShipForCurrentServerTestBase.java         |   4 +-
 .../broker/transaction/TransactionTestBase.java    |   4 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |   4 +-
 pulsar-metadata/pom.xml                            |   1 +
 .../apache/pulsar/metadata/api/MetadataEvent.java  |  57 ++++++
 .../metadata/api/MetadataEventSynchronizer.java    |  53 ++++++
 .../pulsar/metadata/api/MetadataStoreConfig.java   |   6 +
 .../api/extended/MetadataStoreExtended.java        |  10 ++
 .../metadata/impl/AbstractMetadataStore.java       | 137 ++++++++++++--
 .../metadata/impl/LocalMemoryMetadataStore.java    |  10 ++
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |  10 ++
 .../pulsar/metadata/impl/ZKMetadataStore.java      |   8 +-
 .../batching/AbstractBatchedMetadataStore.java     |  11 ++
 .../src/main/resources/findbugsExclude.xml         |  34 ++++
 .../impl/LocalMemoryMetadataStoreTest.java         | 196 ++++++++++++++++----
 ...est.java => MetadataEventSynchronizerTest.java} |   2 +-
 site2/docs/reference-configuration-broker.md       |  19 ++
 site2/docs/reference-configuration.md              |   2 +-
 23 files changed, 795 insertions(+), 60 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 7a0e32c95b3..5216034fdb8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -29,6 +29,12 @@ metadataStoreUrl=
 # Configuration file path for metadata store. It's supported by 
RocksdbMetadataStore and EtcdMetadataStore for now
 metadataStoreConfigPath=
 
+# Event topic to sync metadata between separate pulsar clusters on different 
cloud platforms.
+metadataSyncEventTopic=
+
+# Event topic to sync configuration-metadata between separate pulsar clusters 
on different cloud platforms.
+configurationMetadataSyncEventTopic=
+
 # The metadata store URL for the configuration data. If empty, we fall back to 
use metadataStoreUrl
 configurationMetadataStoreUrl=
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4fb680cf8b4..93c79e5398e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -483,6 +483,23 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private String metadataStoreConfigPath = null;
 
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_SERVER,
+            doc = "Event topic to sync metadata between separate pulsar "
+                    + "clusters on different cloud platforms."
+    )
+    private String metadataSyncEventTopic = null;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_SERVER,
+            doc = "Event topic to sync configuration-metadata between separate 
pulsar "
+                    + "clusters on different cloud platforms."
+    )
+    private String configurationMetadataSyncEventTopic = null;
+
     @FieldContext(
             dynamic = true,
             doc = "Factory class-name to create topic with custom workflow"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5c08eb1fc08..f0d9e68f380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
 import 
org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
@@ -256,10 +257,12 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
 
     private MetadataStoreExtended localMetadataStore;
+    private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
     private CoordinationService coordinationService;
     private TransactionBufferSnapshotService transactionBufferSnapshotService;
 
     private MetadataStore configurationMetadataStore;
+    private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
     private boolean shouldShutdownConfigurationMetadataStore;
 
     private PulsarResources pulsarResources;
@@ -350,7 +353,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 
0);
     }
 
-    public MetadataStore createConfigurationMetadataStore() throws 
MetadataStoreException {
+    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+            throws MetadataStoreException {
         return 
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) 
config.getMetadataStoreSessionTimeoutMillis())
@@ -360,6 +364,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
                         
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
                         
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
+                        .synchronizer(synchronizer)
                         .build());
     }
 
@@ -537,6 +542,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             closeLocalMetadataStore();
             if (configurationMetadataStore != null && 
shouldShutdownConfigurationMetadataStore) {
                 configurationMetadataStore.close();
+                if (configMetadataSynchronizer != null) {
+                    configMetadataSynchronizer.close();
+                    configMetadataSynchronizer = null;
+                }
             }
 
             if (transactionExecutorProvider != null) {
@@ -695,13 +704,19 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         + "[loadBalancerOverrideBrokerNicSpeedGbps] to 
override it when load balancer is enabled.");
             }
 
-            localMetadataStore = createLocalMetadataStore();
+            localMetadataSynchronizer = 
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
+                    ? new PulsarMetadataEventSynchronizer(this, 
config.getMetadataSyncEventTopic())
+                    : null;
+            localMetadataStore = 
createLocalMetadataStore(localMetadataSynchronizer);
             
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
 
             coordinationService = new 
CoordinationServiceImpl(localMetadataStore);
 
             if (config.isConfigurationStoreSeparated()) {
-                configurationMetadataStore = 
createConfigurationMetadataStore();
+                configMetadataSynchronizer = 
StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
+                        ? new PulsarMetadataEventSynchronizer(this, 
config.getConfigurationMetadataSyncEventTopic())
+                        : null;
+                configurationMetadataStore = 
createConfigurationMetadataStore(configMetadataSynchronizer);
                 shouldShutdownConfigurationMetadataStore = true;
             } else {
                 configurationMetadataStore = localMetadataStore;
@@ -857,6 +872,12 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 this.resourceUsageTransportManager = 
(ResourceUsageTopicTransportManager) object;
             }
             this.resourceGroupServiceManager = new ResourceGroupService(this);
+            if (localMetadataSynchronizer != null) {
+                localMetadataSynchronizer.start();
+            }
+            if (configMetadataSynchronizer != null) {
+                configMetadataSynchronizer.start();
+            }
 
             long currentTimestamp = System.currentTimeMillis();
             final long bootstrapTimeSeconds = 
TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - startTimestamp);
@@ -1012,7 +1033,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         }
     }
 
-    public MetadataStoreExtended createLocalMetadataStore() throws 
MetadataStoreException {
+    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+            throws MetadataStoreException, PulsarServerException {
         return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) 
config.getMetadataStoreSessionTimeoutMillis())
@@ -1022,6 +1044,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
                         
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
                         
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
+                        .synchronizer(synchronizer)
                         .build());
     }
 
@@ -1029,6 +1052,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         if (localMetadataStore != null) {
             localMetadataStore.close();
         }
+        if (localMetadataSynchronizer != null) {
+            localMetadataSynchronizer.close();
+            localMetadataSynchronizer = null;
+        }
     }
 
     protected void startLeaderElectionService() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
new file mode 100644
index 00000000000..234097ee7ca
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronizer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
+    protected PulsarService pulsar;
+    protected BrokerService brokerService;
+    protected String topicName;
+    protected PulsarClientImpl client;
+    protected volatile Producer<MetadataEvent> producer;
+    protected volatile Consumer<MetadataEvent> consumer;
+    private final CopyOnWriteArrayList<Function<MetadataEvent, 
CompletableFuture<Void>>>
+    listeners = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+    public static final String SUBSCRIPTION_NAME = "metadata-syncer";
+    private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
+    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 
1, TimeUnit.MINUTES, 0,
+            TimeUnit.MILLISECONDS);
+
+    public PulsarMetadataEventSynchronizer(PulsarService pulsar, String 
topicName) throws PulsarServerException {
+        this.pulsar = pulsar;
+        this.brokerService = pulsar.getBrokerService();
+        this.topicName = topicName;
+        if (!StringUtils.isNotBlank(topicName)) {
+            log.info("Metadata synchronizer is disabled");
+            return;
+        }
+    }
+
+    public void start() throws PulsarServerException {
+        if (StringUtils.isBlank(topicName)) {
+            log.info("metadata topic doesn't exist.. skipping metadata 
synchronizer init..");
+            return;
+        }
+        this.client = (PulsarClientImpl) pulsar.getClient();
+        startProducer();
+        startConsumer();
+        log.info("Metadata event synchronizer started on topic {}", topicName);
+    }
+
+    @Override
+    public CompletableFuture<Void> notify(MetadataEvent event) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        publishAsync(event, future);
+        return future;
+    }
+
+    @Override
+    public void registerSyncListener(Function<MetadataEvent, 
CompletableFuture<Void>> listener) {
+        listeners.add(listener);
+    }
+
+    @Override
+    public String getClusterName() {
+        return pulsar.getConfig().getClusterName();
+    }
+
+    private void publishAsync(MetadataEvent event, CompletableFuture<Void> 
future) {
+        if (!started) {
+            log.info("Producer is not started on {}, failed to publish {}", 
topicName, event);
+            future.completeExceptionally(new IllegalStateException("producer 
is not started yet"));
+        }
+        producer.newMessage().value(event).sendAsync().thenAccept(__ -> {
+            log.info("successfully published metadata change event {}", event);
+            future.complete(null);
+        }).exceptionally(ex -> {
+            log.warn("failed to publish metadata update {}, will retry in {}", 
topicName, MESSAGE_RATE_BACKOFF_MS, ex);
+            pulsar.getBrokerService().executor().schedule(() -> 
publishAsync(event, future), MESSAGE_RATE_BACKOFF_MS,
+                    TimeUnit.MILLISECONDS);
+            return null;
+        });
+    }
+
+    private void startProducer() {
+        log.info("[{}] Starting producer", topicName);
+        client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(topicName)
+                
.messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
+                .sendTimeout(0, TimeUnit.SECONDS) //
+                
.maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> 
{
+                    producer = prod;
+                    started = true;
+                    log.info("producer is created successfully {}", topicName);
+                }).exceptionally(ex -> {
+                    long waitTimeMs = backOff.next();
+                    log.warn("[{}] Failed to create producer ({}), retrying in 
{} s", topicName, ex.getMessage(),
+                            waitTimeMs / 1000.0);
+                    // BackOff before retrying
+                    brokerService.executor().schedule(this::startProducer, 
waitTimeMs, TimeUnit.MILLISECONDS);
+                    return null;
+                });
+    }
+
+    private void startConsumer() {
+        if (consumer != null) {
+            return;
+        }
+        ConsumerBuilder<MetadataEvent> consumerBuilder = 
client.newConsumer(Schema.AVRO(MetadataEvent.class))
+                
.topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60, 
TimeUnit.SECONDS)
+                
.subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> {
+                    log.info("Processing metadata event for {} with listeners 
{}", msg.getValue().getPath(),
+                            listeners.size());
+                    try {
+                        if (listeners.size() == 0) {
+                            c.acknowledgeAsync(msg);
+                            return;
+
+                        }
+                        if (listeners.size() == 1) {
+                            
listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))
+                                    .exceptionally(ex -> {
+                                        log.warn("Failed to synchronize {} for 
{}", msg.getMessageId(), topicName,
+                                                ex.getCause());
+                                        return null;
+                                    });
+                        } else {
+                            FutureUtil
+                                    
.waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue()))
+                                            .collect(Collectors.toList()))
+                                    .thenApply(__ -> 
c.acknowledgeAsync(msg)).exceptionally(ex -> {
+                                        log.warn("Failed to synchronize {} for 
{}", msg.getMessageId(), topicName);
+                                        return null;
+                                    });
+                        }
+                    } catch (Exception e) {
+                        log.warn("Failed to synchronize {} for {}", 
msg.getMessageId(), topicName);
+                    }
+                });
+        consumerBuilder.subscribeAsync().thenAccept(consumer -> {
+            log.info("successfully created consumer {}", topicName);
+            this.consumer = consumer;
+        }).exceptionally(ex -> {
+            long waitTimeMs = backOff.next();
+            log.warn("[{}] Failed to create consumer ({}), retrying in {} s", 
topicName, ex.getMessage(),
+                    waitTimeMs / 1000.0);
+            // BackOff before retrying
+            brokerService.executor().schedule(this::startConsumer, waitTimeMs, 
TimeUnit.MILLISECONDS);
+            return null;
+        });
+    }
+
+    public boolean isStarted() {
+        return started;
+    }
+
+    @Override
+    public void close() {
+        started = false;
+        if (producer != null) {
+            producer.closeAsync();
+            producer = null;
+        }
+        if (consumer != null) {
+            consumer.closeAsync();
+            consumer = null;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index dd75f6eab3f..2ca1e1f986d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.auth;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -51,11 +52,13 @@ import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -65,6 +68,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -348,8 +352,20 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
     protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
         // Override default providers with mocked ones
         
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
-        
doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
-        
doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();
+        
+        PulsarMetadataEventSynchronizer synchronizer = StringUtils
+                .isNotBlank(pulsar.getConfig().getMetadataSyncEventTopic())
+                        ? new PulsarMetadataEventSynchronizer(pulsar, 
pulsar.getConfig().getMetadataSyncEventTopic())
+                        : null;
+        PulsarMetadataEventSynchronizer configSynchronizer = StringUtils
+                
.isNotBlank(pulsar.getConfig().getConfigurationMetadataSyncEventTopic())
+                        ? new PulsarMetadataEventSynchronizer(pulsar,
+                                
pulsar.getConfig().getConfigurationMetadataSyncEventTopic())
+                        : null;
+        doReturn(synchronizer != null ? createLocalMetadataStore(synchronizer) 
: createLocalMetadataStore())
+                .when(pulsar).createLocalMetadataStore(any());
+        doReturn(configSynchronizer != null ? 
createConfigurationMetadataStore(configSynchronizer)
+                : 
createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore(any());
 
         Supplier<NamespaceService> namespaceServiceSupplier = () -> 
spyWithClassAndConstructorArgs(NamespaceService.class, pulsar);
         
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
@@ -363,10 +379,20 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         }
     }
 
+    protected MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
+        return new ZKMetadataStore(mockZooKeeper, 
MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+    }
+
     protected MetadataStoreExtended createLocalMetadataStore() throws 
MetadataStoreException {
         return new ZKMetadataStore(mockZooKeeper);
     }
 
+    protected MetadataStoreExtended 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
+        return new ZKMetadataStore(mockZooKeeperGlobal,
+                
MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+
+    }
+
     protected MetadataStoreExtended createConfigurationMetadataStore() throws 
MetadataStoreException {
         return new ZKMetadataStore(mockZooKeeperGlobal);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index 0a50ede60ae..79183b358b8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -129,8 +129,8 @@ public class OwnerShipForCurrentServerTestBase {
         // Override default providers with mocked ones
         
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
         MockZooKeeperSession mockZooKeeperSession = 
MockZooKeeperSession.newInstance(mockZooKeeper);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore();
-        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore();
+        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(null);
+        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(null);
         Supplier<NamespaceService> namespaceServiceSupplier = () -> 
spyWithClassAndConstructorArgs(NamespaceService.class, pulsar);
         
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 0e8d369652b..33a995fae99 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -192,8 +192,8 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
         
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
 
         MockZooKeeperSession mockZooKeeperSession = 
MockZooKeeperSession.newInstance(mockZooKeeper);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore();
-        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore();
+        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(null);
+        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(null);
         Supplier<NamespaceService> namespaceServiceSupplier = () -> 
spyWithClassAndConstructorArgs(NamespaceService.class, pulsar);
         
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 0bf0878eb01..9f631e38542 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -424,8 +424,8 @@ public class WebServiceTest {
         pulsar = spyWithClassAndConstructorArgs(PulsarService.class, config);
      // mock zk
         MockZooKeeper mockZooKeeper = 
MockedPulsarServiceBaseTest.createMockZooKeeper();
-        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
-        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(null);
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(null);
         doReturn(new 
MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
         pulsar.start();
 
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index bd6e425d91b..935fc878a62 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -133,6 +133,7 @@
         <version>${spotbugs-maven-plugin.version}</version>
         <configuration>
           
<excludeFilterFile>${basedir}/src/test/resources/findbugsExclude.xml</excludeFilterFile>
+          
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
         </configuration>
         <executions>
           <execution>
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEvent.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEvent.java
new file mode 100644
index 00000000000..8682276a176
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEvent.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.HashSet;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+
+/**
+ * Metadata event used by {@link MetadataEventSynchronizer}.
+ *
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString
+public class MetadataEvent {
+    private String path;
+    private byte[] value;
+    private HashSet<CreateOption> options;
+    private Long expectedVersion;
+    private long lastUpdatedTimestamp;
+    private String sourceCluster;
+    private NotificationType type;
+    private int version;
+    public MetadataEvent(String path, byte[] value, HashSet<CreateOption> 
options, Long expectedVersion,
+            long lastUpdatedTimestamp, String sourceCluster, NotificationType 
type) {
+        super();
+        this.path = path;
+        this.value = value;
+        this.options = options;
+        this.expectedVersion = expectedVersion;
+        this.lastUpdatedTimestamp = lastUpdatedTimestamp;
+        this.sourceCluster = sourceCluster;
+        this.type = type;
+    }
+
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
new file mode 100644
index 00000000000..b2aa19edf51
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Metadata synchronizer to notify and synchronize metadata change events.
+ */
+public interface MetadataEventSynchronizer {
+
+    /**
+     * Notify metadata change event.
+     * @param event
+     *            metadata change event.
+     * @return
+     */
+    CompletableFuture<Void> notify(MetadataEvent event);
+
+    /**
+     * Register notification listener to sync metadata event in local cluster.
+     * @param event
+     */
+    void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> 
event);
+
+    /**
+     * Name of current cluster served by the Synchronizer.
+     * @return clusterName
+     */
+    String getClusterName();
+
+    /**
+     * close synchronizer resources.
+     */
+    void close();
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index f00a0ab8d77..e1d0ede7d2a 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -71,4 +71,10 @@ public class MetadataStoreConfig {
      */
     @Builder.Default
     private final int batchingMaxSizeKb = 128;
+
+    /**
+     * Pluggable MetadataEventSynchronizer to sync metadata events across the
+     * separate clusters.
+     */
+    private MetadataEventSynchronizer synchronizer;
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index 4d97f80ee6d..831ef73dacd 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -22,6 +22,7 @@ import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -72,4 +73,13 @@ public interface MetadataStoreExtended extends MetadataStore 
{
      *            the session listener
      */
     void registerSessionListener(Consumer<SessionEvent> listener);
+
+    /**
+     * Get {@link MetadataEventSynchronizer} to notify and synchronize 
metadata events.
+     *
+     * @return
+     */
+    default Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() 
{
+        return Optional.empty();
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index a116ea9c294..8986811ad9f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -25,9 +25,13 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Instant;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -43,6 +47,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -122,6 +128,89 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                 });
     }
 
+    protected void registerSyncLister(Optional<MetadataEventSynchronizer> 
synchronizer) {
+        if (synchronizer.isPresent()) {
+            synchronizer.get().registerSyncListener(event -> {
+                CompletableFuture<Void> result = new CompletableFuture<>();
+                get(event.getPath()).thenApply(res -> {
+                    Set<CreateOption> options = event.getOptions() != null ? 
event.getOptions()
+                            : Collections.emptySet();
+                    if (res.isPresent()) {
+                        GetResult existingValue = res.get();
+                        if (shouldIgnoreEvent(event, existingValue)) {
+                            result.complete(null);
+                            return result;
+                        }
+                    }
+                    // else update the event
+                    CompletableFuture<?> updateResult = (event.getType() == 
NotificationType.Deleted)
+                            ? deleteInternal(event.getPath(), 
Optional.ofNullable(event.getExpectedVersion()))
+                            : putInternal(event.getPath(), event.getValue(),
+                                    
Optional.ofNullable(event.getExpectedVersion()), options);
+                    updateResult.thenApply(stat -> {
+                        if (log.isDebugEnabled()) {
+                            log.debug("successfully updated {}", 
event.getPath());
+                        }
+                        return result.complete(null);
+                    }).exceptionally(ex -> {
+                        log.warn("Failed to update metadata {}", 
event.getPath(), ex.getCause());
+                        if (ex.getCause() instanceof 
MetadataStoreException.BadVersionException) {
+                            result.complete(null);
+                        } else {
+                            result.completeExceptionally(ex);
+                        }
+                        return false;
+                    });
+                    return result;
+                });
+                return result;
+            });
+        }
+    }
+
+    @VisibleForTesting
+    protected boolean shouldIgnoreEvent(MetadataEvent event, GetResult 
existingValue) {
+        long existingVersion = existingValue.getStat() != null ? 
existingValue.getStat().getVersion() : -1;
+        long existingTimestamp = existingValue.getStat() != null ? 
existingValue.getStat().getModificationTimestamp()
+                : -1;
+        String sourceClusterName = event.getSourceCluster();
+        Set<CreateOption> options = event.getOptions() != null ? 
event.getOptions()
+                : Collections.emptySet();
+        String currentClusterName = 
getMetadataEventSynchronizer().get().getClusterName();
+        // ignore event from the unknown cluster
+        if (sourceClusterName == null || currentClusterName == null) {
+            return true;
+        }
+        // ignore event if metadata is ephemeral or
+        // sequential
+        if (options.contains(CreateOption.Ephemeral) || 
event.getOptions().contains(CreateOption.Sequential)) {
+            return true;
+        }
+        // ignore the event if event occurred before the
+        // existing update
+        if (event.getLastUpdatedTimestamp() < existingTimestamp) {
+            return true;
+        }
+        if (currentClusterName.equals(sourceClusterName)) {
+            // if expected version doesn't exist then ignore the
+            // event
+            if ((event.getExpectedVersion() != null && 
event.getExpectedVersion() > 0)
+                    && event.getExpectedVersion() != existingVersion) {
+                return true;
+            }
+
+        } else if ((event.getLastUpdatedTimestamp() == existingTimestamp
+                && sourceClusterName.compareTo(currentClusterName) < 0)) {
+            // ignore: if event is older than existing store
+            // metadata
+            // or if timestamp is same for both the event then
+            // larger cluster-name according to lexical sorting
+            // should win for the update.
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
         MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
@@ -227,17 +316,28 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
+        if (getMetadataEventSynchronizer().isPresent()) {
+            MetadataEvent event = new MetadataEvent(path, null, new 
HashSet<>(),
+                    expectedVersion.orElse(null), Instant.now().toEpochMilli(),
+                    getMetadataEventSynchronizer().get().getClusterName(), 
NotificationType.Deleted);
+            return getMetadataEventSynchronizer().get().notify(event)
+                    .thenCompose(__ -> deleteInternal(path, expectedVersion));
+        } else {
+            return deleteInternal(path, expectedVersion);
+        }
+    }
+
+    private CompletableFuture<Void> deleteInternal(String path, Optional<Long> 
expectedVersion) {
         // Ensure caches are invalidated before the operation is confirmed
-        return storeDelete(path, expectedVersion)
-                .thenRun(() -> {
-                    existsCache.synchronous().invalidate(path);
-                    String parent = parent(path);
-                    if (parent != null) {
-                        childrenCache.synchronous().invalidate(parent);
-                    }
+        return storeDelete(path, expectedVersion).thenRun(() -> {
+            existsCache.synchronous().invalidate(path);
+            String parent = parent(path);
+            if (parent != null) {
+                childrenCache.synchronous().invalidate(parent);
+            }
 
-                    metadataCaches.forEach(c -> c.invalidate(path));
-                });
+            metadataCaches.forEach(c -> c.invalidate(path));
+        });
     }
 
     @Override
@@ -266,8 +366,25 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
+        HashSet<CreateOption> ops = new HashSet<>(options);
+        if (getMetadataEventSynchronizer().isPresent()) {
+            Long version = optExpectedVersion.isPresent() && 
optExpectedVersion.get() < 0 ? null
+                    : optExpectedVersion.orElse(null);
+            MetadataEvent event = new MetadataEvent(path, data, ops, version,
+                    Instant.now().toEpochMilli(), 
getMetadataEventSynchronizer().get().getClusterName(),
+                    NotificationType.Modified);
+            return getMetadataEventSynchronizer().get().notify(event)
+                    .thenCompose(__ -> putInternal(path, data, 
optExpectedVersion, options));
+        } else {
+            return putInternal(path, data, optExpectedVersion, options);
+        }
+
+    }
+    public final CompletableFuture<Stat> putInternal(String path, byte[] data, 
Optional<Long> optExpectedVersion,
+            Set<CreateOption> options) {
         // Ensure caches are invalidated before the operation is confirmed
-        return storePut(path, data, optExpectedVersion, options)
+        return storePut(path, data, optExpectedVersion,
+                (options != null && !options.isEmpty()) ? 
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class))
                 .thenApply(stat -> {
                     NotificationType type = stat.getVersion() == 0 ? 
NotificationType.Created
                             : NotificationType.Modified;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 63efba8f724..924a6ac5d6d 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -62,6 +63,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
 
     private final NavigableMap<String, Value> map;
     private final AtomicLong sequentialIdGenerator;
+    private MetadataEventSynchronizer synchronizer;
 
     private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS 
= new MapMaker()
             .weakValues().makeMap();
@@ -75,6 +77,9 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
             throws MetadataStoreException {
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
+        // update synchronizer and register sync listener
+        synchronizer = metadataStoreConfig.getSynchronizer();
+        registerSyncLister(Optional.ofNullable(synchronizer));
         if ("local".equals(name)) {
             map = new TreeMap<>();
             sequentialIdGenerator = new AtomicLong();
@@ -218,4 +223,9 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
             }
         }
     }
+
+    @Override
+    public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
+        return Optional.ofNullable(synchronizer);
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 2d4c0a00bf0..69af0e76914 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -47,6 +47,7 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -91,6 +92,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     private final WriteOptions optionDontSync;
     private final ReadOptions optionCache;
     private final ReadOptions optionDontCache;
+    private MetadataEventSynchronizer synchronizer;
 
     enum State {
         RUNNING, CLOSED
@@ -115,6 +117,9 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
 
         // Create a new store instance
         store = new RocksdbMetadataStore(metadataStoreUri, conf);
+        // update synchronizer and register sync listener
+        store.synchronizer = conf.getSynchronizer();
+        store.registerSyncLister(Optional.ofNullable(store.synchronizer));
         instancesCache.put(metadataStoreUri, store);
         return store;
     }
@@ -589,4 +594,9 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
             dbStateLock.readLock().unlock();
         }
     }
+
+    @Override
+    public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
+        return Optional.ofNullable(synchronizer);
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index ad723f28f89..87b6be283fc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -112,7 +112,13 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
     @VisibleForTesting
     @SneakyThrows
     public ZKMetadataStore(ZooKeeper zkc) {
-        super(MetadataStoreConfig.builder().build());
+        this(zkc, MetadataStoreConfig.builder().build());
+    }
+
+    @VisibleForTesting
+    @SneakyThrows
+    public ZKMetadataStore(ZooKeeper zkc, MetadataStoreConfig config) {
+        super(config);
 
         this.zkConnectString = null;
         this.metadataStoreConfig = null;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 616cac289ef..c9d245b8caf 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -49,6 +50,7 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     private final int maxDelayMillis;
     private final int maxOperations;
     private final int maxSize;
+    private MetadataEventSynchronizer synchronizer;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
         super();
@@ -68,6 +70,10 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
             readOps = null;
             writeOps = null;
         }
+
+        // update synchronizer and register sync listener
+        synchronizer = conf.getSynchronizer();
+        registerSyncLister(Optional.ofNullable(synchronizer));
     }
 
     @Override
@@ -143,6 +149,11 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
         return op.getFuture();
     }
 
+    @Override
+    public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
+        return Optional.ofNullable(synchronizer);
+    }
+
     private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) 
{
         if (enabled) {
             if (!queue.offer(op)) {
diff --git a/pulsar-metadata/src/main/resources/findbugsExclude.xml 
b/pulsar-metadata/src/main/resources/findbugsExclude.xml
new file mode 100644
index 00000000000..21578b9f8cb
--- /dev/null
+++ b/pulsar-metadata/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,34 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<FindBugsFilter>
+    <Match>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+    <Match>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
+    <Match>
+        <Bug pattern="MS_EXPOSE_REP"/>
+    </Match>
+    <Match>
+        <Bug pattern="UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD"/>
+    </Match>
+</FindBugsFilter>
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
index af6a94d9c7e..1df95076033 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
@@ -18,71 +18,197 @@
  */
 package org.apache.pulsar.metadata.impl;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
-import lombok.Cleanup;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.awaitility.Awaitility;
+import org.testcontainers.shaded.com.google.common.collect.Sets;
 import org.testng.annotations.Test;
 
+import lombok.Cleanup;
+
 public class LocalMemoryMetadataStoreTest {
 
+    HashSet<CreateOption> EMPTY_SET = new HashSet<>();
     @Test
-    public void testPrivateInstance() throws Exception {
+    public void testNotifyEvent() throws Exception {
+        TestMetadataEventSynchronizer sync = new 
TestMetadataEventSynchronizer();
         @Cleanup
         MetadataStore store1 = MetadataStoreFactory.create("memory:local",
-                MetadataStoreConfig.builder().build());
+                MetadataStoreConfig.builder().synchronizer(sync).build());
 
-        @Cleanup
-        MetadataStore store2 = MetadataStoreFactory.create("memory:local",
-                MetadataStoreConfig.builder().build());
+        String path = "/test";
+        byte[] value = "value".getBytes(StandardCharsets.UTF_8);
+        store1.put(path, value, Optional.empty()).join();
 
-        store1.put("/test", "value".getBytes(StandardCharsets.UTF_8), 
Optional.empty()).join();
+        assertTrue(store1.exists(path).join());
+        MetadataEvent event = sync.notifiedEvents.get(path);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> event != 
null);
+        assertNotNull(event);
+        assertEquals(event.getPath(), path);
+        assertEquals(event.getValue(), value);
+        assertEquals(event.getOptions(), EMPTY_SET);
+        assertEquals(event.getType(), NotificationType.Modified);
+        assertEquals(event.getSourceCluster(), sync.clusterName);
+        assertNull(event.getExpectedVersion());
 
-        assertTrue(store1.exists("/test").join());
-        assertFalse(store2.exists("/test").join());
+        // (2) with expected version
+        long exptectedVersion = 0L;
+        for (; exptectedVersion < 4; exptectedVersion++) {
+            sync.notifiedEvents.remove(path);
+            store1.put(path, value, Optional.of(exptectedVersion)).join();
+            MetadataEvent event2 = sync.notifiedEvents.get(path);
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> event2 
!= null);
+            assertNotNull(event2);
+            assertEquals(event2.getPath(), path);
+            assertEquals((long) event2.getExpectedVersion(), exptectedVersion);
+            assertEquals(event2.getType(), NotificationType.Modified);
+        }
+        
+        // (3) delete node
+        sync.notifiedEvents.remove(path);
+        store1.delete(path, Optional.of(exptectedVersion)).join();
+        MetadataEvent event2 = sync.notifiedEvents.get(path);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> event2 != 
null);
+        assertNotNull(event2);
+        assertEquals(event2.getPath(), path);
+        assertEquals((long) event2.getExpectedVersion(), exptectedVersion);
+        assertEquals(event2.getType(), NotificationType.Deleted);
+        assertEquals(event2.getSourceCluster(), sync.clusterName);
+        assertEquals(event2.getOptions(), EMPTY_SET);
     }
 
     @Test
-    public void testSharedInstance() throws Exception {
-        String url = "memory:" + UUID.randomUUID();
+    public void testIsIgnoreEvent() throws Exception {
 
+        TestMetadataEventSynchronizer sync = new 
TestMetadataEventSynchronizer();
         @Cleanup
-        MetadataStore store1 = MetadataStoreFactory.create(url,
-                MetadataStoreConfig.builder().build());
+        AbstractMetadataStore store1 = (AbstractMetadataStore) 
MetadataStoreFactory.create("memory:local",
+                MetadataStoreConfig.builder().synchronizer(sync).build());
+
+        String path = "/test";
+        byte[] value1 = "value1".getBytes(StandardCharsets.UTF_8);
+        byte[] value2 = "value2".getBytes(StandardCharsets.UTF_8);
+        store1.put(path, value1, Optional.empty()).join();
+
+        long time1 = Instant.now().toEpochMilli();
+        long time2 = time1 -5;
+        Stat stats = new Stat(path, 0, time2, time2, false, false);
+        GetResult eixistingData = new GetResult(value1, stats);
+        // (1) ignore due to Ephemeral node
+        MetadataEvent event = new MetadataEvent(path, value1, 
Sets.newHashSet(CreateOption.Ephemeral), 0L,
+                time1, sync.getClusterName(), NotificationType.Modified);
+        assertTrue(store1.shouldIgnoreEvent(event, eixistingData));
+        // (2) ignore due to invalid expected version
+        event = new MetadataEvent(path, value1, EMPTY_SET, 
10L/*invalid-version*/,
+                time1, sync.getClusterName(), NotificationType.Modified);
+        assertTrue(store1.shouldIgnoreEvent(event, eixistingData));
+        // (3) accept with valid conditions
+        event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+                time1, sync.getClusterName(), NotificationType.Modified);
+        assertFalse(store1.shouldIgnoreEvent(event, eixistingData));
+        // (4) Ignore due to invalid cluster name
+        event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+                time1, null, NotificationType.Modified);
+        assertTrue(store1.shouldIgnoreEvent(event, eixistingData));
+        // (5) consider due to same timestamp and correct expected version on 
the same cluster
+        event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+                time2, sync.getClusterName(), NotificationType.Modified);
+        assertFalse(store1.shouldIgnoreEvent(event, eixistingData));
+        // (6) Ignore due to same timestamp but different expected version on 
the same cluster
+        event = new MetadataEvent(path, value1, EMPTY_SET, 10L,
+                time2, sync.getClusterName(), NotificationType.Modified);
+        assertTrue(store1.shouldIgnoreEvent(event, eixistingData));
+        // (7) consider due to same timestamp but expected version=-1 on the 
same cluster
+        event = new MetadataEvent(path, value1, EMPTY_SET, null,
+                time2, sync.getClusterName(), NotificationType.Modified);
+        assertFalse(store1.shouldIgnoreEvent(event, eixistingData));
+        // (8) Ignore due to less timestamp on the same cluster
+        event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+                time2-5, sync.getClusterName(), NotificationType.Modified);
+        assertTrue(store1.shouldIgnoreEvent(event, eixistingData));
+        // (9) consider "uest" > "test" and same timestamp
+        event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+                time2, "uest", NotificationType.Modified);
+        assertFalse(store1.shouldIgnoreEvent(event, eixistingData));
+        // (10) ignore "uest" > "test" and less timestamp
+        event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+                time2-5, "uest", NotificationType.Modified);
+        assertTrue(store1.shouldIgnoreEvent(event, eixistingData));
+        // (11) ignore "rest" < "test" and same timestamp
+        event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+                time2, "rest", NotificationType.Modified);
+        assertTrue(store1.shouldIgnoreEvent(event, eixistingData));
+    }
 
+    @Test
+    public void testSyncListener() throws Exception {
+        TestMetadataEventSynchronizer sync = new 
TestMetadataEventSynchronizer();
         @Cleanup
-        MetadataStore store2 = MetadataStoreFactory.create(url,
-                MetadataStoreConfig.builder().build());
+        MetadataStore store1 = MetadataStoreFactory.create("memory:local",
+                MetadataStoreConfig.builder().synchronizer(sync).build());
+
+        String path = "/test";
+        byte[] value1 = "value1".getBytes(StandardCharsets.UTF_8);
+        byte[] value2 = "value2".getBytes(StandardCharsets.UTF_8);
+        store1.put(path, value1, Optional.empty()).join();
 
-        store1.put("/test", "value".getBytes(StandardCharsets.UTF_8), 
Optional.empty()).join();
+        assertTrue(store1.exists(path).join());
+        
+        Stat stats = store1.get(path).get().get().getStat();
+        MetadataEvent event = new MetadataEvent(path, value2, EMPTY_SET, 
stats.getVersion(),
+                stats.getModificationTimestamp() + 1, sync.clusterName, 
NotificationType.Modified);
+        sync.listener.apply(event).get();
+        assertEquals(store1.get(path).get().get().getValue(), value2);
+    }
 
-        assertTrue(store1.exists("/test").join());
-        assertTrue(store2.exists("/test").join());
+    static class TestMetadataEventSynchronizer implements 
MetadataEventSynchronizer {
+        public Map<String, MetadataEvent> notifiedEvents = new 
ConcurrentHashMap<>();
+        public String clusterName = "test";
+        public volatile Function<MetadataEvent, CompletableFuture<Void>> 
listener;
 
-        store2.delete("/test", Optional.empty()).join();
+        @Override
+        public CompletableFuture<Void> notify(MetadataEvent event) {
+            notifiedEvents.put(event.getPath(), event);
+            return CompletableFuture.completedFuture(null);
+        }
 
-        assertFalse(store2.exists("/test").join());
+        @Override
+        public void registerSyncListener(Function<MetadataEvent, 
CompletableFuture<Void>> fun) {
+            this.listener = fun;
+        }
 
-        // The exists will be updated based on the cache invalidation in store1
-        Awaitility.await().untilAsserted(() -> {
-            assertFalse(store1.exists("/test").join());
-        });
-    }
+        @Override
+        public String getClusterName() {
+            return clusterName;
+        }
+
+        @Override
+        public void close() {
+            // No-op
+        }
 
-    @Test
-    public void testPathValid() {
-        assertFalse(AbstractMetadataStore.isValidPath(null));
-        assertFalse(AbstractMetadataStore.isValidPath(""));
-        assertFalse(AbstractMetadataStore.isValidPath(" "));
-        assertTrue(AbstractMetadataStore.isValidPath("/"));
-        assertTrue(AbstractMetadataStore.isValidPath("/test"));
-        assertFalse(AbstractMetadataStore.isValidPath("/test/"));
-        assertTrue(AbstractMetadataStore.isValidPath("/test/ABC"));
     }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataEventSynchronizerTest.java
similarity index 98%
copy from 
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
copy to 
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataEventSynchronizerTest.java
index af6a94d9c7e..df5f9000157 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataEventSynchronizerTest.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
-public class LocalMemoryMetadataStoreTest {
+public class MetadataEventSynchronizerTest {
 
     @Test
     public void testPrivateInstance() throws Exception {
diff --git a/site2/docs/reference-configuration-broker.md 
b/site2/docs/reference-configuration-broker.md
index d2f24d1d901..1b442cfe73b 100644
--- a/site2/docs/reference-configuration-broker.md
+++ b/site2/docs/reference-configuration-broker.md
@@ -2383,6 +2383,25 @@ Configuration file path for local metadata store. It's 
supported by RocksdbMetad
 
 **Category**: Server
 
+### metadataSyncEventTopic
+Event topic to sync metadata between separate pulsar clusters on different 
cloud platforms.
+
+**Default**: `null`
+
+**Dynamic**: `true`
+
+**Category**: Server
+
+
+### configurationMetadataSyncEventTopic
+Event topic to sync configuration-metadata between separate pulsar clusters on 
different cloud platforms.
+
+**Default**: `null`
+
+**Dynamic**: `true`
+
+**Category**: Server
+
 ### metadataStoreOperationTimeoutSeconds
 Metadata store operation timeout in seconds.
 
diff --git a/site2/docs/reference-configuration.md 
b/site2/docs/reference-configuration.md
index 0ea287c89b5..7b082fcbf56 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -14,4 +14,4 @@ You can manage Pulsar configuration by configuration files in 
the [`conf`](https
 - [Standalone](./reference-configuration-standalone.md)
 - [WebSocket](./reference-configuration-websocket.md)
 - [Pulsar proxy](./reference-configuration-pulsar-proxy.md)
-- [ZooKeeper](./reference-configuration-zookeeper.md)
+- [ZooKeeper](./reference-configuration-zookeeper.md)
\ No newline at end of file

Reply via email to