This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 13c19b50216 [improve][broker] Register the broker to metadata store
without version id compare (#23298)
13c19b50216 is described below
commit 13c19b50216ba7e73766e6fa7b57d2700614e3b5
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 13 19:07:12 2024 +0800
[improve][broker] Register the broker to metadata store without version id
compare (#23298)
---
.../loadbalance/extensions/BrokerRegistryImpl.java | 44 +++++++++++-----------
.../loadbalance/extensions/BrokerRegistryTest.java | 14 +++----
.../apache/pulsar/metadata/api/MetadataCache.java | 20 ++++++++++
.../metadata/cache/impl/MetadataCacheImpl.java | 24 ++++++++++++
.../apache/pulsar/metadata/MetadataCacheTest.java | 25 ++++++++++++
5 files changed, 96 insertions(+), 31 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 5db11d40c33..f34d377990b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -21,11 +21,11 @@ package org.apache.pulsar.broker.loadbalance.extensions;
import static
org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
@@ -39,11 +39,11 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
-import org.apache.pulsar.metadata.api.coordination.LockManager;
-import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
/**
* The broker registry impl, base on the LockManager.
@@ -57,16 +57,14 @@ public class BrokerRegistryImpl implements BrokerRegistry {
private final BrokerLookupData brokerLookupData;
- private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+ private final MetadataCache<BrokerLookupData>
brokerLookupDataMetadataCache;
- private final String brokerId;
+ private final String brokerIdKeyPath;
private final ScheduledExecutorService scheduler;
private final List<BiConsumer<String, NotificationType>> listeners;
- private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;
-
protected enum State {
Init,
Started,
@@ -79,10 +77,10 @@ public class BrokerRegistryImpl implements BrokerRegistry {
public BrokerRegistryImpl(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
- this.brokerLookupDataLockManager =
pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+ this.brokerLookupDataMetadataCache =
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
- this.brokerId = pulsar.getBrokerId();
+ this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
@@ -122,7 +120,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
public synchronized void register() throws MetadataStoreException {
if (this.state == State.Started) {
try {
- this.brokerLookupDataLock =
brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData)
+ brokerLookupDataMetadataCache.put(brokerIdKeyPath,
brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.get(conf.getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
this.state = State.Registered;
} catch (InterruptedException | ExecutionException |
TimeoutException e) {
@@ -135,30 +133,37 @@ public class BrokerRegistryImpl implements BrokerRegistry
{
public synchronized void unregister() throws MetadataStoreException {
if (this.state == State.Registered) {
try {
- this.brokerLookupDataLock.release()
+ brokerLookupDataMetadataCache.delete(brokerIdKeyPath)
.get(conf.getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
- this.state = State.Started;
- } catch (CompletionException | InterruptedException |
ExecutionException | TimeoutException e) {
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof
MetadataStoreException.NotFoundException) {
+ log.warn("{} has already been unregistered",
brokerIdKeyPath);
+ } else {
+ throw MetadataStoreException.unwrap(e);
+ }
+ } catch (InterruptedException | TimeoutException e) {
throw MetadataStoreException.unwrap(e);
+ } finally {
+ this.state = State.Started;
}
}
}
@Override
public String getBrokerId() {
- return this.brokerId;
+ return pulsar.getBrokerId();
}
@Override
public CompletableFuture<List<String>> getAvailableBrokersAsync() {
this.checkState();
- return
brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenApply(ArrayList::new);
+ return
brokerLookupDataMetadataCache.getChildren(LOADBALANCE_BROKERS_ROOT);
}
@Override
public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String
broker) {
this.checkState();
- return brokerLookupDataLockManager.readLock(keyPath(broker));
+ return brokerLookupDataMetadataCache.get(keyPath(broker));
}
public CompletableFuture<Map<String, BrokerLookupData>>
getAvailableBrokerLookupDataAsync() {
@@ -192,13 +197,8 @@ public class BrokerRegistryImpl implements BrokerRegistry {
try {
this.listeners.clear();
this.unregister();
- this.brokerLookupDataLockManager.close();
} catch (Exception ex) {
- if (ex.getCause() instanceof
MetadataStoreException.NotFoundException) {
- throw new
PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
- } else {
- throw new
PulsarServerException(MetadataStoreException.unwrap(ex));
- }
+ log.error("Unexpected error when unregistering the broker
registry", ex);
} finally {
this.state = State.Closed;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 42600a42035..91ada90dda6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -291,7 +291,7 @@ public class BrokerRegistryTest {
}
@Test
- public void testRegisterFailWithSameBrokerId() throws Exception {
+ public void testRegisterWithSameBrokerId() throws Exception {
PulsarService pulsar1 = createPulsarService();
PulsarService pulsar2 = createPulsarService();
pulsar1.start();
@@ -301,14 +301,10 @@ public class BrokerRegistryTest {
BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
brokerRegistry1.start();
- try {
- brokerRegistry2.start();
- fail();
- } catch (Exception ex) {
- log.info("Broker registry start failed.", ex);
- assertTrue(ex instanceof PulsarServerException);
- assertTrue(ex.getMessage().contains("LockBusyException"));
- }
+ brokerRegistry2.start();
+
+ pulsar1.close();
+ pulsar2.close();
}
@Test
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 6d558e70971..8e153b23d30 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.metadata.api;
+import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
/**
* Represent the caching layer access for a specific type of objects.
@@ -128,6 +130,24 @@ public interface MetadataCache<T> {
*/
CompletableFuture<Void> create(String path, T value);
+ /**
+ * Create or update the value of the given path in the metadata store
without version comparison.
+ * <p>
+ * This method is equivalent to
+ * {@link
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended#put(String,
byte[], Optional, EnumSet)} or
+ * {@link MetadataStore#put(String, byte[], Optional)} if the metadata
store does not support this extended API,
+ * with `Optional.empty()` as the 3rd argument. It means if the path does
not exist, it will be created. If the path
+ * already exists, the new value will override the old value.
+ * </p>
+ * @param path the path of the object in the metadata store
+ * @param value the object to put in the metadata store
+ * @param options the create options if the path does not in the metadata
store
+ * @return the future that indicates if this operation failed, it could
fail with
+ * {@link java.io.IOException} if the value failed to be serialized
+ * {@link MetadataStoreException} if the metadata store operation failed
+ */
+ CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption>
options);
+
/**
* Delete an object from the metadata store.
* <p>
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index b9051a7dc7d..ee394b0267c 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -25,6 +25,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -47,12 +48,15 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
import
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@Slf4j
public class MetadataCacheImpl<T> implements MetadataCache<T>,
Consumer<Notification> {
@Getter
private final MetadataStore store;
+ private final MetadataStoreExtended storeExtended;
private final MetadataSerde<T> serde;
private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>>
objCache;
@@ -67,6 +71,11 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde,
MetadataCacheConfig cacheConfig) {
this.store = store;
+ if (store instanceof MetadataStoreExtended) {
+ this.storeExtended = (MetadataStoreExtended) store;
+ } else {
+ this.storeExtended = null;
+ }
this.serde = serde;
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
@@ -243,6 +252,21 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return future;
}
+ @Override
+ public CompletableFuture<Void> put(String path, T value,
EnumSet<CreateOption> options) {
+ final byte[] bytes;
+ try {
+ bytes = serde.serialize(path, value);
+ } catch (IOException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ if (storeExtended != null) {
+ return storeExtended.put(path, bytes, Optional.empty(),
options).thenAccept(__ -> refresh(path));
+ } else {
+ return store.put(path, bytes, Optional.empty()).thenAccept(__ ->
refresh(path));
+ }
+ }
+
@Override
public CompletableFuture<Void> delete(String path) {
return store.delete(path, Optional.empty());
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index df59d25bdcc..bac58073604 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -55,6 +56,7 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
@@ -597,4 +599,27 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
assertEquals(res.getValue().b, 2);
assertEquals(res.getValue().path, key1);
}
+
+ @Test(dataProvider = "distributedImpl")
+ public void testPut(String provider, Supplier<String> urlSupplier) throws
Exception {
+ @Cleanup final var store1 =
MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
+ .build());
+ final var cache1 = store1.getMetadataCache(Integer.class);
+ @Cleanup final var store2 =
MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
+ .build());
+ final var cache2 = store2.getMetadataCache(Integer.class);
+ final var key = "/testPut";
+
+ cache1.put(key, 1, EnumSet.of(CreateOption.Ephemeral)); // create
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cache1.get(key).get().orElse(-1), 1);
+ assertEquals(cache2.get(key).get().orElse(-1), 1);
+ });
+
+ cache2.put(key, 2, EnumSet.of(CreateOption.Ephemeral)); // update
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cache1.get(key).get().orElse(-1), 2);
+ assertEquals(cache2.get(key).get().orElse(-1), 2);
+ });
+ }
}