This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 38bba54 Unit test to verify metadata cache consistency across
brokers. (#11202)
38bba54 is described below
commit 38bba54a167ed3fba419bc2b7da36ce41e2ece21
Author: Bharani Chadalavada <[email protected]>
AuthorDate: Mon Jul 26 15:47:53 2021 -0700
Unit test to verify metadata cache consistency across brokers. (#11202)
Co-authored-by: Bharani Chadalavada <[email protected]>
---
.../apache/pulsar/metadata/MetadataCacheTest.java | 53 +++++++++++++++++++++-
1 file changed, 52 insertions(+), 1 deletion(-)
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index f2c25c4..68f772c 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -33,13 +33,16 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.CacheGetResult;
@@ -51,11 +54,14 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsExcept
import
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+@Slf4j
public class MetadataCacheTest extends BaseMetadataStoreTest {
@Data
@@ -102,7 +108,7 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
}
@Test(dataProvider = "zk")
- public void crossStoreUpdates(String provider, String url) throws
Exception {
+ public void crossStoreAddDelete(String provider, String url) throws
Exception {
@Cleanup
MetadataStore store1 = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
@@ -165,6 +171,51 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
});
}
+    /**
+     * Verifies that an update written through one store's cache is observed through the cache of a
+     * second store connected to the same metadata service.
+     *
+     * <p>Two independent {@link MetadataStore} instances are created against {@code url}. A value is
+     * created through the first cache and must become readable from both caches. The value is then
+     * replaced through the first cache; a listener on the second store re-reads the key from the
+     * second cache whenever a {@code Modified} notification arrives, and the test waits until that
+     * re-read yields the replacement value.
+     *
+     * @param provider name of the metadata store implementation supplied by the data provider
+     * @param url connection string used to create both stores
+     * @throws Exception if a store cannot be created or closed
+     */
+    @Test(dataProvider = "zk")
+    public void crossStoreUpdates(String provider, String url) throws Exception {
+        String testName = "cross store updates";
+        @Cleanup
+        MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStore store2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
+
+        MetadataCacheImpl<MyClass> objCache1 = (MetadataCacheImpl<MyClass>) store1.getMetadataCache(MyClass.class);
+
+        MetadataCacheImpl<MyClass> objCache2 = (MetadataCacheImpl<MyClass>) store2.getMetadataCache(MyClass.class);
+
+        // Holds the most recent value read back from the second cache after a Modified notification.
+        AtomicReference<MyClass> storeObj = new AtomicReference<>();
+        store2.registerListener(n -> {
+            if (n.getType() == NotificationType.Modified) {
+                // The blocking read is handed off to another thread.
+                // NOTE(review): presumably so the notification callback itself never blocks - confirm.
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        MyClass obj = objCache2.get(n.getPath()).get()
+                                .orElseThrow(() -> new IllegalStateException("No value found for " + n.getPath()));
+                        storeObj.set(obj);
+                    } catch (Exception e) {
+                        // Pass the throwable as the last argument so the stack trace is logged too.
+                        log.error("Got exception {}", e.getMessage(), e);
+                    }
+                });
+            }
+        });
+
+        String key1 = "/test-key1";
+        assertEquals(objCache1.getIfCached(key1), Optional.empty());
+        assertEquals(objCache2.getIfCached(key1), Optional.empty());
+
+        MyClass value1 = new MyClass(testName, 1);
+        objCache1.create(key1, value1).join();
+
+        // After the create, the value must be readable from both caches.
+        Awaitility.await().ignoreNoExceptions().untilAsserted(() -> {
+            assertEquals(objCache1.getIfCached(key1), Optional.of(value1));
+            assertEquals(objCache2.get(key1).join(), Optional.of(value1));
+            assertEquals(objCache2.getIfCached(key1), Optional.of(value1));
+        });
+
+        MyClass value2 = new MyClass(testName, 2);
+        objCache1.readModifyUpdate(key1, oldData -> value2).join();
+
+        // The listener on store2 must eventually read the updated value from the second cache.
+        Awaitility.await().ignoreNoExceptions().untilAsserted(() -> assertEquals(storeObj.get(), value2));
+    }
+
+
@Test(dataProvider = "impl")
public void insertionDeletionWitGenericType(String provider, String url)
throws Exception {
@Cleanup