This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d5af4a43c [fix][meta] Fix ephemeral Zookeeper put which creates a persistent znode (#23984)
40d5af4a43c is described below

commit 40d5af4a43ca508ef496367925868310ea683780
Author: Heesung Sohn <[email protected]>
AuthorDate: Fri Feb 14 02:52:13 2025 -0800

    [fix][meta] Fix ephemeral Zookeeper put which creates a persistent znode (#23984)
---
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  4 +--
 .../pulsar/metadata/MetadataStoreExtendedTest.java | 34 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 4c24aa5938b..8fd82521528 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -439,8 +439,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                                 
future.completeExceptionally(getException(Code.BADVERSION, opPut.getPath()));
                             } else {
                                 // The z-node does not exist, let's create it 
first
-                                put(opPut.getPath(), opPut.getData(), 
Optional.of(-1L)).thenAccept(
-                                                s -> future.complete(s))
+                                put(opPut.getPath(), opPut.getData(), 
Optional.of(-1L), opPut.getOptions())
+                                        .thenAccept(s -> future.complete(s))
                                         .exceptionally(ex -> {
                                             if (ex.getCause() instanceof 
BadVersionException) {
                                                 // The z-node exist now, let's 
overwrite it
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
index 9a38cdbcd2f..a4c937611fd 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
@@ -66,4 +66,38 @@ public class MetadataStoreExtendedTest extends BaseMetadataStoreTest {
         assertNotEquals(seq1, seq2);
         assertTrue(n1 < n2);
     }
+
+    @Test(dataProvider = "impl")
+    public void testPersistentOrEphemeralPut(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        final String key1 = newKey();
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
+        store.put(key1, "value-1".getBytes(), Optional.empty(), 
EnumSet.noneOf(CreateOption.class)).join();
+        var value = store.get(key1).join().get();
+        assertEquals(value.getValue(), "value-1".getBytes());
+        // assertFalse(value.getStat().isEphemeral()); // Todo : fix 
zkStat.getEphemeralOwner() != 0 from test zk
+        assertTrue(value.getStat().isFirstVersion());
+        var version = value.getStat().getVersion();
+
+        store.put(key1, "value-2".getBytes(), Optional.empty(), 
EnumSet.noneOf(CreateOption.class)).join();
+        value = store.get(key1).join().get();
+        assertEquals(value.getValue(), "value-2".getBytes());
+       //assertFalse(value.getStat().isEphemeral());  // Todo : fix 
zkStat.getEphemeralOwner() != 0 from test zk
+        assertEquals(value.getStat().getVersion(), version + 1);
+
+        final String key2 = newKey();
+        store.put(key2, "value-4".getBytes(), Optional.empty(), 
EnumSet.of(CreateOption.Ephemeral)).join();
+        value = store.get(key2).join().get();
+        assertEquals(value.getValue(), "value-4".getBytes());
+        assertTrue(value.getStat().isEphemeral());
+        assertTrue(value.getStat().isFirstVersion());
+        version = value.getStat().getVersion();
+
+
+        store.put(key2, "value-5".getBytes(), Optional.empty(), 
EnumSet.of(CreateOption.Ephemeral)).join();
+        value = store.get(key2).join().get();
+        assertEquals(value.getValue(), "value-5".getBytes());
+        assertTrue(value.getStat().isEphemeral());
+        assertEquals(value.getStat().getVersion(), version + 1);
+    }
+
 }

Reply via email to