This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 7335336cc3e [improve][meta]Allow version to start positive and grow by 
more than one (#19503)
7335336cc3e is described below

commit 7335336cc3e11e566bce94668c82788f81183c25
Author: Andras Beni <[email protected]>
AuthorDate: Mon Feb 27 23:09:00 2023 +0100

    [improve][meta]Allow version to start positive and grow by more than one 
(#19503)
---
 .../java/org/apache/pulsar/metadata/api/Stat.java  | 12 ++++++++++++
 .../metadata/impl/AbstractMetadataStore.java       |  2 +-
 .../apache/pulsar/metadata/MetadataCacheTest.java  |  7 +++++--
 .../pulsar/metadata/MetadataStoreBatchingTest.java |  7 +++++--
 .../pulsar/metadata/MetadataStoreExtendedTest.java |  6 ++++--
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 22 ++++++++++++++--------
 6 files changed, 41 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
index 5fd6a3b0c6e..b9051df02e3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
@@ -19,13 +19,20 @@
 package org.apache.pulsar.metadata.api;
 
 import lombok.Data;
+import lombok.RequiredArgsConstructor;
 
 /**
  * Represent the information associated with a given value in the store.
  */
 @Data
+@RequiredArgsConstructor
 public class Stat {
 
+    public Stat(String path, long version, long creationTimestamp, long 
modificationTimestamp, boolean ephemeral,
+                boolean createdBySelf) {
+        this(path, version, creationTimestamp, modificationTimestamp, 
ephemeral, createdBySelf, version == 0);
+    }
+
     /**
      * The path of the value.
      */
@@ -55,4 +62,9 @@ public class Stat {
      * Whether the key-value pair had been created within the current 
"session".
      */
     final boolean createdBySelf;
+
+    /**
+     * Whether this is the first version of the key-value pair since it has 
been last created.
+     */
+    final boolean firstVersion;
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 4b9ff914fcf..f74c624a10d 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -387,7 +387,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         return storePut(path, data, optExpectedVersion,
                 (options != null && !options.isEmpty()) ? 
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class))
                 .thenApply(stat -> {
-                    NotificationType type = stat.getVersion() == 0 ? 
NotificationType.Created
+                    NotificationType type = stat.isFirstVersion() ? 
NotificationType.Created
                             : NotificationType.Modified;
                     if (type == NotificationType.Created) {
                         existsCache.synchronous().invalidate(path);
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index 43af3ad757e..e09645ce79f 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -289,7 +289,9 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         assertEquals(objCache.get(key1).join(), Optional.empty());
 
         MyClass value1 = new MyClass("a", 1);
-        store.put(key1, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1), 
Optional.of(-1L)).join();
+        Stat putResult = store.put(key1, 
ObjectMapperFactory.getThreadLocal().writer().writeValueAsBytes(value1),
+                Optional.of(-1L)).join();
+        assertTrue(putResult.isFirstVersion());
 
         Awaitility.await().untilAsserted(() -> {
             assertEquals(objCache.getIfCached(key1), Optional.of(value1));
@@ -297,7 +299,8 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         });
 
         MyClass value2 = new MyClass("a", 2);
-        store.put(key1, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value2), 
Optional.of(0L)).join();
+        store.put(key1, 
ObjectMapperFactory.getThreadLocal().writer().writeValueAsBytes(value2),
+                Optional.of(putResult.getVersion())).join();
 
         Awaitility.await().untilAsserted(() -> {
             assertEquals(objCache.getIfCached(key1), Optional.of(value2));
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
index 8f159e6a91e..0f59ca7a250 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
@@ -104,7 +104,8 @@ public class MetadataStoreBatchingTest extends 
BaseMetadataStoreTest {
         CompletableFuture<Stat> f3 = store.put(key1 + "/c", new byte[0], 
Optional.of(-1L)); // Should succeed
         CompletableFuture<Void> f4 = store.delete(key1 + "/d", 
Optional.empty()); // Should fail
 
-        assertEquals(f1.join().getVersion(), 0L);
+        assertTrue(f1.join().getVersion() >= 0L);
+        assertTrue(f1.join().isFirstVersion());
 
         try {
             f2.join();
@@ -112,7 +113,9 @@ public class MetadataStoreBatchingTest extends 
BaseMetadataStoreTest {
             assertEquals(ce.getCause().getClass(), BadVersionException.class);
         }
 
-        assertEquals(f3.join().getVersion(), 0L);
+        assertTrue(f3.join().getVersion() >= 0L);
+        assertTrue(f3.join().isFirstVersion());
+
 
         try {
             f4.join();
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
index 14cd5947b18..1de29916c1e 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
@@ -45,7 +45,8 @@ public class MetadataStoreExtendedTest extends 
BaseMetadataStoreTest {
         Stat stat1 = store.put(basePath, "value-1".getBytes(), 
Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
                 .join();
         assertNotNull(stat1);
-        assertEquals(stat1.getVersion(), 0L);
+        assertTrue(stat1.getVersion() >= 0L);
+        assertTrue(stat1.isFirstVersion());
         assertNotEquals(stat1.getPath(), basePath);
         assertEquals(store.get(stat1.getPath()).join().get().getValue(), 
"value-1".getBytes());
         String seq1 = stat1.getPath().replace(basePath, "");
@@ -54,7 +55,8 @@ public class MetadataStoreExtendedTest extends 
BaseMetadataStoreTest {
         Stat stat2 = store.put(basePath, "value-2".getBytes(), 
Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
                 .join();
         assertNotNull(stat2);
-        assertEquals(stat2.getVersion(), 0L);
+        assertTrue(stat2.getVersion() >= 0L);
+        assertTrue(stat2.isFirstVersion());
         assertNotEquals(stat2.getPath(), basePath);
         assertNotEquals(stat2.getPath(), stat1.getPath());
         assertEquals(store.get(stat2.getPath()).join().get().getValue(), 
"value-2".getBytes());
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 9e0c6887d94..109f0bd05ec 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -127,13 +127,16 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
             assertException(e, BadVersionException.class);
         }
 
-        store.put(key1, "value-1".getBytes(), Optional.of(-1L)).join();
+        var putRes = store.put(key1, "value-1".getBytes(), 
Optional.of(-1L)).join();
+        long putVersion = putRes.getVersion();
+        assertTrue(putVersion >= 0);
+        assertTrue(putRes.isFirstVersion());
 
         assertTrue(store.exists(key1).join());
         Optional<GetResult> optRes = store.get(key1).join();
         assertTrue(optRes.isPresent());
         assertEquals(optRes.get().getValue(), "value-1".getBytes());
-        assertEquals(optRes.get().getStat().getVersion(), 0);
+        assertEquals(optRes.get().getStat().getVersion(), putVersion);
 
         try {
             store.put(key1, "value-2".getBytes(), Optional.of(-1L)).join();
@@ -143,7 +146,7 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         }
 
         try {
-            store.put(key1, "value-2".getBytes(), Optional.of(1L)).join();
+            store.put(key1, "value-2".getBytes(), Optional.of(putVersion + 
1)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
             assertException(e, BadVersionException.class);
@@ -153,15 +156,16 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         optRes = store.get(key1).join();
         assertTrue(optRes.isPresent());
         assertEquals(optRes.get().getValue(), "value-1".getBytes());
-        assertEquals(optRes.get().getStat().getVersion(), 0);
+        assertEquals(optRes.get().getStat().getVersion(), putVersion);
 
-        store.put(key1, "value-2".getBytes(), Optional.of(0L)).join();
+        putRes = store.put(key1, "value-2".getBytes(), 
Optional.of(putVersion)).join();
+        assertTrue(putRes.getVersion() > putVersion);
 
         assertTrue(store.exists(key1).join());
         optRes = store.get(key1).join();
         assertTrue(optRes.isPresent());
         assertEquals(optRes.get().getValue(), "value-2".getBytes());
-        assertEquals(optRes.get().getStat().getVersion(), 1);
+        assertEquals(optRes.get().getStat().getVersion(), putRes.getVersion());
     }
 
     @Test(dataProvider = "impl")
@@ -308,12 +312,14 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         Stat stat = store.put(key1, "value-1".getBytes(), 
Optional.empty()).join();
         assertTrue(store.get(key1).join().isPresent());
         assertEquals(store.getChildren(key1).join(), Collections.emptyList());
-        assertEquals(stat.getVersion(), 0);
+        assertTrue(stat.getVersion() >= 0);
+        assertTrue(stat.isFirstVersion());
 
         Notification n = notifications.poll(3, TimeUnit.SECONDS);
         assertNotNull(n);
         assertEquals(n.getType(), NotificationType.Created);
         assertEquals(n.getPath(), key1);
+        var firstVersion = stat.getVersion();
 
         // Trigger modified notification
         stat = store.put(key1, "value-2".getBytes(), Optional.empty()).join();
@@ -321,7 +327,7 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         assertNotNull(n);
         assertEquals(n.getType(), NotificationType.Modified);
         assertEquals(n.getPath(), key1);
-        assertEquals(stat.getVersion(), 1);
+        assertTrue(stat.getVersion() > firstVersion);
 
         // Trigger modified notification on the parent
         String key1Child = key1 + "/xx";

Reply via email to