This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new dc1ce081420 Ensure cache is refreshed (and not just invalidated) after
a store write (#12788)
dc1ce081420 is described below
commit dc1ce0814208993f821ddb96e38a9ef83a425e0d
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Nov 16 16:15:07 2021 -0800
Ensure cache is refreshed (and not just invalidated) after a store write
(#12788)
### Motivation
When we're doing a write to the store from outside the `MetadataCache`, we
are immediately invalidating the cache to ensure read-after-write consistency
through the cache.
The only issue is that the invalidation will not trigger a reload of
the value. Instead, it relies on the next call to `cache.get()`, which will
see the cache miss and load the new value into the cache.
This means that calls to `cache.getIfCached()`, which does not trigger a
cache load, will keep seeing the key as missing.
### Modification
Ensure we're calling refresh on the cache so that the value is automatically
reloaded in the background, and make sure that `getIfCached()` will eventually
return the new value, even if there are no calls to `cache.get()`.
(cherry picked from commit 2bc449933f72f28dfae24ca8a6fb022be7c55a44)
---
.../apache/pulsar/metadata/api/MetadataCache.java | 7 +++++
.../metadata/cache/impl/MetadataCacheImpl.java | 17 ++++++-----
.../metadata/impl/AbstractMetadataStore.java | 2 +-
.../apache/pulsar/metadata/MetadataCacheTest.java | 34 ++++++++++++++++++++--
4 files changed, 50 insertions(+), 10 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 360c092b6f1..1272130eb76 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -148,4 +148,11 @@ public interface MetadataCache<T> {
* @param path the path of the object in the metadata store
*/
void invalidate(String path);
+
+ /**
+ * Invalidate and reload an object in the metadata cache.
+ *
+ * @param path the path of the object in the metadata store
+ */
+ void refresh(String path);
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 16b419fe9ed..0ab56bb1bc1 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -168,8 +168,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
}
return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(__ -> {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
+ refresh(path);
}).thenApply(__ -> newValueObj);
}), path);
}
@@ -198,8 +197,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
}
return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(__ -> {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
+ refresh(path);
}).thenApply(__ -> newValueObj);
}), path);
}
@@ -220,7 +218,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
// In addition to caching the value, we need to add a
watch on the path,
// so when/if it changes on any other node, we are
notified and we can
// update the cache
- objCache.get(path).whenComplete( (stat2, ex) -> {
+ objCache.get(path).whenComplete((stat2, ex) -> {
if (ex == null) {
future.complete(null);
} else {
@@ -261,6 +259,12 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
objCache.synchronous().invalidate(path);
}
+ @Override
+ public void refresh(String path) {
+ objCache.synchronous().invalidate(path);
+ objCache.synchronous().refresh(path);
+ }
+
@VisibleForTesting
public void invalidateAll() {
objCache.synchronous().invalidateAll();
@@ -275,8 +279,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
if (objCache.synchronous().getIfPresent(path) != null) {
// Trigger background refresh of the cached item, but before
make sure
// to invalidate the entry so that we won't serve a stale
cached version
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
+ refresh(path);
}
break;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 40c209f4d3e..ac7feb4747f 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -252,7 +252,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
- metadataCaches.forEach(c -> c.invalidate(path));
+ metadataCaches.forEach(c -> c.refresh(path));
return stat;
});
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index 322c9bb779b..e7680296563 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -278,6 +278,34 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
assertEquals(objCache.get(key1).join(), Optional.empty());
}
+ @Test(dataProvider = "impl")
+ public void insertionWithInvalidation(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
+
+ String key1 = newKey();
+
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+ assertEquals(objCache.get(key1).join(), Optional.empty());
+
+ MyClass value1 = new MyClass("a", 1);
+ store.put(key1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1),
Optional.of(-1L)).join();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(objCache.getIfCached(key1), Optional.of(value1));
+ assertEquals(objCache.get(key1).join(), Optional.of(value1));
+ });
+
+ MyClass value2 = new MyClass("a", 2);
+ store.put(key1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value2),
Optional.of(0L)).join();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(objCache.getIfCached(key1), Optional.of(value2));
+ assertEquals(objCache.get(key1).join(), Optional.of(value2));
+ });
+ }
+
@Test(dataProvider = "impl")
public void insertionOutsideCache(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
@@ -310,8 +338,10 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
v.put("b", "2");
store.put(key1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(v),
Optional.of(-1L)).join();
- assertEquals(objCache.getIfCached(key1), Optional.empty());
- assertEquals(objCache.get(key1).join(), Optional.of(v));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(objCache.getIfCached(key1), Optional.of(v));
+ assertEquals(objCache.get(key1).join(), Optional.of(v));
+ });
}
@Test(dataProvider = "impl")