This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d3099d50dc7662936592486d0fffa70ee642df2
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Dec 27 11:55:17 2022 +0100

    [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518)
    
    (cherry picked from commit 492a9c3e44bef2334a77164afc8b033cc8f8d82f)
---
 .../apache/pulsar/broker/resources/BaseResources.java  |  7 +++++++
 .../pulsar/broker/resources/NamespaceResources.java    | 14 ++++++++++++--
 .../org/apache/pulsar/metadata/api/MetadataStore.java  | 11 +++++++++++
 .../apache/pulsar/metadata/impl/ZKMetadataStore.java   | 18 ++++++++++++++++++
 4 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 31409e37890..5ae87d2c644 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -93,6 +93,13 @@ public class BaseResources<T> {
         return cache.get(path);
     }
 
+    protected CompletableFuture<Optional<T>> refreshAndGetAsync(String path) {
+        return store.sync(path).thenCompose(___ -> {
+            cache.invalidate(path);
+            return cache.get(path);
+        });
+    }
+
     protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
         try {
             setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 327e6a784dd..7926677e5d2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -274,8 +274,18 @@ public class NamespaceResources extends BaseResources<Policies> {
         }
 
         public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
-            return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
-                    tn.getEncodedLocalName()));
+            return getPartitionedTopicMetadataAsync(tn, false);
+        }
+
+        public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn,
+                                                                                                      boolean refresh) {
+            if (refresh) {
+                return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+                        tn.getEncodedLocalName()));
+            } else {
+                return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+                        tn.getEncodedLocalName()));
+            }
         }
 
         public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index b4295f25867..79670e859d6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable {
      */
     CompletableFuture<Optional<GetResult>> get(String path);
 
+
+    /**
+     * Ensure that the next value read from the local client will be up-to-date with the latest version of the value
+     * as it can be seen by all the other clients.
+     * @param path the path of the value whose next read must be up-to-date
+     * @return a handle to the operation
+     */
+    default CompletableFuture<Void> sync(String path) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * Return all the nodes (lexicographically sorted) that are children to the specific path.
      *
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index f6c6db68f50..2dbe0a7c3f3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -162,6 +162,24 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
         }
     }
 
+    @Override
+    public CompletableFuture<Void> sync(String path) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        zkc.sync(path, new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String s, Object o) {
+                Code code = Code.get(rc);
+                if (code == Code.OK) {
+                    result.complete(null);
+                } else {
+                    MetadataStoreException e = getException(code, path);
+                    result.completeExceptionally(e);
+                }
+            }
+        }, null);
+        return result;
+    }
+
     @Override
     protected void batchOperation(List<MetadataOp> ops) {
         try {

Reply via email to