This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit af72bd3d14eb47b8348e57403bf244c7d1612026 Author: Jiwei Guo <[email protected]> AuthorDate: Thu Aug 28 10:01:11 2025 +0800 [fix][meta] Use `getChildrenFromStore` to read children data to avoid lost data (#24665) (cherry picked from commit 90a70db6fa81f887f65f60194fef3b36438433cb) --- .../java/org/apache/pulsar/metadata/api/MetadataStore.java | 14 ++++++++++++++ .../bookkeeper/AbstractHierarchicalLedgerManager.java | 8 +++++--- .../bookkeeper/LegacyHierarchicalLedgerRangeIterator.java | 12 ++++++++---- .../bookkeeper/LongHierarchicalLedgerRangeIterator.java | 2 +- .../apache/pulsar/metadata/impl/AbstractMetadataStore.java | 2 -- .../pulsar/metadata/impl/FaultInjectionMetadataStore.java | 10 ++++++++++ .../apache/pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +- .../impl/batching/AbstractBatchedMetadataStore.java | 2 +- .../pulsar/metadata/impl/MetadataStoreFactoryImplTest.java | 2 +- 9 files changed, 41 insertions(+), 13 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index 89b0e7a6fe1..be261f047aa 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -77,6 +77,20 @@ public interface MetadataStore extends AutoCloseable { */ CompletableFuture<List<String>> getChildren(String path); + + /** + * Return all the nodes (lexicographically sorted) that are children to the specific path. + * + * If the path itself does not exist, it will return an empty list. + * + * This method is similar to {@link #getChildren(String)}, but it attempts to read directly from + * the underlying store. + * + * @param path + * the path of the key to get from the store + * @return a future to track the async request + */ + CompletableFuture<List<String>> getChildrenFromStore(String path); /** * Read whether a specific path exists. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java index 4db7f4798c3..bd1c09ddf0f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java @@ -68,7 +68,8 @@ abstract class AbstractHierarchicalLedgerManager { final AsyncCallback.VoidCallback finalCb, final Object context, final int successRc, final int failureRc) { - store.getChildren(path) + store.sync(path) + .thenCompose(__ -> store.getChildrenFromStore(path)) .thenAccept(levelNodes -> { if (levelNodes.isEmpty()) { finalCb.processResult(successRc, null, context); @@ -162,7 +163,7 @@ abstract class AbstractHierarchicalLedgerManager { * Process ledgers in a single zk node. * * <p> - * for each ledger found in this zk node, processor#process(ledgerId) will be triggerred + * for each ledger found in this zk node, processor#process(ledgerId) will be triggered * to process a specific ledger. after all ledgers has been processed, the finalCb will * be called with provided context object. The RC passed to finalCb is decided by : * <ul> @@ -188,7 +189,8 @@ abstract class AbstractHierarchicalLedgerManager { final String path, final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback finalCb, final Object ctx, final int successRc, final int failureRc) { - store.getChildren(path) + store.sync(path) + .thenCompose(__ -> store.getChildrenFromStore(path)) .thenAccept(ledgerNodes -> { Set<Long> activeLedgers = HierarchicalLedgerUtils.ledgerListToSet(ledgerNodes, ledgerRootPath, path); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java index 37e6dc836f2..cd9533843cc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java @@ -69,7 +69,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg * Iterate next level1 znode. * * @return false if have visited all level1 nodes - * @throws InterruptedException/KeeperException if error occurs reading zookeeper children + * @throws InterruptedException/ExecutionException/TimeoutException if error occurs reading zookeeper children */ private boolean nextL1Node() throws ExecutionException, InterruptedException, TimeoutException { l2NodesIter = null; @@ -83,7 +83,9 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg if (!isLedgerParentNode(curL1Nodes)) { continue; } - List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + curL1Nodes) + String path = ledgersRoot + "/" + curL1Nodes; + List<String> l2Nodes = store.sync(path) + .thenCompose(__ -> store.getChildrenFromStore(path)) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); l2NodesIter = l2Nodes.iterator(); if (!l2NodesIter.hasNext()) { @@ -99,7 +101,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg boolean hasMoreElements = false; try { if (l1NodesIter == null) { - List<String> l1Nodes = store.getChildren(ledgersRoot) + List<String> l1Nodes = store.sync(ledgersRoot) + .thenCompose(__ -> store.getChildrenFromStore(ledgersRoot)) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); l1NodesIter = l1Nodes.iterator(); hasMoreElements = nextL1Node(); @@ -162,7 +165,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg String nodePath = nodeBuilder.toString(); List<String> ledgerNodes = null; try { - ledgerNodes = store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + ledgerNodes = store.sync(nodePath).thenCompose(__ -> store.getChildrenFromStore(nodePath)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (ExecutionException | TimeoutException e) { throw new IOException("Error when get child nodes from zk", e); } catch (InterruptedException e) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java index 3b32916e6e7..92d3dcbe8f2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java @@ -59,7 +59,7 @@ class LongHierarchicalLedgerRangeIterator implements LedgerManager.LedgerRangeIt */ List<String> getChildrenAt(String path) throws IOException { try { - return store.getChildren(path) + return store.sync(path).thenCompose(__ -> store.getChildrenFromStore(path)) .get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); } catch (ExecutionException | TimeoutException e) { if (log.isDebugEnabled()) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 102d3f85b11..5c079cf19dc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -88,8 +88,6 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected MetadataNodeSizeStats nodeSizeStats; - protected abstract CompletableFuture<List<String>> getChildrenFromStore(String path); - protected abstract CompletableFuture<Boolean> existsFromStore(String path); protected AbstractMetadataStore(String metadataStoreName, diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java index 360b8c91d0b..475e3a910a3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java @@ -94,6 +94,16 @@ public class FaultInjectionMetadataStore implements MetadataStoreExtended { return store.getChildren(path); } + @Override + public CompletableFuture<List<String>> getChildrenFromStore(String path) { + Optional<MetadataStoreException> ex = programmedFailure(OperationType.GET_CHILDREN, path); + if (ex.isPresent()) { + return FutureUtil.failedFuture(ex.get()); + } + + return store.getChildrenFromStore(path); + } + @Override public CompletableFuture<Boolean> exists(String path) { Optional<MetadataStoreException> ex = programmedFailure(OperationType.EXISTS, path); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index f900418842e..cf7125b9a97 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -405,7 +405,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore { } @Override - protected CompletableFuture<List<String>> getChildrenFromStore(String path) { + public CompletableFuture<List<String>> getChildrenFromStore(String path) { if (log.isDebugEnabled()) { log.debug("getChildrenFromStore.path={},instanceId={}", path, instanceId); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index 0114534853d..9b214c59f48 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -120,7 +120,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore } @Override - protected final CompletableFuture<List<String>> getChildrenFromStore(String path) { + public final CompletableFuture<List<String>> getChildrenFromStore(String path) { OpGetChildren op = new OpGetChildren(path); enqueue(readOps, op); return op.getFuture(); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index e0e95c85529..44729649c28 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -98,7 +98,7 @@ public class MetadataStoreFactoryImplTest { } @Override - protected CompletableFuture<List<String>> getChildrenFromStore(String path) { + public CompletableFuture<List<String>> getChildrenFromStore(String path) { return null; }
