This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af72bd3d14eb47b8348e57403bf244c7d1612026
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Aug 28 10:01:11 2025 +0800

    [fix][meta] Use `getChildrenFromStore` to read children data to avoid lost 
data (#24665)
    
    (cherry picked from commit 90a70db6fa81f887f65f60194fef3b36438433cb)
---
 .../java/org/apache/pulsar/metadata/api/MetadataStore.java | 14 ++++++++++++++
 .../bookkeeper/AbstractHierarchicalLedgerManager.java      |  8 +++++---
 .../bookkeeper/LegacyHierarchicalLedgerRangeIterator.java  | 12 ++++++++----
 .../bookkeeper/LongHierarchicalLedgerRangeIterator.java    |  2 +-
 .../apache/pulsar/metadata/impl/AbstractMetadataStore.java |  2 --
 .../pulsar/metadata/impl/FaultInjectionMetadataStore.java  | 10 ++++++++++
 .../apache/pulsar/metadata/impl/RocksdbMetadataStore.java  |  2 +-
 .../impl/batching/AbstractBatchedMetadataStore.java        |  2 +-
 .../pulsar/metadata/impl/MetadataStoreFactoryImplTest.java |  2 +-
 9 files changed, 41 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 89b0e7a6fe1..be261f047aa 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -77,6 +77,20 @@ public interface MetadataStore extends AutoCloseable {
      */
     CompletableFuture<List<String>> getChildren(String path);
 
+
+    /**
+     * Return all the nodes (lexicographically sorted) that are children to 
the specific path.
+     *
+     * If the path itself does not exist, it will return an empty list.
+     *
+     * This method is similar to {@link #getChildren(String)}, but it attempts 
to read directly from
+     *  the underlying store.
+     *
+     * @param path
+     *            the path of the key to get from the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<List<String>> getChildrenFromStore(String path);
     /**
      * Read whether a specific path exists.
      *
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
index 4db7f4798c3..bd1c09ddf0f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
@@ -68,7 +68,8 @@ abstract class AbstractHierarchicalLedgerManager {
             final AsyncCallback.VoidCallback finalCb, final Object context,
             final int successRc, final int failureRc) {
 
-        store.getChildren(path)
+        store.sync(path)
+                .thenCompose(__ -> store.getChildrenFromStore(path))
                 .thenAccept(levelNodes -> {
                     if (levelNodes.isEmpty()) {
                         finalCb.processResult(successRc, null, context);
@@ -162,7 +163,7 @@ abstract class AbstractHierarchicalLedgerManager {
      * Process ledgers in a single zk node.
      *
      * <p>
-     * for each ledger found in this zk node, processor#process(ledgerId) will 
be triggerred
+     * for each ledger found in this zk node, processor#process(ledgerId) will 
be triggered
      * to process a specific ledger. after all ledgers has been processed, the 
finalCb will
      * be called with provided context object. The RC passed to finalCb is 
decided by :
      * <ul>
@@ -188,7 +189,8 @@ abstract class AbstractHierarchicalLedgerManager {
             final String path, final 
BookkeeperInternalCallbacks.Processor<Long> processor,
             final AsyncCallback.VoidCallback finalCb, final Object ctx,
             final int successRc, final int failureRc) {
-        store.getChildren(path)
+        store.sync(path)
+                .thenCompose(__ -> store.getChildrenFromStore(path))
                 .thenAccept(ledgerNodes -> {
                     Set<Long> activeLedgers = 
HierarchicalLedgerUtils.ledgerListToSet(ledgerNodes,
                             ledgerRootPath, path);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
index 37e6dc836f2..cd9533843cc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
@@ -69,7 +69,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements 
LedgerManager.Ledg
      * Iterate next level1 znode.
      *
      * @return false if have visited all level1 nodes
-     * @throws InterruptedException/KeeperException if error occurs reading 
zookeeper children
+     * @throws InterruptedException/ExecutionException/TimeoutException if 
error occurs reading zookeeper children
      */
     private boolean nextL1Node() throws ExecutionException, 
InterruptedException, TimeoutException {
         l2NodesIter = null;
@@ -83,7 +83,9 @@ public class LegacyHierarchicalLedgerRangeIterator implements 
LedgerManager.Ledg
             if (!isLedgerParentNode(curL1Nodes)) {
                 continue;
             }
-            List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + 
curL1Nodes)
+            String path = ledgersRoot + "/" + curL1Nodes;
+            List<String> l2Nodes = store.sync(path)
+                    .thenCompose(__ -> store.getChildrenFromStore(path))
                     .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             l2NodesIter = l2Nodes.iterator();
             if (!l2NodesIter.hasNext()) {
@@ -99,7 +101,8 @@ public class LegacyHierarchicalLedgerRangeIterator 
implements LedgerManager.Ledg
             boolean hasMoreElements = false;
             try {
                 if (l1NodesIter == null) {
-                    List<String> l1Nodes = store.getChildren(ledgersRoot)
+                    List<String> l1Nodes = store.sync(ledgersRoot)
+                            .thenCompose(__ -> 
store.getChildrenFromStore(ledgersRoot))
                             .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
                     l1NodesIter = l1Nodes.iterator();
                     hasMoreElements = nextL1Node();
@@ -162,7 +165,8 @@ public class LegacyHierarchicalLedgerRangeIterator 
implements LedgerManager.Ledg
         String nodePath = nodeBuilder.toString();
         List<String> ledgerNodes = null;
         try {
-            ledgerNodes = 
store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+            ledgerNodes = store.sync(nodePath).thenCompose(__ -> 
store.getChildrenFromStore(nodePath))
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         } catch (ExecutionException | TimeoutException e) {
             throw new IOException("Error when get child nodes from zk", e);
         } catch (InterruptedException e) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
index 3b32916e6e7..92d3dcbe8f2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
@@ -59,7 +59,7 @@ class LongHierarchicalLedgerRangeIterator implements 
LedgerManager.LedgerRangeIt
      */
     List<String> getChildrenAt(String path) throws IOException {
         try {
-            return store.getChildren(path)
+            return store.sync(path).thenCompose(__ -> 
store.getChildrenFromStore(path))
                     .get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException | TimeoutException e) {
             if (log.isDebugEnabled()) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 102d3f85b11..5c079cf19dc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -88,8 +88,6 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected MetadataNodeSizeStats nodeSizeStats;
 
-    protected abstract CompletableFuture<List<String>> 
getChildrenFromStore(String path);
-
     protected abstract CompletableFuture<Boolean> existsFromStore(String path);
 
     protected AbstractMetadataStore(String metadataStoreName,
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 360b8c91d0b..475e3a910a3 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -94,6 +94,16 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
         return store.getChildren(path);
     }
 
+    @Override
+    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+        Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.GET_CHILDREN, path);
+        if (ex.isPresent()) {
+            return FutureUtil.failedFuture(ex.get());
+        }
+
+        return store.getChildrenFromStore(path);
+    }
+
     @Override
     public CompletableFuture<Boolean> exists(String path) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.EXISTS, path);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index f900418842e..cf7125b9a97 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -405,7 +405,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    protected CompletableFuture<List<String>> getChildrenFromStore(String 
path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
         if (log.isDebugEnabled()) {
             log.debug("getChildrenFromStore.path={},instanceId={}", path, 
instanceId);
         }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 0114534853d..9b214c59f48 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -120,7 +120,7 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     }
 
     @Override
-    protected final CompletableFuture<List<String>> 
getChildrenFromStore(String path) {
+    public final CompletableFuture<List<String>> getChildrenFromStore(String 
path) {
         OpGetChildren op = new OpGetChildren(path);
         enqueue(readOps, op);
         return op.getFuture();
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index e0e95c85529..44729649c28 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -98,7 +98,7 @@ public class MetadataStoreFactoryImplTest {
         }
 
         @Override
-        protected CompletableFuture<List<String>> getChildrenFromStore(String 
path) {
+        public CompletableFuture<List<String>> getChildrenFromStore(String 
path) {
             return null;
         }
 

Reply via email to