This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 7335336cc3e [improve][meta]Allow version to start positive and grow by more than one (#19503)
7335336cc3e is described below
commit 7335336cc3e11e566bce94668c82788f81183c25
Author: Andras Beni <[email protected]>
AuthorDate: Mon Feb 27 23:09:00 2023 +0100
[improve][meta]Allow version to start positive and grow by more than one (#19503)
---
.../java/org/apache/pulsar/metadata/api/Stat.java | 12 ++++++++++++
.../metadata/impl/AbstractMetadataStore.java | 2 +-
.../apache/pulsar/metadata/MetadataCacheTest.java | 7 +++++--
.../pulsar/metadata/MetadataStoreBatchingTest.java | 7 +++++--
.../pulsar/metadata/MetadataStoreExtendedTest.java | 6 ++++--
.../apache/pulsar/metadata/MetadataStoreTest.java | 22 ++++++++++++++--------
6 files changed, 41 insertions(+), 15 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
index 5fd6a3b0c6e..b9051df02e3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
@@ -19,13 +19,20 @@
package org.apache.pulsar.metadata.api;
import lombok.Data;
+import lombok.RequiredArgsConstructor;
/**
* Represent the information associated with a given value in the store.
*/
@Data
+@RequiredArgsConstructor
public class Stat {
+ public Stat(String path, long version, long creationTimestamp, long modificationTimestamp, boolean ephemeral,
+ boolean createdBySelf) {
+ this(path, version, creationTimestamp, modificationTimestamp, ephemeral, createdBySelf, version == 0);
+ }
+
/**
* The path of the value.
*/
@@ -55,4 +62,9 @@ public class Stat {
* Whether the key-value pair had been created within the current "session".
*/
final boolean createdBySelf;
+
+ /**
+ * Whether this is the first version of the key-value pair since it has been last created.
+ */
+ final boolean firstVersion;
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 4b9ff914fcf..f74c624a10d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -387,7 +387,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
return storePut(path, data, optExpectedVersion,
(options != null && !options.isEmpty()) ?
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class))
.thenApply(stat -> {
- NotificationType type = stat.getVersion() == 0 ?
NotificationType.Created
+ NotificationType type = stat.isFirstVersion() ?
NotificationType.Created
: NotificationType.Modified;
if (type == NotificationType.Created) {
existsCache.synchronous().invalidate(path);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index 43af3ad757e..e09645ce79f 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -289,7 +289,9 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
assertEquals(objCache.get(key1).join(), Optional.empty());
MyClass value1 = new MyClass("a", 1);
- store.put(key1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1),
Optional.of(-1L)).join();
+ Stat putResult = store.put(key1,
ObjectMapperFactory.getThreadLocal().writer().writeValueAsBytes(value1),
+ Optional.of(-1L)).join();
+ assertTrue(putResult.isFirstVersion());
Awaitility.await().untilAsserted(() -> {
assertEquals(objCache.getIfCached(key1), Optional.of(value1));
@@ -297,7 +299,8 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
});
MyClass value2 = new MyClass("a", 2);
- store.put(key1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value2),
Optional.of(0L)).join();
+ store.put(key1,
ObjectMapperFactory.getThreadLocal().writer().writeValueAsBytes(value2),
+ Optional.of(putResult.getVersion())).join();
Awaitility.await().untilAsserted(() -> {
assertEquals(objCache.getIfCached(key1), Optional.of(value2));
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
index 8f159e6a91e..0f59ca7a250 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
@@ -104,7 +104,8 @@ public class MetadataStoreBatchingTest extends
BaseMetadataStoreTest {
CompletableFuture<Stat> f3 = store.put(key1 + "/c", new byte[0],
Optional.of(-1L)); // Should succeed
CompletableFuture<Void> f4 = store.delete(key1 + "/d",
Optional.empty()); // Should fail
- assertEquals(f1.join().getVersion(), 0L);
+ assertTrue(f1.join().getVersion() >= 0L);
+ assertTrue(f1.join().isFirstVersion());
try {
f2.join();
@@ -112,7 +113,9 @@ public class MetadataStoreBatchingTest extends
BaseMetadataStoreTest {
assertEquals(ce.getCause().getClass(), BadVersionException.class);
}
- assertEquals(f3.join().getVersion(), 0L);
+ assertTrue(f3.join().getVersion() >= 0L);
+ assertTrue(f3.join().isFirstVersion());
+
try {
f4.join();
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
index 14cd5947b18..1de29916c1e 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
@@ -45,7 +45,8 @@ public class MetadataStoreExtendedTest extends
BaseMetadataStoreTest {
Stat stat1 = store.put(basePath, "value-1".getBytes(),
Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
.join();
assertNotNull(stat1);
- assertEquals(stat1.getVersion(), 0L);
+ assertTrue(stat1.getVersion() >= 0L);
+ assertTrue(stat1.isFirstVersion());
assertNotEquals(stat1.getPath(), basePath);
assertEquals(store.get(stat1.getPath()).join().get().getValue(),
"value-1".getBytes());
String seq1 = stat1.getPath().replace(basePath, "");
@@ -54,7 +55,8 @@ public class MetadataStoreExtendedTest extends
BaseMetadataStoreTest {
Stat stat2 = store.put(basePath, "value-2".getBytes(),
Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
.join();
assertNotNull(stat2);
- assertEquals(stat2.getVersion(), 0L);
+ assertTrue(stat2.getVersion() >= 0L);
+ assertTrue(stat2.isFirstVersion());
assertNotEquals(stat2.getPath(), basePath);
assertNotEquals(stat2.getPath(), stat1.getPath());
assertEquals(store.get(stat2.getPath()).join().get().getValue(),
"value-2".getBytes());
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 9e0c6887d94..109f0bd05ec 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -127,13 +127,16 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
assertException(e, BadVersionException.class);
}
- store.put(key1, "value-1".getBytes(), Optional.of(-1L)).join();
+ var putRes = store.put(key1, "value-1".getBytes(),
Optional.of(-1L)).join();
+ long putVersion = putRes.getVersion();
+ assertTrue(putVersion >= 0);
+ assertTrue(putRes.isFirstVersion());
assertTrue(store.exists(key1).join());
Optional<GetResult> optRes = store.get(key1).join();
assertTrue(optRes.isPresent());
assertEquals(optRes.get().getValue(), "value-1".getBytes());
- assertEquals(optRes.get().getStat().getVersion(), 0);
+ assertEquals(optRes.get().getStat().getVersion(), putVersion);
try {
store.put(key1, "value-2".getBytes(), Optional.of(-1L)).join();
@@ -143,7 +146,7 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
}
try {
- store.put(key1, "value-2".getBytes(), Optional.of(1L)).join();
+ store.put(key1, "value-2".getBytes(), Optional.of(putVersion +
1)).join();
fail("Should have failed");
} catch (CompletionException e) {
assertException(e, BadVersionException.class);
@@ -153,15 +156,16 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
optRes = store.get(key1).join();
assertTrue(optRes.isPresent());
assertEquals(optRes.get().getValue(), "value-1".getBytes());
- assertEquals(optRes.get().getStat().getVersion(), 0);
+ assertEquals(optRes.get().getStat().getVersion(), putVersion);
- store.put(key1, "value-2".getBytes(), Optional.of(0L)).join();
+ putRes = store.put(key1, "value-2".getBytes(),
Optional.of(putVersion)).join();
+ assertTrue(putRes.getVersion() > putVersion);
assertTrue(store.exists(key1).join());
optRes = store.get(key1).join();
assertTrue(optRes.isPresent());
assertEquals(optRes.get().getValue(), "value-2".getBytes());
- assertEquals(optRes.get().getStat().getVersion(), 1);
+ assertEquals(optRes.get().getStat().getVersion(), putRes.getVersion());
}
@Test(dataProvider = "impl")
@@ -308,12 +312,14 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
Stat stat = store.put(key1, "value-1".getBytes(),
Optional.empty()).join();
assertTrue(store.get(key1).join().isPresent());
assertEquals(store.getChildren(key1).join(), Collections.emptyList());
- assertEquals(stat.getVersion(), 0);
+ assertTrue(stat.getVersion() >= 0);
+ assertTrue(stat.isFirstVersion());
Notification n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Created);
assertEquals(n.getPath(), key1);
+ var firstVersion = stat.getVersion();
// Trigger modified notification
stat = store.put(key1, "value-2".getBytes(), Optional.empty()).join();
@@ -321,7 +327,7 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Modified);
assertEquals(n.getPath(), key1);
- assertEquals(stat.getVersion(), 1);
+ assertTrue(stat.getVersion() > firstVersion);
// Trigger modified notification on the parent
String key1Child = key1 + "/xx";