This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a38f74dfefe [PIP-136] Sync Pulsar metadata across multiple clouds
(#16425)
a38f74dfefe is described below
commit a38f74dfefec8620a16b384da428c6c4fedbdd70
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Jul 31 16:39:27 2022 -0700
[PIP-136] Sync Pulsar metadata across multiple clouds (#16425)
* [PIP-136] Sync Pulsar policies across multiple clouds
add unit test
add findbug
* address comments
---
conf/broker.conf | 6 +
.../apache/pulsar/broker/ServiceConfiguration.java | 17 ++
.../org/apache/pulsar/broker/PulsarService.java | 35 +++-
.../service/PulsarMetadataEventSynchronizer.java | 199 +++++++++++++++++++++
.../broker/auth/MockedPulsarServiceBaseTest.java | 30 +++-
.../OwnerShipForCurrentServerTestBase.java | 4 +-
.../broker/transaction/TransactionTestBase.java | 4 +-
.../apache/pulsar/broker/web/WebServiceTest.java | 4 +-
pulsar-metadata/pom.xml | 1 +
.../apache/pulsar/metadata/api/MetadataEvent.java | 57 ++++++
.../metadata/api/MetadataEventSynchronizer.java | 53 ++++++
.../pulsar/metadata/api/MetadataStoreConfig.java | 6 +
.../api/extended/MetadataStoreExtended.java | 10 ++
.../metadata/impl/AbstractMetadataStore.java | 137 ++++++++++++--
.../metadata/impl/LocalMemoryMetadataStore.java | 10 ++
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 10 ++
.../pulsar/metadata/impl/ZKMetadataStore.java | 8 +-
.../batching/AbstractBatchedMetadataStore.java | 11 ++
.../src/main/resources/findbugsExclude.xml | 34 ++++
.../impl/LocalMemoryMetadataStoreTest.java | 196 ++++++++++++++++----
...est.java => MetadataEventSynchronizerTest.java} | 2 +-
site2/docs/reference-configuration-broker.md | 19 ++
site2/docs/reference-configuration.md | 2 +-
23 files changed, 795 insertions(+), 60 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 7a0e32c95b3..5216034fdb8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -29,6 +29,12 @@ metadataStoreUrl=
# Configuration file path for metadata store. It's supported by
RocksdbMetadataStore and EtcdMetadataStore for now
metadataStoreConfigPath=
+# Event topic to sync metadata between separate pulsar clusters on different
cloud platforms.
+metadataSyncEventTopic=
+
+# Event topic to sync configuration-metadata between separate pulsar clusters
on different cloud platforms.
+configurationMetadataSyncEventTopic=
+
# The metadata store URL for the configuration data. If empty, we fall back to
use metadataStoreUrl
configurationMetadataStoreUrl=
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4fb680cf8b4..93c79e5398e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -483,6 +483,23 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private String metadataStoreConfigPath = null;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "Event topic to sync metadata between separate pulsar "
+ + "clusters on different cloud platforms."
+ )
+ private String metadataSyncEventTopic = null;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "Event topic to sync configuration-metadata between separate
pulsar "
+ + "clusters on different cloud platforms."
+ )
+ private String configurationMetadataSyncEventTopic = null;
+
@FieldContext(
dynamic = true,
doc = "Factory class-name to create topic with custom workflow"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5c08eb1fc08..f0d9e68f380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import
org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
@@ -256,10 +257,12 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
private MetadataStoreExtended localMetadataStore;
+ private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotService transactionBufferSnapshotService;
private MetadataStore configurationMetadataStore;
+ private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
private PulsarResources pulsarResources;
@@ -350,7 +353,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.offloaderStats = LedgerOffloaderStats.create(false, false, null,
0);
}
- public MetadataStore createConfigurationMetadataStore() throws
MetadataStoreException {
+ public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ throws MetadataStoreException {
return
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int)
config.getMetadataStoreSessionTimeoutMillis())
@@ -360,6 +364,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
+ .synchronizer(synchronizer)
.build());
}
@@ -537,6 +542,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
closeLocalMetadataStore();
if (configurationMetadataStore != null &&
shouldShutdownConfigurationMetadataStore) {
configurationMetadataStore.close();
+ if (configMetadataSynchronizer != null) {
+ configMetadataSynchronizer.close();
+ configMetadataSynchronizer = null;
+ }
}
if (transactionExecutorProvider != null) {
@@ -695,13 +704,19 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
+ "[loadBalancerOverrideBrokerNicSpeedGbps] to
override it when load balancer is enabled.");
}
- localMetadataStore = createLocalMetadataStore();
+ localMetadataSynchronizer =
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
+ ? new PulsarMetadataEventSynchronizer(this,
config.getMetadataSyncEventTopic())
+ : null;
+ localMetadataStore =
createLocalMetadataStore(localMetadataSynchronizer);
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
coordinationService = new
CoordinationServiceImpl(localMetadataStore);
if (config.isConfigurationStoreSeparated()) {
- configurationMetadataStore =
createConfigurationMetadataStore();
+ configMetadataSynchronizer =
StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
+ ? new PulsarMetadataEventSynchronizer(this,
config.getConfigurationMetadataSyncEventTopic())
+ : null;
+ configurationMetadataStore =
createConfigurationMetadataStore(configMetadataSynchronizer);
shouldShutdownConfigurationMetadataStore = true;
} else {
configurationMetadataStore = localMetadataStore;
@@ -857,6 +872,12 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.resourceUsageTransportManager =
(ResourceUsageTopicTransportManager) object;
}
this.resourceGroupServiceManager = new ResourceGroupService(this);
+ if (localMetadataSynchronizer != null) {
+ localMetadataSynchronizer.start();
+ }
+ if (configMetadataSynchronizer != null) {
+ configMetadataSynchronizer.start();
+ }
long currentTimestamp = System.currentTimeMillis();
final long bootstrapTimeSeconds =
TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - startTimestamp);
@@ -1012,7 +1033,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
}
- public MetadataStoreExtended createLocalMetadataStore() throws
MetadataStoreException {
+ public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ throws MetadataStoreException, PulsarServerException {
return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int)
config.getMetadataStoreSessionTimeoutMillis())
@@ -1022,6 +1044,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
+ .synchronizer(synchronizer)
.build());
}
@@ -1029,6 +1052,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (localMetadataStore != null) {
localMetadataStore.close();
}
+ if (localMetadataSynchronizer != null) {
+ localMetadataSynchronizer.close();
+ localMetadataSynchronizer = null;
+ }
}
protected void startLeaderElectionService() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
new file mode 100644
index 00000000000..234097ee7ca
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronizer {
+
+ private static final Logger log =
LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
+ protected PulsarService pulsar;
+ protected BrokerService brokerService;
+ protected String topicName;
+ protected PulsarClientImpl client;
+ protected volatile Producer<MetadataEvent> producer;
+ protected volatile Consumer<MetadataEvent> consumer;
+ private final CopyOnWriteArrayList<Function<MetadataEvent,
CompletableFuture<Void>>>
+ listeners = new CopyOnWriteArrayList<>();
+
+ private volatile boolean started = false;
+ public static final String SUBSCRIPTION_NAME = "metadata-syncer";
+ private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
+ protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0,
+ TimeUnit.MILLISECONDS);
+
+ public PulsarMetadataEventSynchronizer(PulsarService pulsar, String
topicName) throws PulsarServerException {
+ this.pulsar = pulsar;
+ this.brokerService = pulsar.getBrokerService();
+ this.topicName = topicName;
+ if (!StringUtils.isNotBlank(topicName)) {
+ log.info("Metadata synchronizer is disabled");
+ return;
+ }
+ }
+
+ public void start() throws PulsarServerException {
+ if (StringUtils.isBlank(topicName)) {
+ log.info("metadata topic doesn't exist.. skipping metadata
synchronizer init..");
+ return;
+ }
+ this.client = (PulsarClientImpl) pulsar.getClient();
+ startProducer();
+ startConsumer();
+ log.info("Metadata event synchronizer started on topic {}", topicName);
+ }
+
+ @Override
+ public CompletableFuture<Void> notify(MetadataEvent event) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ publishAsync(event, future);
+ return future;
+ }
+
+ @Override
+ public void registerSyncListener(Function<MetadataEvent,
CompletableFuture<Void>> listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public String getClusterName() {
+ return pulsar.getConfig().getClusterName();
+ }
+
+ private void publishAsync(MetadataEvent event, CompletableFuture<Void>
future) {
+ if (!started) {
+ log.info("Producer is not started on {}, failed to publish {}",
topicName, event);
+ future.completeExceptionally(new IllegalStateException("producer
is not started yet"));
+ }
+ producer.newMessage().value(event).sendAsync().thenAccept(__ -> {
+ log.info("successfully published metadata change event {}", event);
+ future.complete(null);
+ }).exceptionally(ex -> {
+ log.warn("failed to publish metadata update {}, will retry in {}",
topicName, MESSAGE_RATE_BACKOFF_MS, ex);
+ pulsar.getBrokerService().executor().schedule(() ->
publishAsync(event, future), MESSAGE_RATE_BACKOFF_MS,
+ TimeUnit.MILLISECONDS);
+ return null;
+ });
+ }
+
+ private void startProducer() {
+ log.info("[{}] Starting producer", topicName);
+ client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(topicName)
+
.messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
+ .sendTimeout(0, TimeUnit.SECONDS) //
+
.maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod ->
{
+ producer = prod;
+ started = true;
+ log.info("producer is created successfully {}", topicName);
+ }).exceptionally(ex -> {
+ long waitTimeMs = backOff.next();
+ log.warn("[{}] Failed to create producer ({}), retrying in
{} s", topicName, ex.getMessage(),
+ waitTimeMs / 1000.0);
+ // BackOff before retrying
+ brokerService.executor().schedule(this::startProducer,
waitTimeMs, TimeUnit.MILLISECONDS);
+ return null;
+ });
+ }
+
+ private void startConsumer() {
+ if (consumer != null) {
+ return;
+ }
+ ConsumerBuilder<MetadataEvent> consumerBuilder =
client.newConsumer(Schema.AVRO(MetadataEvent.class))
+
.topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60,
TimeUnit.SECONDS)
+
.subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> {
+ log.info("Processing metadata event for {} with listeners
{}", msg.getValue().getPath(),
+ listeners.size());
+ try {
+ if (listeners.size() == 0) {
+ c.acknowledgeAsync(msg);
+ return;
+
+ }
+ if (listeners.size() == 1) {
+
listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))
+ .exceptionally(ex -> {
+ log.warn("Failed to synchronize {} for
{}", msg.getMessageId(), topicName,
+ ex.getCause());
+ return null;
+ });
+ } else {
+ FutureUtil
+
.waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue()))
+ .collect(Collectors.toList()))
+ .thenApply(__ ->
c.acknowledgeAsync(msg)).exceptionally(ex -> {
+ log.warn("Failed to synchronize {} for
{}", msg.getMessageId(), topicName);
+ return null;
+ });
+ }
+ } catch (Exception e) {
+ log.warn("Failed to synchronize {} for {}",
msg.getMessageId(), topicName);
+ }
+ });
+ consumerBuilder.subscribeAsync().thenAccept(consumer -> {
+ log.info("successfully created consumer {}", topicName);
+ this.consumer = consumer;
+ }).exceptionally(ex -> {
+ long waitTimeMs = backOff.next();
+ log.warn("[{}] Failed to create consumer ({}), retrying in {} s",
topicName, ex.getMessage(),
+ waitTimeMs / 1000.0);
+ // BackOff before retrying
+ brokerService.executor().schedule(this::startConsumer, waitTimeMs,
TimeUnit.MILLISECONDS);
+ return null;
+ });
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ @Override
+ public void close() {
+ started = false;
+ if (producer != null) {
+ producer.closeAsync();
+ producer = null;
+ }
+ if (consumer != null) {
+ consumer.closeAsync();
+ consumer = null;
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index dd75f6eab3f..2ca1e1f986d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.auth;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -51,11 +52,13 @@ import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -65,6 +68,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -348,8 +352,20 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
-
doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
-
doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();
+
+ PulsarMetadataEventSynchronizer synchronizer = StringUtils
+ .isNotBlank(pulsar.getConfig().getMetadataSyncEventTopic())
+ ? new PulsarMetadataEventSynchronizer(pulsar,
pulsar.getConfig().getMetadataSyncEventTopic())
+ : null;
+ PulsarMetadataEventSynchronizer configSynchronizer = StringUtils
+
.isNotBlank(pulsar.getConfig().getConfigurationMetadataSyncEventTopic())
+ ? new PulsarMetadataEventSynchronizer(pulsar,
+
pulsar.getConfig().getConfigurationMetadataSyncEventTopic())
+ : null;
+ doReturn(synchronizer != null ? createLocalMetadataStore(synchronizer)
: createLocalMetadataStore())
+ .when(pulsar).createLocalMetadataStore(any());
+ doReturn(configSynchronizer != null ?
createConfigurationMetadataStore(configSynchronizer)
+ :
createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore(any());
Supplier<NamespaceService> namespaceServiceSupplier = () ->
spyWithClassAndConstructorArgs(NamespaceService.class, pulsar);
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
@@ -363,10 +379,20 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
}
}
+ protected MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
+ return new ZKMetadataStore(mockZooKeeper,
MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+ }
+
protected MetadataStoreExtended createLocalMetadataStore() throws
MetadataStoreException {
return new ZKMetadataStore(mockZooKeeper);
}
+ protected MetadataStoreExtended
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
+ return new ZKMetadataStore(mockZooKeeperGlobal,
+
MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+
+ }
+
protected MetadataStoreExtended createConfigurationMetadataStore() throws
MetadataStoreException {
return new ZKMetadataStore(mockZooKeeperGlobal);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index 0a50ede60ae..79183b358b8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -129,8 +129,8 @@ public class OwnerShipForCurrentServerTestBase {
// Override default providers with mocked ones
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
MockZooKeeperSession mockZooKeeperSession =
MockZooKeeperSession.newInstance(mockZooKeeper);
- doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore();
- doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore();
+ doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(null);
+ doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(null);
Supplier<NamespaceService> namespaceServiceSupplier = () ->
spyWithClassAndConstructorArgs(NamespaceService.class, pulsar);
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 0e8d369652b..33a995fae99 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -192,8 +192,8 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
MockZooKeeperSession mockZooKeeperSession =
MockZooKeeperSession.newInstance(mockZooKeeper);
- doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore();
- doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore();
+ doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(null);
+ doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(null);
Supplier<NamespaceService> namespaceServiceSupplier = () ->
spyWithClassAndConstructorArgs(NamespaceService.class, pulsar);
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 0bf0878eb01..9f631e38542 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -424,8 +424,8 @@ public class WebServiceTest {
pulsar = spyWithClassAndConstructorArgs(PulsarService.class, config);
// mock zk
MockZooKeeper mockZooKeeper =
MockedPulsarServiceBaseTest.createMockZooKeeper();
- doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
- doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
+ doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(null);
+ doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(null);
doReturn(new
MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
pulsar.start();
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index bd6e425d91b..935fc878a62 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -133,6 +133,7 @@
<version>${spotbugs-maven-plugin.version}</version>
<configuration>
<excludeFilterFile>${basedir}/src/test/resources/findbugsExclude.xml</excludeFilterFile>
+
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
</configuration>
<executions>
<execution>
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEvent.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEvent.java
new file mode 100644
index 00000000000..8682276a176
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEvent.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.HashSet;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+
+/**
+ * Metadata event used by {@link MetadataEventSynchronizer}.
+ *
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString
+public class MetadataEvent {
+ private String path;
+ private byte[] value;
+ private HashSet<CreateOption> options;
+ private Long expectedVersion;
+ private long lastUpdatedTimestamp;
+ private String sourceCluster;
+ private NotificationType type;
+ private int version;
+ public MetadataEvent(String path, byte[] value, HashSet<CreateOption>
options, Long expectedVersion,
+ long lastUpdatedTimestamp, String sourceCluster, NotificationType
type) {
+ super();
+ this.path = path;
+ this.value = value;
+ this.options = options;
+ this.expectedVersion = expectedVersion;
+ this.lastUpdatedTimestamp = lastUpdatedTimestamp;
+ this.sourceCluster = sourceCluster;
+ this.type = type;
+ }
+
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
new file mode 100644
index 00000000000..b2aa19edf51
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Metadata synchronizer to notify and synchronize metadata change events.
+ */
+public interface MetadataEventSynchronizer {
+
+ /**
+ * Notify metadata change event.
+ * @param event
+ * metadata change event.
+ * @return a future that completes once the event notification has been handled
+ */
+ CompletableFuture<Void> notify(MetadataEvent event);
+
+ /**
+ * Register notification listener to sync metadata event in local cluster.
+ * @param event listener function applied to each received metadata event; its future signals completion
+ */
+ void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>>
event);
+
+ /**
+ * Name of current cluster served by the Synchronizer.
+ * @return clusterName
+ */
+ String getClusterName();
+
+ /**
+ * Close synchronizer resources.
+ */
+ void close();
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index f00a0ab8d77..e1d0ede7d2a 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -71,4 +71,10 @@ public class MetadataStoreConfig {
*/
@Builder.Default
private final int batchingMaxSizeKb = 128;
+
+ /**
+ * Pluggable MetadataEventSynchronizer to sync metadata events across the
+ * separate clusters.
+ */
+ private MetadataEventSynchronizer synchronizer;
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index 4d97f80ee6d..831ef73dacd 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -22,6 +22,7 @@ import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -72,4 +73,13 @@ public interface MetadataStoreExtended extends MetadataStore
{
* the session listener
*/
void registerSessionListener(Consumer<SessionEvent> listener);
+
+ /**
+ * Get the {@link MetadataEventSynchronizer} used to notify and synchronize
+ * metadata events.
+ *
+ * <p>The default implementation reports that no synchronizer is available.
+ *
+ * @return the synchronizer, or {@link Optional#empty()} if this store has none
+ */
+ default Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer()
{
+ return Optional.empty();
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index a116ea9c294..8986811ad9f 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -25,9 +25,13 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Instant;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -43,6 +47,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
@@ -122,6 +128,89 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
});
}
+ /**
+ * Registers a listener on the given synchronizer that applies incoming
+ * {@link MetadataEvent}s to this store.
+ *
+ * <p>For every event the current value at the event path is read first. If a value
+ * exists and {@link #shouldIgnoreEvent(MetadataEvent, GetResult)} rejects the event,
+ * the event is acknowledged without touching the store. Otherwise the event is applied
+ * through {@code deleteInternal} (for {@code Deleted} events) or {@code putInternal}.
+ *
+ * <p>The future returned to the synchronizer completes normally when the event was
+ * applied, ignored, or rejected with a {@code BadVersionException}; it completes
+ * exceptionally for any other failure, including a failed lookup of the current value.
+ *
+ * @param synchronizer the synchronizer to listen to; nothing is registered when empty
+ */
+ protected void registerSyncLister(Optional<MetadataEventSynchronizer> synchronizer) {
+     if (synchronizer.isPresent()) {
+         synchronizer.get().registerSyncListener(event -> {
+             CompletableFuture<Void> result = new CompletableFuture<>();
+             get(event.getPath()).thenApply(res -> {
+                 Set<CreateOption> options = event.getOptions() != null ? event.getOptions()
+                         : Collections.emptySet();
+                 if (res.isPresent()) {
+                     GetResult existingValue = res.get();
+                     if (shouldIgnoreEvent(event, existingValue)) {
+                         result.complete(null);
+                         return result;
+                     }
+                 }
+                 // else update the event
+                 CompletableFuture<?> updateResult = (event.getType() == NotificationType.Deleted)
+                         ? deleteInternal(event.getPath(), Optional.ofNullable(event.getExpectedVersion()))
+                         : putInternal(event.getPath(), event.getValue(),
+                                 Optional.ofNullable(event.getExpectedVersion()), options);
+                 updateResult.thenApply(stat -> {
+                     if (log.isDebugEnabled()) {
+                         log.debug("successfully updated {}", event.getPath());
+                     }
+                     return result.complete(null);
+                 }).exceptionally(ex -> {
+                     log.warn("Failed to update metadata {}", event.getPath(), ex.getCause());
+                     // A version conflict means the store already moved on; treat the event as handled.
+                     if (ex.getCause() instanceof MetadataStoreException.BadVersionException) {
+                         result.complete(null);
+                     } else {
+                         result.completeExceptionally(ex);
+                     }
+                     return false;
+                 });
+                 return result;
+             }).exceptionally(ex -> {
+                 // The lookup failed, or evaluating/starting the update threw. Without this
+                 // handler the future handed back to the synchronizer would never complete.
+                 log.warn("Failed to process metadata event for {}", event.getPath(), ex);
+                 result.completeExceptionally(ex);
+                 return result;
+             });
+             return result;
+         });
+     }
+ }
+
+ /**
+ * Decides whether an incoming synchronization event must be dropped instead of
+ * being applied over the value currently held by this store.
+ *
+ * <p>An event is ignored when any of the following holds:
+ * <ul>
+ * <li>the source cluster of the event or the local cluster name is unknown
+ * (this includes a store that has no synchronizer);</li>
+ * <li>the event carries the {@code Ephemeral} or {@code Sequential} option;</li>
+ * <li>the event is older than the last modification of the existing value;</li>
+ * <li>the event comes from the local cluster with a positive expected version that
+ * differs from the existing version;</li>
+ * <li>the event comes from another cluster with the same timestamp as the existing
+ * value and a source cluster name that sorts before the local cluster name.</li>
+ * </ul>
+ *
+ * @param event the event received from the synchronizer
+ * @param existingValue the value currently stored at the event's path
+ * @return {@code true} if the event must not be applied
+ */
+ @VisibleForTesting
+ protected boolean shouldIgnoreEvent(MetadataEvent event, GetResult existingValue) {
+     long existingVersion = existingValue.getStat() != null ? existingValue.getStat().getVersion() : -1;
+     long existingTimestamp = existingValue.getStat() != null
+             ? existingValue.getStat().getModificationTimestamp() : -1;
+     String sourceClusterName = event.getSourceCluster();
+     Set<CreateOption> options = event.getOptions() != null ? event.getOptions()
+             : Collections.emptySet();
+     String currentClusterName = getMetadataEventSynchronizer()
+             .map(MetadataEventSynchronizer::getClusterName).orElse(null);
+     // ignore event from the unknown cluster
+     if (sourceClusterName == null || currentClusterName == null) {
+         return true;
+     }
+     // ignore event if metadata is ephemeral or sequential; both checks go through the
+     // null-safe copy because an event may carry no options at all
+     if (options.contains(CreateOption.Ephemeral) || options.contains(CreateOption.Sequential)) {
+         return true;
+     }
+     // ignore the event if it occurred before the existing update
+     if (event.getLastUpdatedTimestamp() < existingTimestamp) {
+         return true;
+     }
+     if (currentClusterName.equals(sourceClusterName)) {
+         // local event: ignore it when it expects a specific (positive) version that is
+         // not the version currently stored
+         Long expectedVersion = event.getExpectedVersion();
+         return expectedVersion != null && expectedVersion > 0
+                 && expectedVersion.longValue() != existingVersion;
+     }
+     // remote event with the same timestamp as the stored value: the larger cluster name
+     // according to lexical sorting wins, so ignore the event when its source sorts first
+     return event.getLastUpdatedTimestamp() == existingTimestamp
+             && sourceClusterName.compareTo(currentClusterName) < 0;
+ }
+
+
@Override
public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
@@ -227,17 +316,28 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
+ if (getMetadataEventSynchronizer().isPresent()) {
+ MetadataEvent event = new MetadataEvent(path, null, new
HashSet<>(),
+ expectedVersion.orElse(null), Instant.now().toEpochMilli(),
+ getMetadataEventSynchronizer().get().getClusterName(),
NotificationType.Deleted);
+ return getMetadataEventSynchronizer().get().notify(event)
+ .thenCompose(__ -> deleteInternal(path, expectedVersion));
+ } else {
+ return deleteInternal(path, expectedVersion);
+ }
+ }
+
+ /**
+ * Deletes {@code path} through {@code storeDelete} and, once that succeeds, drops the
+ * cached state that refers to it: the exists-cache entry of the path, the
+ * children-cache entry of its parent (when there is one) and the path in every
+ * registered metadata cache. This method itself does not notify the synchronizer.
+ *
+ * @param path the path to delete
+ * @param expectedVersion the version passed on to {@code storeDelete}
+ * @return a future that completes after the delete and the cache invalidation
+ */
+ private CompletableFuture<Void> deleteInternal(String path, Optional<Long>
expectedVersion) {
// Ensure caches are invalidated before the operation is confirmed
- return storeDelete(path, expectedVersion)
- .thenRun(() -> {
- existsCache.synchronous().invalidate(path);
- String parent = parent(path);
- if (parent != null) {
- childrenCache.synchronous().invalidate(parent);
- }
+ return storeDelete(path, expectedVersion).thenRun(() -> {
+ existsCache.synchronous().invalidate(path);
+ String parent = parent(path);
+ if (parent != null) {
+ childrenCache.synchronous().invalidate(parent);
+ }
- metadataCaches.forEach(c -> c.invalidate(path));
- });
+ metadataCaches.forEach(c -> c.invalidate(path));
+ });
}
@Override
@@ -266,8 +366,25 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
+ HashSet<CreateOption> ops = new HashSet<>(options);
+ if (getMetadataEventSynchronizer().isPresent()) {
+ Long version = optExpectedVersion.isPresent() &&
optExpectedVersion.get() < 0 ? null
+ : optExpectedVersion.orElse(null);
+ MetadataEvent event = new MetadataEvent(path, data, ops, version,
+ Instant.now().toEpochMilli(),
getMetadataEventSynchronizer().get().getClusterName(),
+ NotificationType.Modified);
+ return getMetadataEventSynchronizer().get().notify(event)
+ .thenCompose(__ -> putInternal(path, data,
optExpectedVersion, options));
+ } else {
+ return putInternal(path, data, optExpectedVersion, options);
+ }
+
+ }
+ public final CompletableFuture<Stat> putInternal(String path, byte[] data,
Optional<Long> optExpectedVersion,
+ Set<CreateOption> options) {
// Ensure caches are invalidated before the operation is confirmed
- return storePut(path, data, optExpectedVersion, options)
+ return storePut(path, data, optExpectedVersion,
+ (options != null && !options.isEmpty()) ?
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class))
.thenApply(stat -> {
NotificationType type = stat.getVersion() == 0 ?
NotificationType.Created
: NotificationType.Modified;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 63efba8f724..924a6ac5d6d 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -62,6 +63,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
private final NavigableMap<String, Value> map;
private final AtomicLong sequentialIdGenerator;
+ private MetadataEventSynchronizer synchronizer;
private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS
= new MapMaker()
.weakValues().makeMap();
@@ -75,6 +77,9 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
throws MetadataStoreException {
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
+ // update synchronizer and register sync listener
+ synchronizer = metadataStoreConfig.getSynchronizer();
+ registerSyncLister(Optional.ofNullable(synchronizer));
if ("local".equals(name)) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
@@ -218,4 +223,9 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
}
}
+
+ /**
+ * Returns the synchronizer taken from the store configuration in the constructor.
+ *
+ * @return the configured synchronizer, or empty when the configuration had none
+ */
+ @Override
+ public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
+ return Optional.ofNullable(synchronizer);
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 2d4c0a00bf0..69af0e76914 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -47,6 +47,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
@@ -91,6 +92,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
private final WriteOptions optionDontSync;
private final ReadOptions optionCache;
private final ReadOptions optionDontCache;
+ private MetadataEventSynchronizer synchronizer;
enum State {
RUNNING, CLOSED
@@ -115,6 +117,9 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
// Create a new store instance
store = new RocksdbMetadataStore(metadataStoreUri, conf);
+ // update synchronizer and register sync listener
+ store.synchronizer = conf.getSynchronizer();
+ store.registerSyncLister(Optional.ofNullable(store.synchronizer));
instancesCache.put(metadataStoreUri, store);
return store;
}
@@ -589,4 +594,9 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
dbStateLock.readLock().unlock();
}
}
+
+ /**
+ * Returns the synchronizer assigned to this store when the instance was created.
+ *
+ * @return the assigned synchronizer, or empty when none was set
+ */
+ @Override
+ public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
+ return Optional.ofNullable(synchronizer);
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index ad723f28f89..87b6be283fc 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -112,7 +112,13 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
@VisibleForTesting
@SneakyThrows
public ZKMetadataStore(ZooKeeper zkc) {
- super(MetadataStoreConfig.builder().build());
+ this(zkc, MetadataStoreConfig.builder().build());
+ }
+
+ @VisibleForTesting
+ @SneakyThrows
+ public ZKMetadataStore(ZooKeeper zkc, MetadataStoreConfig config) {
+ super(config);
this.zkConnectString = null;
this.metadataStoreConfig = null;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 616cac289ef..c9d245b8caf 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -49,6 +50,7 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private final int maxDelayMillis;
private final int maxOperations;
private final int maxSize;
+ private MetadataEventSynchronizer synchronizer;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
super();
@@ -68,6 +70,10 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
readOps = null;
writeOps = null;
}
+
+ // update synchronizer and register sync listener
+ synchronizer = conf.getSynchronizer();
+ registerSyncLister(Optional.ofNullable(synchronizer));
}
@Override
@@ -143,6 +149,11 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
return op.getFuture();
}
+ /**
+ * Returns the synchronizer taken from the store configuration in the constructor.
+ *
+ * @return the configured synchronizer, or empty when the configuration had none
+ */
+ @Override
+ public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
+ return Optional.ofNullable(synchronizer);
+ }
+
private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op)
{
if (enabled) {
if (!queue.offer(op)) {
diff --git a/pulsar-metadata/src/main/resources/findbugsExclude.xml
b/pulsar-metadata/src/main/resources/findbugsExclude.xml
new file mode 100644
index 00000000000..21578b9f8cb
--- /dev/null
+++ b/pulsar-metadata/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,34 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<FindBugsFilter>
+ <Match>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+ <Match>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+ <Match>
+ <Bug pattern="MS_EXPOSE_REP"/>
+ </Match>
+ <Match>
+ <Bug pattern="UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD"/>
+ </Match>
+</FindBugsFilter>
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
index af6a94d9c7e..1df95076033 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
@@ -18,71 +18,197 @@
*/
package org.apache.pulsar.metadata.impl;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
-import lombok.Cleanup;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.awaitility.Awaitility;
+import org.testcontainers.shaded.com.google.common.collect.Sets;
import org.testng.annotations.Test;
+import lombok.Cleanup;
+
public class LocalMemoryMetadataStoreTest {
+ HashSet<CreateOption> EMPTY_SET = new HashSet<>();
+ /**
+ * Verifies that put and delete operations on the store are reported to the configured
+ * synchronizer with the expected path, value, options, version, type and source cluster.
+ */
@Test
- public void testPrivateInstance() throws Exception {
+ public void testNotifyEvent() throws Exception {
+ TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer();
@Cleanup
MetadataStore store1 = MetadataStoreFactory.create("memory:local",
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().synchronizer(sync).build());
- @Cleanup
- MetadataStore store2 = MetadataStoreFactory.create("memory:local",
- MetadataStoreConfig.builder().build());
+ String path = "/test";
+ byte[] value = "value".getBytes(StandardCharsets.UTF_8);
+ store1.put(path, value, Optional.empty()).join();
- store1.put("/test", "value".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
+ assertTrue(store1.exists(path).join());
+ // (1) without expected version. The condition re-reads the map on every poll so the
+ // wait can actually observe the event arriving.
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> sync.notifiedEvents.containsKey(path));
+ MetadataEvent event = sync.notifiedEvents.get(path);
+ assertNotNull(event);
+ assertEquals(event.getPath(), path);
+ assertEquals(event.getValue(), value);
+ assertEquals(event.getOptions(), EMPTY_SET);
+ assertEquals(event.getType(), NotificationType.Modified);
+ assertEquals(event.getSourceCluster(), sync.clusterName);
+ assertNull(event.getExpectedVersion());
- assertTrue(store1.exists("/test").join());
- assertFalse(store2.exists("/test").join());
+ // (2) with expected version
+ long expectedVersion = 0L;
+ for (; expectedVersion < 4; expectedVersion++) {
+ sync.notifiedEvents.remove(path);
+ store1.put(path, value, Optional.of(expectedVersion)).join();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> sync.notifiedEvents.containsKey(path));
+ MetadataEvent putEvent = sync.notifiedEvents.get(path);
+ assertNotNull(putEvent);
+ assertEquals(putEvent.getPath(), path);
+ assertEquals((long) putEvent.getExpectedVersion(), expectedVersion);
+ assertEquals(putEvent.getType(), NotificationType.Modified);
+ }
+
+ // (3) delete node
+ sync.notifiedEvents.remove(path);
+ store1.delete(path, Optional.of(expectedVersion)).join();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> sync.notifiedEvents.containsKey(path));
+ MetadataEvent deleteEvent = sync.notifiedEvents.get(path);
+ assertNotNull(deleteEvent);
+ assertEquals(deleteEvent.getPath(), path);
+ assertEquals((long) deleteEvent.getExpectedVersion(), expectedVersion);
+ assertEquals(deleteEvent.getType(), NotificationType.Deleted);
+ assertEquals(deleteEvent.getSourceCluster(), sync.clusterName);
+ assertEquals(deleteEvent.getOptions(), EMPTY_SET);
}
+ /**
+ * Exercises {@code shouldIgnoreEvent} against a fixed existing value (version 0,
+ * modified at {@code time2}) while varying the options, expected version, timestamp
+ * and source cluster of the incoming event. The synchronizer's cluster name is "test".
+ */
@Test
- public void testSharedInstance() throws Exception {
- String url = "memory:" + UUID.randomUUID();
+ public void testIsIgnoreEvent() throws Exception {
+ TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer();
@Cleanup
- MetadataStore store1 = MetadataStoreFactory.create(url,
- MetadataStoreConfig.builder().build());
+ AbstractMetadataStore store1 = (AbstractMetadataStore) MetadataStoreFactory.create("memory:local",
+ MetadataStoreConfig.builder().synchronizer(sync).build());
+
+ String path = "/test";
+ byte[] value1 = "value1".getBytes(StandardCharsets.UTF_8);
+ store1.put(path, value1, Optional.empty()).join();
+
+ long time1 = Instant.now().toEpochMilli();
+ long time2 = time1 - 5;
+ Stat stats = new Stat(path, 0, time2, time2, false, false);
+ GetResult existingData = new GetResult(value1, stats);
+ // (1) ignore due to Ephemeral node
+ MetadataEvent event = new MetadataEvent(path, value1, Sets.newHashSet(CreateOption.Ephemeral), 0L,
+ time1, sync.getClusterName(), NotificationType.Modified);
+ assertTrue(store1.shouldIgnoreEvent(event, existingData));
+ // (2) ignore due to invalid expected version
+ event = new MetadataEvent(path, value1, EMPTY_SET, 10L/*invalid-version*/,
+ time1, sync.getClusterName(), NotificationType.Modified);
+ assertTrue(store1.shouldIgnoreEvent(event, existingData));
+ // (3) accept with valid conditions
+ event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+ time1, sync.getClusterName(), NotificationType.Modified);
+ assertFalse(store1.shouldIgnoreEvent(event, existingData));
+ // (4) ignore due to missing source cluster name
+ event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+ time1, null, NotificationType.Modified);
+ assertTrue(store1.shouldIgnoreEvent(event, existingData));
+ // (5) consider due to same timestamp and correct expected version on the same cluster
+ event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+ time2, sync.getClusterName(), NotificationType.Modified);
+ assertFalse(store1.shouldIgnoreEvent(event, existingData));
+ // (6) ignore due to same timestamp but different expected version on the same cluster
+ event = new MetadataEvent(path, value1, EMPTY_SET, 10L,
+ time2, sync.getClusterName(), NotificationType.Modified);
+ assertTrue(store1.shouldIgnoreEvent(event, existingData));
+ // (7) consider due to same timestamp and no expected version (null) on the same cluster
+ event = new MetadataEvent(path, value1, EMPTY_SET, null,
+ time2, sync.getClusterName(), NotificationType.Modified);
+ assertFalse(store1.shouldIgnoreEvent(event, existingData));
+ // (8) ignore due to older timestamp on the same cluster
+ event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+ time2 - 5, sync.getClusterName(), NotificationType.Modified);
+ assertTrue(store1.shouldIgnoreEvent(event, existingData));
+ // (9) consider "uest" > "test" and same timestamp
+ event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+ time2, "uest", NotificationType.Modified);
+ assertFalse(store1.shouldIgnoreEvent(event, existingData));
+ // (10) ignore "uest" > "test" and older timestamp
+ event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+ time2 - 5, "uest", NotificationType.Modified);
+ assertTrue(store1.shouldIgnoreEvent(event, existingData));
+ // (11) ignore "rest" < "test" and same timestamp
+ event = new MetadataEvent(path, value1, EMPTY_SET, 0L,
+ time2, "rest", NotificationType.Modified);
+ assertTrue(store1.shouldIgnoreEvent(event, existingData));
+ }
+ /**
+ * Verifies that an event delivered through the listener registered on the synchronizer
+ * is applied to the store: after the listener future completes, the stored value is
+ * the one carried by the event.
+ */
+ @Test
+ public void testSyncListener() throws Exception {
+ TestMetadataEventSynchronizer sync = new
TestMetadataEventSynchronizer();
@Cleanup
- MetadataStore store2 = MetadataStoreFactory.create(url,
- MetadataStoreConfig.builder().build());
+ MetadataStore store1 = MetadataStoreFactory.create("memory:local",
+ MetadataStoreConfig.builder().synchronizer(sync).build());
+
+ String path = "/test";
+ byte[] value1 = "value1".getBytes(StandardCharsets.UTF_8);
+ byte[] value2 = "value2".getBytes(StandardCharsets.UTF_8);
+ store1.put(path, value1, Optional.empty()).join();
- store1.put("/test", "value".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
+ assertTrue(store1.exists(path).join());
+
+ // Build an event from the same cluster that expects the current version and is
+ // stamped one millisecond after the stored modification time.
+ Stat stats = store1.get(path).get().get().getStat();
+ MetadataEvent event = new MetadataEvent(path, value2, EMPTY_SET,
stats.getVersion(),
+ stats.getModificationTimestamp() + 1, sync.clusterName,
NotificationType.Modified);
+ // Hand the event to the listener the store registered and wait for it to finish.
+ sync.listener.apply(event).get();
+ assertEquals(store1.get(path).get().get().getValue(), value2);
+ }
- assertTrue(store1.exists("/test").join());
- assertTrue(store2.exists("/test").join());
+ /**
+ * In-memory {@link MetadataEventSynchronizer} for the tests in this class: it records
+ * the last notified event per path and keeps the listener that was registered on it so
+ * a test can invoke that listener directly.
+ */
+ static class TestMetadataEventSynchronizer implements
MetadataEventSynchronizer {
+ // last event passed to notify(), keyed by the event path
+ public Map<String, MetadataEvent> notifiedEvents = new
ConcurrentHashMap<>();
+ // value reported by getClusterName()
+ public String clusterName = "test";
+ // listener most recently passed to registerSyncListener()
+ public volatile Function<MetadataEvent, CompletableFuture<Void>>
listener;
- store2.delete("/test", Optional.empty()).join();
+ // Records the event under its path and reports success immediately.
+ @Override
+ public CompletableFuture<Void> notify(MetadataEvent event) {
+ notifiedEvents.put(event.getPath(), event);
+ return CompletableFuture.completedFuture(null);
+ }
- assertFalse(store2.exists("/test").join());
+ // Stores the listener so tests can call it through the public field.
+ @Override
+ public void registerSyncListener(Function<MetadataEvent,
CompletableFuture<Void>> fun) {
+ this.listener = fun;
+ }
- // The exists will be updated based on the cache invalidation in store1
- Awaitility.await().untilAsserted(() -> {
- assertFalse(store1.exists("/test").join());
- });
- }
+ @Override
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ @Override
+ public void close() {
+ // No-op
+ }
- @Test
- public void testPathValid() {
- assertFalse(AbstractMetadataStore.isValidPath(null));
- assertFalse(AbstractMetadataStore.isValidPath(""));
- assertFalse(AbstractMetadataStore.isValidPath(" "));
- assertTrue(AbstractMetadataStore.isValidPath("/"));
- assertTrue(AbstractMetadataStore.isValidPath("/test"));
- assertFalse(AbstractMetadataStore.isValidPath("/test/"));
- assertTrue(AbstractMetadataStore.isValidPath("/test/ABC"));
}
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataEventSynchronizerTest.java
similarity index 98%
copy from
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
copy to
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataEventSynchronizerTest.java
index af6a94d9c7e..df5f9000157 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataEventSynchronizerTest.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
-public class LocalMemoryMetadataStoreTest {
+public class MetadataEventSynchronizerTest {
@Test
public void testPrivateInstance() throws Exception {
diff --git a/site2/docs/reference-configuration-broker.md
b/site2/docs/reference-configuration-broker.md
index d2f24d1d901..1b442cfe73b 100644
--- a/site2/docs/reference-configuration-broker.md
+++ b/site2/docs/reference-configuration-broker.md
@@ -2383,6 +2383,25 @@ Configuration file path for local metadata store. It's
supported by RocksdbMetad
**Category**: Server
+### metadataSyncEventTopic
+Event topic to sync metadata between separate pulsar clusters on different
cloud platforms.
+
+**Default**: `null`
+
+**Dynamic**: `true`
+
+**Category**: Server
+
+
+### configurationMetadataSyncEventTopic
+Event topic to sync configuration-metadata between separate pulsar clusters on
different cloud platforms.
+
+**Default**: `null`
+
+**Dynamic**: `true`
+
+**Category**: Server
+
### metadataStoreOperationTimeoutSeconds
Metadata store operation timeout in seconds.
diff --git a/site2/docs/reference-configuration.md
b/site2/docs/reference-configuration.md
index 0ea287c89b5..7b082fcbf56 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -14,4 +14,4 @@ You can manage Pulsar configuration by configuration files in
the [`conf`](https
- [Standalone](./reference-configuration-standalone.md)
- [WebSocket](./reference-configuration-websocket.md)
- [Pulsar proxy](./reference-configuration-pulsar-proxy.md)
-- [ZooKeeper](./reference-configuration-zookeeper.md)
+- [ZooKeeper](./reference-configuration-zookeeper.md)
\ No newline at end of file