This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new dc1ce081420 Ensure cache is refreshed (and not just invalidated) after 
a store write (#12788)
dc1ce081420 is described below

commit dc1ce0814208993f821ddb96e38a9ef83a425e0d
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Nov 16 16:15:07 2021 -0800

    Ensure cache is refreshed (and not just invalidated) after a store write 
(#12788)
    
    ### Motivation
    
    When we're doing a write to the store from outside the `MetadataCache`, we 
are immediately invalidating the cache to ensure read-after-write consistency 
through the cache.
    
    The only issue is that the invalidation will not trigger a reload of 
the value. Instead, it relies on the next call to `cache.get()`, which will 
see the cache miss and load the new value into the cache.
    
    This means that calls to `cache.getIfCached()`, which does not trigger a 
cache load, will keep seeing the key as missing.
    
    ### Modification
    
    Ensure we're calling refresh on the cache so that the value is automatically 
reloaded in the background and `getIfCached()` will eventually return 
the new value, even if there are no calls to `cache.get()`.
    
    (cherry picked from commit 2bc449933f72f28dfae24ca8a6fb022be7c55a44)
---
 .../apache/pulsar/metadata/api/MetadataCache.java  |  7 +++++
 .../metadata/cache/impl/MetadataCacheImpl.java     | 17 ++++++-----
 .../metadata/impl/AbstractMetadataStore.java       |  2 +-
 .../apache/pulsar/metadata/MetadataCacheTest.java  | 34 ++++++++++++++++++++--
 4 files changed, 50 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 360c092b6f1..1272130eb76 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -148,4 +148,11 @@ public interface MetadataCache<T> {
      * @param path the path of the object in the metadata store
      */
     void invalidate(String path);
+
+    /**
+     * Invalidate and reload an object in the metadata cache.
+     *
+     * @param path the path of the object in the metadata store
+     */
+    void refresh(String path);
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 16b419fe9ed..0ab56bb1bc1 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -168,8 +168,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     }
 
                     return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(__ -> {
-                        objCache.synchronous().invalidate(path);
-                        objCache.synchronous().refresh(path);
+                        refresh(path);
                     }).thenApply(__ -> newValueObj);
                 }), path);
     }
@@ -198,8 +197,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     }
 
                     return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(__ -> {
-                        objCache.synchronous().invalidate(path);
-                        objCache.synchronous().refresh(path);
+                        refresh(path);
                     }).thenApply(__ -> newValueObj);
                 }), path);
     }
@@ -220,7 +218,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     // In addition to caching the value, we need to add a 
watch on the path,
                     // so when/if it changes on any other node, we are 
notified and we can
                     // update the cache
-                    objCache.get(path).whenComplete( (stat2, ex) -> {
+                    objCache.get(path).whenComplete((stat2, ex) -> {
                         if (ex == null) {
                             future.complete(null);
                         } else {
@@ -261,6 +259,12 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         objCache.synchronous().invalidate(path);
     }
 
+    @Override
+    public void refresh(String path) {
+        objCache.synchronous().invalidate(path);
+        objCache.synchronous().refresh(path);
+    }
+
     @VisibleForTesting
     public void invalidateAll() {
         objCache.synchronous().invalidateAll();
@@ -275,8 +279,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
             if (objCache.synchronous().getIfPresent(path) != null) {
                 // Trigger background refresh of the cached item, but before 
make sure
                 // to invalidate the entry so that we won't serve a stale 
cached version
-                objCache.synchronous().invalidate(path);
-                objCache.synchronous().refresh(path);
+                refresh(path);
             }
             break;
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 40c209f4d3e..ac7feb4747f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -252,7 +252,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                         }
                     }
 
-                    metadataCaches.forEach(c -> c.invalidate(path));
+                    metadataCaches.forEach(c -> c.refresh(path));
                     return stat;
                 });
     }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index 322c9bb779b..e7680296563 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -278,6 +278,34 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         assertEquals(objCache.get(key1).join(), Optional.empty());
     }
 
+    @Test(dataProvider = "impl")
+    public void insertionWithInvalidation(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
+        MetadataCache<MyClass> objCache = 
store.getMetadataCache(MyClass.class);
+
+        String key1 = newKey();
+
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+        assertEquals(objCache.get(key1).join(), Optional.empty());
+
+        MyClass value1 = new MyClass("a", 1);
+        store.put(key1, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1), 
Optional.of(-1L)).join();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(objCache.getIfCached(key1), Optional.of(value1));
+            assertEquals(objCache.get(key1).join(), Optional.of(value1));
+        });
+
+        MyClass value2 = new MyClass("a", 2);
+        store.put(key1, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value2), 
Optional.of(0L)).join();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(objCache.getIfCached(key1), Optional.of(value2));
+            assertEquals(objCache.get(key1).join(), Optional.of(value2));
+        });
+    }
+
     @Test(dataProvider = "impl")
     public void insertionOutsideCache(String provider, Supplier<String> 
urlSupplier) throws Exception {
         @Cleanup
@@ -310,8 +338,10 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         v.put("b", "2");
         store.put(key1, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(v), 
Optional.of(-1L)).join();
 
-        assertEquals(objCache.getIfCached(key1), Optional.empty());
-        assertEquals(objCache.get(key1).join(), Optional.of(v));
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(objCache.getIfCached(key1), Optional.of(v));
+            assertEquals(objCache.get(key1).join(), Optional.of(v));
+        });
     }
 
     @Test(dataProvider = "impl")

Reply via email to