This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch cp-17955-dev-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2266118d9d0fc10409fd3dd48ba423a685cd7567 Author: Caideyipi <[email protected]> AuthorDate: Thu Jun 18 08:50:02 2026 +0800 Fix PBTree flush for negative child address (#17955) * Fix PBTree flush for negative child address * Address PBTree flush review comments (cherry picked from commit 8d56ae71e5fda188854bf78463d4d4b45488d2ed) --- .../mtree/impl/pbtree/flush/Scheduler.java | 69 +++++++++++++--------- .../pbtree/schemafile/pagemgr/PageManager.java | 9 ++- .../metadata/mtree/schemafile/SchemaFileTest.java | 33 +++++++++++ 3 files changed, 80 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java index b467b473347..9ef0fbf0f30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java @@ -76,7 +76,8 @@ public class Scheduler { this.releaseFlushStrategy = releaseFlushStrategy; } - private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger remainToFlush) { + private void executeFlush( + CachedMTreeStore store, int regionId, AtomicInteger remainToFlush, boolean propagateFailure) { IMemoryManager memoryManager = store.getMemoryManager(); ISchemaFile file = store.getSchemaFile(); LockManager lockManager = store.getLockManager(); @@ -97,6 +98,9 @@ public class Scheduler { regionId, e.getMessage(), e); + if (propagateFailure) { + throw new RuntimeException(e); + } } finally { long time = System.currentTimeMillis() - startTime; if (time > 10_000) { @@ -145,22 +149,26 @@ public class Scheduler { CompletableFuture.runAsync( () -> { int regionId = entry.getKey(); - CachedMTreeStore store = entry.getValue(); - if (store == null) { - // store has been closed - return; - } - LockManager lockManager = store.getLockManager(); - lockManager.globalReadLock(); - if (!regionToStore.containsKey(regionId)) { - // double check store have not been closed - return; - } try { - executeFlush(store, regionId, null); - executeRelease(store, false); + CachedMTreeStore store = entry.getValue(); + if (store == null) { + // store has been closed + return; + } + LockManager lockManager = store.getLockManager(); + lockManager.globalReadLock(); + try { + if (!regionToStore.containsKey(regionId)) { + // double check store have not been closed + return; + } + executeFlush(store, regionId, null, true); + executeRelease(store, false); + } finally { + lockManager.globalReadUnlock(); + } } finally { - lockManager.globalReadUnlock(); + flushingRegionSet.remove(regionId); } }, workerPool)) @@ -221,22 +229,25 @@ public class Scheduler { flushingRegionSet.add(regionId); workerPool.submit( () -> { - CachedMTreeStore store = regionToStore.get(regionId); - if (store == null) { - // store has been closed - return; - } - LockManager lockManager = store.getLockManager(); - lockManager.globalReadLock(); - if (!regionToStore.containsKey(regionId)) { - // double check store have not been closed - return; - } try { - - executeFlush(store, regionId, remainToFlush); + CachedMTreeStore store = regionToStore.get(regionId); + if (store == null) { + // store has been closed + return; + } + LockManager lockManager = store.getLockManager(); + lockManager.globalReadLock(); + try { + if (!regionToStore.containsKey(regionId)) { + // double check store have not been closed + return; + } + executeFlush(store, regionId, remainToFlush, false); + } finally { + lockManager.globalReadUnlock(); + } } finally { - lockManager.globalReadUnlock(); + flushingRegionSet.remove(regionId); } }); if (remainToFlush.get() <= 0) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java index 06d8c279844..56e0b5a6dc2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java @@ -292,8 +292,6 @@ public abstract class PageManager implements IPageManager { .entrySet()) { child = entry.getValue(); actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt); - childBuffer = RecordUtils.node2Buffer(child); - curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt); if (curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress), entry.getKey()) == null) { @@ -302,6 +300,13 @@ public abstract class PageManager implements IPageManager { "Node[%s] has no child[%s] in pbtree file.", node.getName(), entry.getKey())); } + if (!child.isMeasurement() && getNodeAddress(child) < 0) { + short estSegSize = estimateSegmentSize(child); + long glbIndex = preAllocateSegment(estSegSize, cxt); + SchemaFile.setNodeAddress(child, glbIndex); + } + childBuffer = RecordUtils.node2Buffer(child); + // prepare alias comparison if (child.isMeasurement() && child.getAsMeasurementMNode().getAlias() != null) { alias = child.getAsMeasurementMNode().getAlias(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java index 75a8ab8bc00..bb25ea10092 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java @@ -315,6 +315,39 @@ public class SchemaFileTest { sf.close(); } + @Test + public void testFlushUpdatedChildWithNegativeSegmentAddress() throws Exception { + ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID); + ICachedMNode sgNode = nodeFactory.createDatabaseDeviceMNode(null, "sg").getAsMNode(); + ICachedMNode device = nodeFactory.createDeviceMNode(sgNode, "d1").getAsMNode(); + sgNode.addChild(device); + + writeMNodeInTest(sf, sgNode); + + // Typical flush order: the parent already has this child record on disk, while the updated + // child object in memory may still have no valid segment address. + ICachedMNodeContainer.getCachedMNodeContainer(device).setSegmentAddress(-1L); + addNodeToUpdateBuffer(sgNode, device); + writeMNodeInTest(sf, sgNode); + + Assert.assertTrue(getSegAddrInContainer(device) >= 0); + + device.addChild(getMeasurementNode(device, "s1", "alias_s1")); + writeMNodeInTest(sf, device); + Assert.assertEquals( + "alias_s1", sf.getChildNode(device, "s1").getAsMeasurementMNode().getAlias()); + + long deviceSegmentAddress = getSegAddrInContainer(device); + sf.close(); + + sf = SchemaFile.loadSchemaFile("root.sg", TEST_SCHEMA_REGION_ID); + ICachedMNode loadedDevice = sf.getChildNode(sgNode, "d1"); + Assert.assertEquals(deviceSegmentAddress, getSegAddrInContainer(loadedDevice)); + Assert.assertEquals( + "alias_s1", sf.getChildNode(loadedDevice, "s1").getAsMeasurementMNode().getAlias()); + sf.close(); + } + @Test public void testMassiveSegment() throws MetadataException, IOException { ICachedMNode dbNode = nodeFactory.createDatabaseDeviceMNode(null, "sgRoot");
