This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 6e499fe0250 [fix] [broker] Fix configurationMetadataSyncEventTopic is
marked supporting dynamic setting, but not implemented (#22684)
6e499fe0250 is described below
commit 6e499fe025009266bf644210032aac90449e7dd3
Author: fengyubiao <[email protected]>
AuthorDate: Fri May 10 08:38:49 2024 +0800
[fix] [broker] Fix configurationMetadataSyncEventTopic is marked supporting
dynamic setting, but not implemented (#22684)
(cherry picked from commit ff4853e06259d2c278d76d393dd9b650ad3edf4a)
---
.../org/apache/pulsar/broker/PulsarService.java | 80 ++++++-
.../pulsar/broker/service/BrokerService.java | 5 +
.../service/PulsarMetadataEventSynchronizer.java | 237 ++++++++++++++++-----
...eoReplicationWithConfigurationSyncTestBase.java | 234 ++++++++++++++++++++
.../pulsar/broker/service/SyncConfigStoreTest.java | 116 ++++++++++
.../metadata/api/MetadataEventSynchronizer.java | 2 +-
.../api/extended/MetadataStoreExtended.java | 2 +
.../metadata/impl/LocalMemoryMetadataStore.java | 9 +-
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 9 +-
.../batching/AbstractBatchedMetadataStore.java | 11 +-
.../metadata/impl/oxia/OxiaMetadataStore.java | 10 +-
.../impl/LocalMemoryMetadataStoreTest.java | 4 +-
12 files changed, 641 insertions(+), 78 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 58d7e71b65d..ac37aca531a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -607,13 +607,12 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
}
- closeLocalMetadataStore();
+ asyncCloseFutures.add(closeLocalMetadataStore());
+ if (configMetadataSynchronizer != null) {
+ asyncCloseFutures.add(configMetadataSynchronizer.closeAsync());
+ }
if (configurationMetadataStore != null &&
shouldShutdownConfigurationMetadataStore) {
configurationMetadataStore.close();
- if (configMetadataSynchronizer != null) {
- configMetadataSynchronizer.close();
- configMetadataSynchronizer = null;
- }
}
if (transactionExecutorProvider != null) {
@@ -1160,14 +1159,16 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.build());
}
- protected void closeLocalMetadataStore() throws Exception {
+ protected CompletableFuture<Void> closeLocalMetadataStore() throws
Exception {
if (localMetadataStore != null) {
localMetadataStore.close();
}
if (localMetadataSynchronizer != null) {
- localMetadataSynchronizer.close();
+ CompletableFuture<Void> closeSynchronizer =
localMetadataSynchronizer.closeAsync();
localMetadataSynchronizer = null;
+ return closeSynchronizer;
}
+ return CompletableFuture.completedFuture(null);
}
protected void startLeaderElectionService() {
@@ -1928,4 +1929,69 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return CompletableFuture.failedFuture(e);
}
}
+
+ public void initConfigMetadataSynchronizerIfNeeded() {
+ mutex.lock();
+ try {
+ final String newTopic =
config.getConfigurationMetadataSyncEventTopic();
+ final PulsarMetadataEventSynchronizer oldSynchronizer =
configMetadataSynchronizer;
+ // Skip if not supported.
+ if (!(configurationMetadataStore instanceof
MetadataStoreExtended)) {
+ LOG.info(
+ "Skip to update Metadata Synchronizer because of the
Configuration Metadata Store using[{}]"
+ + " does not support.",
configurationMetadataStore.getClass().getName());
+ return;
+ }
+ // Detect no changes (note: the cases below only log; they do not return).
+ // case-1: both null.
+ // case-2: both topics are the same.
+ if ((oldSynchronizer == null && StringUtils.isBlank(newTopic))) {
+ LOG.info("Skip to update Metadata Synchronizer because the
topic[null] does not changed.");
+ }
+ if (StringUtils.isNotBlank(newTopic) && oldSynchronizer != null) {
+ TopicName newTopicName = TopicName.get(newTopic);
+ TopicName oldTopicName =
TopicName.get(oldSynchronizer.getTopicName());
+ if (newTopicName.equals(oldTopicName)) {
+ LOG.info("Skip to update Metadata Synchronizer because the
topic[{}] does not changed.",
+ oldTopicName);
+ }
+ }
+ // Update(null or not null).
+ // 1.set the new one.
+ // 2.close the old one.
+ // 3.async start the new one.
+ if (StringUtils.isBlank(newTopic)) {
+ configMetadataSynchronizer = null;
+ } else {
+ configMetadataSynchronizer = new
PulsarMetadataEventSynchronizer(this, newTopic);
+ }
+ // close the old one and start the new one.
+ PulsarMetadataEventSynchronizer newSynchronizer =
configMetadataSynchronizer;
+ MetadataStoreExtended metadataStoreExtended =
(MetadataStoreExtended) configurationMetadataStore;
+
metadataStoreExtended.updateMetadataEventSynchronizer(newSynchronizer);
+ Runnable startNewSynchronizer = () -> {
+ if (newSynchronizer == null) {
+ return;
+ }
+ try {
+ newSynchronizer.start();
+ } catch (Exception e) {
+ // It only occurs when get internal client fails.
+ LOG.error("Start Metadata Synchronizer with topic {}
failed.",
+ newTopic, e);
+ }
+ };
+ executor.submit(() -> {
+ if (oldSynchronizer != null) {
+ oldSynchronizer.closeAsync().whenComplete((ignore, ex) -> {
+ startNewSynchronizer.run();
+ });
+ } else {
+ startNewSynchronizer.run();
+ }
+ });
+ } finally {
+ mutex.unlock();
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c1b2b9e1da9..0cba1647031 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2806,6 +2806,11 @@ public class BrokerService implements Closeable {
pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled((boolean)
enabled);
});
+ // add listener to re-initialize the config metadata synchronizer when
configurationMetadataSyncEventTopic changed.
+ registerConfigurationListener("configurationMetadataSyncEventTopic",
enabled -> {
+ pulsar.initConfigMetadataSynchronizerIfNeeded();
+ });
+
// add more listeners here
// (3) create dynamic-config if not exist.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
index 0383a0b7552..8b2ebf20053 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -19,11 +19,15 @@
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
+import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -46,6 +50,7 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
private static final Logger log =
LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
protected PulsarService pulsar;
protected BrokerService brokerService;
+ @Getter
protected String topicName;
protected PulsarClientImpl client;
protected volatile Producer<MetadataEvent> producer;
@@ -53,19 +58,32 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
private final CopyOnWriteArrayList<Function<MetadataEvent,
CompletableFuture<Void>>>
listeners = new CopyOnWriteArrayList<>();
- private volatile boolean started = false;
+ static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer,
State> STATE_UPDATER =
+
AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class,
State.class, "state");
+ @Getter
+ private volatile State state;
public static final String SUBSCRIPTION_NAME = "metadata-syncer";
private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0,
TimeUnit.MILLISECONDS);
+ private volatile CompletableFuture<Void> closeFuture;
- public PulsarMetadataEventSynchronizer(PulsarService pulsar, String
topicName) throws PulsarServerException {
+ public enum State {
+ Init,
+ Starting_Producer,
+ Starting_Consumer,
+ Started,
+ Closing,
+ Closed;
+ }
+
+ public PulsarMetadataEventSynchronizer(PulsarService pulsar, String
topicName) {
this.pulsar = pulsar;
this.brokerService = pulsar.getBrokerService();
this.topicName = topicName;
+ this.state = State.Init;
if (!StringUtils.isNotBlank(topicName)) {
log.info("Metadata synchronizer is disabled");
- return;
}
}
@@ -74,10 +92,11 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
log.info("metadata topic doesn't exist.. skipping metadata
synchronizer init..");
return;
}
+ log.info("Metadata event synchronizer is starting on topic {}",
topicName);
this.client = (PulsarClientImpl) pulsar.getClient();
- startProducer();
- startConsumer();
- log.info("Metadata event synchronizer started on topic {}", topicName);
+ if (STATE_UPDATER.compareAndSet(this, State.Init,
State.Starting_Producer)) {
+ startProducer();
+ }
}
@Override
@@ -98,7 +117,7 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
}
private void publishAsync(MetadataEvent event, CompletableFuture<Void>
future) {
- if (!started) {
+ if (!isProducerStarted()) {
log.info("Producer is not started on {}, failed to publish {}",
topicName, event);
future.completeExceptionally(new IllegalStateException("producer
is not started yet"));
}
@@ -114,62 +133,100 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
}
private void startProducer() {
+ if (isClosingOrClosed()) {
+ log.info("[{}] Skip to start new producer because the synchronizer
is closed", topicName);
+ }
+ if (producer != null) {
+ log.error("[{}] Failed to start the producer because the producer
has been set, state: {}",
+ topicName, state);
+ return;
+ }
log.info("[{}] Starting producer", topicName);
client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(topicName)
-
.messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
- .sendTimeout(0, TimeUnit.SECONDS) //
-
.maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod ->
{
+
.messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
+ .sendTimeout(0, TimeUnit.SECONDS) //
+
.maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod ->
{
+ backOff.reset();
+ if (STATE_UPDATER.compareAndSet(this, State.Starting_Producer,
State.Starting_Consumer)) {
producer = prod;
- started = true;
log.info("producer is created successfully {}", topicName);
- }).exceptionally(ex -> {
- long waitTimeMs = backOff.next();
- log.warn("[{}] Failed to create producer ({}), retrying in
{} s", topicName, ex.getMessage(),
- waitTimeMs / 1000.0);
- // BackOff before retrying
- brokerService.executor().schedule(this::startProducer,
waitTimeMs, TimeUnit.MILLISECONDS);
- return null;
- });
+ PulsarMetadataEventSynchronizer.this.startConsumer();
+ } else {
+ State stateTransient = state;
+ log.info("[{}] Closing the new producer because the
synchronizer state is {}", prod,
+ stateTransient);
+ CompletableFuture closeProducer = new
CompletableFuture<>();
+ closeResource(() -> prod.closeAsync(), closeProducer);
+ closeProducer.thenRun(() -> {
+ log.info("[{}] Closed the new producer because the
synchronizer state is {}", prod,
+ stateTransient);
+ });
+ }
+ }).exceptionally(ex -> {
+ long waitTimeMs = backOff.next();
+ log.warn("[{}] Failed to create producer ({}), retrying in {}
s", topicName, ex.getMessage(),
+ waitTimeMs / 1000.0);
+ // BackOff before retrying
+ brokerService.executor().schedule(this::startProducer,
waitTimeMs, TimeUnit.MILLISECONDS);
+ return null;
+ });
}
private void startConsumer() {
+ if (isClosingOrClosed()) {
+ log.info("[{}] Skip to start new consumer because the synchronizer
is closed", topicName);
+ }
if (consumer != null) {
+ log.error("[{}] Failed to start the consumer because the consumer
has been set, state: {}",
+ topicName, state);
return;
}
+ log.info("[{}] Starting consumer", topicName);
ConsumerBuilder<MetadataEvent> consumerBuilder =
client.newConsumer(Schema.AVRO(MetadataEvent.class))
-
.topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60,
TimeUnit.SECONDS)
-
.subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> {
- log.info("Processing metadata event for {} with listeners
{}", msg.getValue().getPath(),
- listeners.size());
- try {
- if (listeners.size() == 0) {
- c.acknowledgeAsync(msg);
- return;
-
- }
- if (listeners.size() == 1) {
-
listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg))
- .exceptionally(ex -> {
- log.warn("Failed to synchronize {} for
{}", msg.getMessageId(), topicName,
- ex.getCause());
- return null;
- });
- } else {
- FutureUtil
-
.waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue()))
- .collect(Collectors.toList()))
- .thenApply(__ ->
c.acknowledgeAsync(msg)).exceptionally(ex -> {
- log.warn("Failed to synchronize {} for
{}", msg.getMessageId(), topicName);
- return null;
- });
- }
- } catch (Exception e) {
- log.warn("Failed to synchronize {} for {}",
msg.getMessageId(), topicName);
+
.topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60,
TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Failover).messageListener((c,
msg) -> {
+ log.info("Processing metadata event for {} with listeners {}",
msg.getValue().getPath(),
+ listeners.size());
+ try {
+ if (listeners.size() == 0) {
+ c.acknowledgeAsync(msg);
+ return;
+
}
- });
+ if (listeners.size() == 1) {
+ listeners.get(0).apply(msg.getValue()).thenApply(__ ->
c.acknowledgeAsync(msg))
+ .exceptionally(ex -> {
+ log.warn("Failed to synchronize {} for
{}", msg.getMessageId(), topicName,
+ ex.getCause());
+ return null;
+ });
+ } else {
+ FutureUtil
+ .waitForAll(listeners.stream().map(listener ->
listener.apply(msg.getValue()))
+ .collect(Collectors.toList()))
+ .thenApply(__ ->
c.acknowledgeAsync(msg)).exceptionally(ex -> {
+ log.warn("Failed to synchronize {} for
{}", msg.getMessageId(), topicName);
+ return null;
+ });
+ }
+ } catch (Exception e) {
+ log.warn("Failed to synchronize {} for {}",
msg.getMessageId(), topicName);
+ }
+ });
consumerBuilder.subscribeAsync().thenAccept(consumer -> {
- log.info("successfully created consumer {}", topicName);
- this.consumer = consumer;
+ backOff.reset();
+ if (STATE_UPDATER.compareAndSet(this, State.Starting_Consumer,
State.Started)) {
+ this.consumer = consumer;
+ log.info("successfully created consumer {}", topicName);
+ } else {
+ State stateTransient = state;
+ log.info("[{}] Closing the new consumer because the
synchronizer state is {}", stateTransient);
+ CompletableFuture closeConsumer = new CompletableFuture<>();
+ closeResource(() -> consumer.closeAsync(), closeConsumer);
+ closeConsumer.thenRun(() -> {
+ log.info("[{}] Closed the new consumer because the
synchronizer state is {}", stateTransient);
+ });
+ }
}).exceptionally(ex -> {
long waitTimeMs = backOff.next();
log.warn("[{}] Failed to create consumer ({}), retrying in {} s",
topicName, ex.getMessage(),
@@ -181,19 +238,81 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
}
public boolean isStarted() {
- return started;
+ return this.state == State.Started;
+ }
+
+ public boolean isProducerStarted() {
+ return this.state.ordinal() > State.Starting_Producer.ordinal()
+ && this.state.ordinal() < State.Closing.ordinal();
+ }
+
+ public boolean isClosingOrClosed() {
+ return this.state == State.Closing || this.state == State.Closed;
}
@Override
- public void close() {
- started = false;
- if (producer != null) {
- producer.closeAsync();
- producer = null;
+ public synchronized CompletableFuture<Void> closeAsync() {
+ int tryChangeStateCounter = 0;
+ while (true) {
+ if (isClosingOrClosed()) {
+ return closeFuture;
+ }
+ if (STATE_UPDATER.compareAndSet(this, State.Init, State.Closing)
+ || STATE_UPDATER.compareAndSet(this, State.Starting_Producer,
State.Closing)
+ || STATE_UPDATER.compareAndSet(this, State.Starting_Consumer,
State.Closing)
+ || STATE_UPDATER.compareAndSet(this, State.Started,
State.Closing)) {
+ break;
+ }
+ // Just to avoid a spinning loop, which would cause 100% CPU
consumption here.
+ if (++tryChangeStateCounter > 100) {
+ log.error("Unexpected error: the state can not be changed to
closing {}, state: {}", topicName, state);
+ return CompletableFuture.failedFuture(new
RuntimeException("Unexpected error,"
+ + " the state can not be changed to closing"));
+ }
}
- if (consumer != null) {
- consumer.closeAsync();
- consumer = null;
+ CompletableFuture<Void> closeProducer = new CompletableFuture<>();
+ CompletableFuture<Void> closeConsumer = new CompletableFuture<>();
+ if (producer == null) {
+ closeProducer.complete(null);
+ } else {
+ closeResource(() -> producer.closeAsync(), closeProducer);
+ }
+ if (consumer == null) {
+ closeConsumer.complete(null);
+ } else {
+ closeResource(() -> consumer.closeAsync(), closeConsumer);
+ }
+
+ // Add logs.
+ closeProducer.thenRun(() -> log.info("Successfully close producer {}",
topicName));
+ closeConsumer.thenRun(() -> log.info("Successfully close consumer {}",
topicName));
+
+ closeFuture = FutureUtil.waitForAll(Arrays.asList(closeProducer,
closeConsumer));
+ closeFuture.thenRun(() -> {
+ this.state = State.Closed;
+ log.info("Successfully close metadata store synchronizer {}",
topicName);
+ });
+ return closeFuture;
+ }
+
+ private void closeResource(final Supplier<CompletableFuture<Void>>
asyncCloseable,
+ final CompletableFuture<Void> future) {
+ if (asyncCloseable == null) {
+ future.complete(null);
+ return;
}
+ asyncCloseable.get().whenComplete((ignore, ex) -> {
+ if (ex == null) {
+ backOff.reset();
+ future.complete(null);
+ return;
+ }
+ // Retry.
+ long waitTimeMs = backOff.next();
+ log.warn("[{}] Exception: '{}' occurred while trying to close the
%s. Retrying again in {} s.",
+ topicName, ex.getMessage(),
asyncCloseable.getClass().getSimpleName(), waitTimeMs / 1000.0, ex);
+ brokerService.executor().schedule(() ->
closeResource(asyncCloseable, future), waitTimeMs,
+ TimeUnit.MILLISECONDS);
+ });
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
new file mode 100644
index 00000000000..9b4dd5192e1
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+
+@Slf4j
+public abstract class GeoReplicationWithConfigurationSyncTestBase extends
TestRetrySupport {
+
+ protected final String defaultTenant = "public";
+ protected final String defaultNamespace = defaultTenant + "/default";
+
+ protected final String cluster1 = "r1";
+ protected URL url1;
+ protected URL urlTls1;
+ protected ServiceConfiguration config1 = new ServiceConfiguration();
+ protected ZookeeperServerTest brokerConfigZk1;
+ protected LocalBookkeeperEnsemble bkEnsemble1;
+ protected PulsarService pulsar1;
+ protected BrokerService ns1;
+ protected PulsarAdmin admin1;
+ protected PulsarClient client1;
+
+ protected URL url2;
+ protected URL urlTls2;
+ protected final String cluster2 = "r2";
+ protected ServiceConfiguration config2 = new ServiceConfiguration();
+ protected ZookeeperServerTest brokerConfigZk2;
+ protected LocalBookkeeperEnsemble bkEnsemble2;
+ protected PulsarService pulsar2;
+ protected BrokerService ns2;
+ protected PulsarAdmin admin2;
+ protected PulsarClient client2;
+
+ protected void startZKAndBK() throws Exception {
+ // Start ZK.
+ brokerConfigZk1 = new ZookeeperServerTest(0);
+ brokerConfigZk1.start();
+ brokerConfigZk2 = new ZookeeperServerTest(0);
+ brokerConfigZk2.start();
+
+ // Start BK.
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1.start();
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble2.start();
+ }
+
+ protected void startBrokers() throws Exception {
+ // Start brokers.
+ setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
+ pulsar1 = new PulsarService(config1);
+ pulsar1.start();
+ ns1 = pulsar1.getBrokerService();
+
+ url1 = new URL(pulsar1.getWebServiceAddress());
+ urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
+ admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
+ client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
+
+ // Start region 2
+ setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2);
+ pulsar2 = new PulsarService(config2);
+ pulsar2.start();
+ ns2 = pulsar2.getBrokerService();
+
+ url2 = new URL(pulsar2.getWebServiceAddress());
+ urlTls2 = new URL(pulsar2.getWebServiceAddressTls());
+ admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
+ client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
+ }
+
+ protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {
+ admin1.clusters().createCluster(cluster1, ClusterData.builder()
+ .serviceUrl(url1.toString())
+ .serviceUrlTls(urlTls1.toString())
+ .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin1.clusters().createCluster(cluster2, ClusterData.builder()
+ .serviceUrl(url2.toString())
+ .serviceUrlTls(urlTls2.toString())
+ .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin2.clusters().createCluster(cluster1, ClusterData.builder()
+ .serviceUrl(url1.toString())
+ .serviceUrlTls(urlTls1.toString())
+ .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin2.clusters().createCluster(cluster2, ClusterData.builder()
+ .serviceUrl(url2.toString())
+ .serviceUrlTls(urlTls2.toString())
+ .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+
+ admin1.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1, cluster2)));
+ admin2.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1, cluster2)));
+
+ admin1.namespaces().createNamespace(defaultNamespace);
+ admin2.namespaces().createNamespace(defaultNamespace);
+ }
+
+ @Override
+ protected void setup() throws Exception {
+ incrementSetupNumber();
+
+ log.info("--- Starting OneWayReplicatorTestBase::setup ---");
+
+ startZKAndBK();
+
+ startBrokers();
+
+ createDefaultTenantsAndClustersAndNamespace();
+
+ Thread.sleep(100);
+ log.info("--- OneWayReplicatorTestBase::setup completed ---");
+ }
+
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble bookkeeperEnsemble,
ZookeeperServerTest brokerConfigZk) {
+ config.setClusterName(clusterName);
+ config.setAdvertisedAddress("localhost");
+ config.setWebServicePort(Optional.of(0));
+ config.setWebServicePortTls(Optional.of(0));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" +
bookkeeperEnsemble.getZookeeperPort());
+ config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" +
brokerConfigZk.getZookeeperPort() + "/foo");
+ config.setBrokerDeleteInactiveTopicsEnabled(false);
+ config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerServicePortTls(Optional.of(0));
+ config.setBacklogQuotaCheckIntervalInSeconds(5);
+ config.setDefaultNumberOfNamespaceBundles(1);
+ config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ config.setEnableReplicatedSubscriptions(true);
+ config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+ config.setLoadBalancerSheddingEnabled(false);
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ // shutdown.
+ markCurrentSetupNumberCleaned();
+ log.info("--- Shutting down ---");
+
+ // Stop brokers.
+ if (client1 != null) {
+ client1.close();
+ client1 = null;
+ }
+ if (client2 != null) {
+ client2.close();
+ client2 = null;
+ }
+ if (admin1 != null) {
+ admin1.close();
+ admin1 = null;
+ }
+ if (admin2 != null) {
+ admin2.close();
+ admin2 = null;
+ }
+ if (pulsar2 != null) {
+ pulsar2.close();
+ pulsar2 = null;
+ }
+ if (pulsar1 != null) {
+ pulsar1.close();
+ pulsar1 = null;
+ }
+
+ // Stop ZK and BK.
+ if (bkEnsemble1 != null) {
+ bkEnsemble1.stop();
+ bkEnsemble1 = null;
+ }
+ if (bkEnsemble2 != null) {
+ bkEnsemble2.stop();
+ bkEnsemble2 = null;
+ }
+ if (brokerConfigZk1 != null) {
+ brokerConfigZk1.stop();
+ brokerConfigZk1 = null;
+ }
+ if (brokerConfigZk2 != null) {
+ brokerConfigZk2.stop();
+ brokerConfigZk2 = null;
+ }
+
+ // Reset configs.
+ config1 = new ServiceConfiguration();
+ config2 = new ServiceConfiguration();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java
new file mode 100644
index 00000000000..577725f96ed
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.HashSet;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class SyncConfigStoreTest extends
GeoReplicationWithConfigurationSyncTestBase {
+
+ private static final String CONF_NAME_SYNC_EVENT_TOPIC =
"configurationMetadataSyncEventTopic";
+ private static final String SYNC_EVENT_TOPIC =
TopicDomain.persistent.value() + "://" + SYSTEM_NAMESPACE
+ + "/__sync_config_meta";
+
+ @Override
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+ TenantInfoImpl tenantInfo = new TenantInfoImpl();
+ tenantInfo.setAllowedClusters(new HashSet<>(Arrays.asList(cluster1,
cluster2)));
+
admin1.tenants().createTenant(TopicName.get(SYNC_EVENT_TOPIC).getTenant(),
tenantInfo);
+
admin1.namespaces().createNamespace(TopicName.get(SYNC_EVENT_TOPIC).getNamespace());
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+ super.setConfigDefaults(config, clusterName, bookkeeperEnsemble,
brokerConfigZk);
+ }
+
+ @Test
+ public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws
Exception {
+ // Verify the condition that supports synchronizer: the metadata store
is a different one.
+ Awaitility.await().untilAsserted(() -> {
+ boolean shouldShutdownConfigurationMetadataStore =
+ WhiteboxImpl.getInternalState(pulsar1,
"shouldShutdownConfigurationMetadataStore");
+ assertTrue(shouldShutdownConfigurationMetadataStore);
+ });
+
+ // Verify the synchronizer will be created dynamically.
+
admin1.brokers().updateDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC,
SYNC_EVENT_TOPIC);
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic(),
SYNC_EVENT_TOPIC);
+ PulsarMetadataEventSynchronizer synchronizer =
+ WhiteboxImpl.getInternalState(pulsar1,
"configMetadataSynchronizer");
+ assertNotNull(synchronizer);
+ assertEquals(synchronizer.getState(),
PulsarMetadataEventSynchronizer.State.Started);
+ assertTrue(synchronizer.isStarted());
+ });
+
+ PulsarMetadataEventSynchronizer synchronizerStarted =
+ WhiteboxImpl.getInternalState(pulsar1,
"configMetadataSynchronizer");
+ Producer<MetadataEvent> producerStarted =
+ WhiteboxImpl.getInternalState(synchronizerStarted, "producer");
+ Consumer<MetadataEvent> consumerStarted =
+ WhiteboxImpl.getInternalState(synchronizerStarted, "consumer");
+
+ // Verify the synchronizer will be closed dynamically.
+
admin1.brokers().deleteDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC);
+ Awaitility.await().untilAsserted(() -> {
+ // The synchronizer that was started will be closed.
+ assertEquals(synchronizerStarted.getState(),
PulsarMetadataEventSynchronizer.State.Closed);
+ assertTrue(synchronizerStarted.isClosingOrClosed());
+ assertFalse(producerStarted.isConnected());
+ assertFalse(consumerStarted.isConnected());
+ // The synchronizer in memory will be null.
+
assertNull(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic());
+ PulsarMetadataEventSynchronizer synchronizer =
+ WhiteboxImpl.getInternalState(pulsar1,
"configMetadataSynchronizer");
+ assertNull(synchronizer);
+ });
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
index 9a735e0f15a..cababd03246 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java
@@ -49,5 +49,5 @@ public interface MetadataEventSynchronizer {
/**
* close synchronizer resources.
*/
- void close();
+ CompletableFuture<Void> closeAsync(); // replaces the removed void close(); NOTE(review): presumably the future completes once resources are released - confirm with implementations
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index e565ba30d3d..182c14ef601 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -84,6 +84,8 @@ public interface MetadataStoreExtended extends MetadataStore {
return Optional.empty();
}
+ default void updateMetadataEventSynchronizer(MetadataEventSynchronizer
synchronizer) {} // no-op by default; store implementations that keep a synchronizer override this
+
/**
* Handles a metadata synchronizer event.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 7a495f78771..3909a89cf5e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -82,8 +82,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
- synchronizer = metadataStoreConfig.getSynchronizer();
- registerSyncListener(Optional.ofNullable(synchronizer));
+ updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer());
if ("local".equals(name)) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
@@ -233,6 +232,12 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
return Optional.ofNullable(synchronizer);
}
+ @Override // Replaces this store's synchronizer and registers the sync listener for the new one.
+ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer
synchronizer) {
+ this.synchronizer = synchronizer; // null is accepted: the value is wrapped with Optional.ofNullable below
+ registerSyncListener(Optional.ofNullable(synchronizer)); // NOTE(review): runs on every update - confirm repeated calls do not stack duplicate listeners
+ }
+
@Override
public void close() throws Exception {
if (isClosed.compareAndSet(false, true)) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index be985129f2a..39f7edd5cee 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -112,8 +112,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
// Create a new store instance
store = new RocksdbMetadataStore(metadataStoreUri, conf);
// update synchronizer and register sync listener
- store.synchronizer = conf.getSynchronizer();
- store.registerSyncListener(Optional.ofNullable(store.synchronizer));
+ store.updateMetadataEventSynchronizer(conf.getSynchronizer());
instancesCache.put(metadataStoreUri, store);
return store;
}
@@ -572,6 +571,12 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
return Optional.ofNullable(synchronizer);
}
+
+ @Override // Swaps the synchronizer held by this store and registers the sync listener for it.
+ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer
synchronizer) {
+ this.synchronizer = synchronizer; // may be null; getMetadataEventSynchronizer() exposes it via Optional.ofNullable
+ registerSyncListener(Optional.ofNullable(synchronizer)); // NOTE(review): presumably tolerates an empty Optional - confirm in AbstractMetadataStore
+ }
}
class RocksdbMetadataStoreProvider implements MetadataStoreProvider {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index a164e4c2460..5b45530d2e2 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -52,7 +52,7 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private final int maxDelayMillis;
private final int maxOperations;
private final int maxSize;
- private final MetadataEventSynchronizer synchronizer;
+ private MetadataEventSynchronizer synchronizer;
private final BatchMetadataStoreStats batchMetadataStoreStats;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
@@ -75,8 +75,7 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
}
// update synchronizer and register sync listener
- synchronizer = conf.getSynchronizer();
- registerSyncListener(Optional.ofNullable(synchronizer));
+ updateMetadataEventSynchronizer(conf.getSynchronizer());
this.batchMetadataStoreStats =
new BatchMetadataStoreStats(metadataStoreName, executor);
}
@@ -161,6 +160,12 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
return Optional.ofNullable(synchronizer);
}
+ @Override // Stores the given synchronizer and registers the sync listener for it; the constructor also calls this.
+ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer
synchronizer) {
+ this.synchronizer = synchronizer; // the field lost its final modifier in this change so it can be reassigned here
+ registerSyncListener(Optional.ofNullable(synchronizer)); // NOTE(review): overridable method invoked from the constructor before batchMetadataStoreStats is assigned - confirm subclasses do not rely on it
+ }
+
private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op)
{
if (enabled) {
if (!queue.offer(op)) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 728bc1175b9..f85e3d2dc75 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -55,7 +55,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
private final AsyncOxiaClient client;
private final String identity;
- private final Optional<MetadataEventSynchronizer> synchronizer;
+ private Optional<MetadataEventSynchronizer> synchronizer;
OxiaMetadataStore(
@NonNull String serviceAddress,
@@ -69,7 +69,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
if (!metadataStoreConfig.isBatchingEnabled()) {
linger = 0;
}
- this.synchronizer =
Optional.ofNullable(metadataStoreConfig.getSynchronizer());
+ updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer());
identity = UUID.randomUUID().toString();
client =
OxiaClientBuilder.create(serviceAddress)
@@ -286,5 +286,11 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
return synchronizer;
}
+ @Override // Wraps the given synchronizer in an Optional, stores it, and registers the sync listener for it.
+ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer
synchronizer) {
+ this.synchronizer = Optional.ofNullable(synchronizer); // a null argument becomes Optional.empty()
+ registerSyncListener(this.synchronizer); // NOTE(review): the constructor calls this before identity/client are assigned, and the removed constructor code only assigned the field - confirm the added registration is intended
+ }
+
private record PathWithPutResult(String path, PutResult result) {}
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
index 3fabe9647eb..caca16ff538 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java
@@ -206,8 +206,8 @@ public class LocalMemoryMetadataStoreTest {
}
@Override
- public void close() {
- // No-op
+ public CompletableFuture<Void> closeAsync() {
+ return CompletableFuture.completedFuture(null); // still a no-op: hands back an already-completed future
}
}