This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5d3099d50dc7662936592486d0fffa70ee642df2 Author: Enrico Olivelli <[email protected]> AuthorDate: Tue Dec 27 11:55:17 2022 +0100 [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518) (cherry picked from commit 492a9c3e44bef2334a77164afc8b033cc8f8d82f) --- .../apache/pulsar/broker/resources/BaseResources.java | 7 +++++++ .../pulsar/broker/resources/NamespaceResources.java | 14 ++++++++++++-- .../org/apache/pulsar/metadata/api/MetadataStore.java | 11 +++++++++++ .../apache/pulsar/metadata/impl/ZKMetadataStore.java | 18 ++++++++++++++++++ 4 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 31409e37890..5ae87d2c644 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -93,6 +93,13 @@ public class BaseResources<T> { return cache.get(path); } + protected CompletableFuture<Optional<T>> refreshAndGetAsync(String path) { + return store.sync(path).thenCompose(___ -> { + cache.invalidate(path); + return cache.get(path); + }); + } + protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException { try { setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 327e6a784dd..7926677e5d2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -274,8 +274,18 @@ public class NamespaceResources extends BaseResources<Policies> { } public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) { - return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), - tn.getEncodedLocalName())); + return getPartitionedTopicMetadataAsync(tn, false); + } + + public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn, + boolean refresh) { + if (refresh) { + return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName())); + } else { + return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName())); + } } public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index b4295f25867..79670e859d6 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable { */ CompletableFuture<Optional<GetResult>> get(String path); + + /** + * Ensure that the next value read from the local client will be up-to-date with the latest version of the value + * as it can be seen by all the other clients. + * @param path + * @return a handle to the operation + */ + default CompletableFuture<Void> sync(String path) { + return CompletableFuture.completedFuture(null); + } + /** * Return all the nodes (lexicographically sorted) that are children to the specific path. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index f6c6db68f50..2dbe0a7c3f3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -162,6 +162,24 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore } } + @Override + public CompletableFuture<Void> sync(String path) { + CompletableFuture<Void> result = new CompletableFuture<>(); + zkc.sync(path, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String s, Object o) { + Code code = Code.get(rc); + if (code == Code.OK) { + result.complete(null); + } else { + MetadataStoreException e = getException(code, path); + result.completeExceptionally(e); + } + } + }, null); + return result; + } + @Override protected void batchOperation(List<MetadataOp> ops) { try {
