This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch add_flush_interface in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 060f28e7c01a66528da6dc2d6de179c0c97ea237 Author: Tian Jiang <[email protected]> AuthorDate: Fri Feb 13 11:15:06 2026 +0800 Add an interface to support flushing by TsFileResource --- .../db/storageengine/dataregion/DataRegion.java | 44 +++++----- .../dataregion/memtable/TsFileProcessor.java | 28 +++++-- .../storageengine/dataregion/DataRegionTest.java | 93 +++++++++++++--------- 3 files changed, 102 insertions(+), 63 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index d95d51e390e..ebd6f2fcb4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1939,6 +1939,15 @@ public class DataRegion implements IDataRegionForQuery { return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt); } + public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) { + writeLock("asyncCloseOneTsFileProcessor"); + try { + return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor()); + } finally { + writeUnlock(); + } + } + /** * close one tsfile processor, thread-safety should be ensured by caller * @@ -1946,31 +1955,26 @@ public class DataRegion implements IDataRegionForQuery { * @param tsFileProcessor tsfile processor */ public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { - // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed. - // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes. - if (closingSequenceTsFileProcessor.contains(tsFileProcessor) - || closingUnSequenceTsFileProcessor.contains(tsFileProcessor) - || tsFileProcessor.alreadyMarkedClosing()) { + if (tsFileProcessor == null) { return CompletableFuture.completedFuture(null); } - Future<?> future; - if (sequence) { - closingSequenceTsFileProcessor.add(tsFileProcessor); - future = tsFileProcessor.asyncClose(); - if (future.isDone()) { - closingSequenceTsFileProcessor.remove(tsFileProcessor); - } + if (tsFileProcessor.getCloseFuture() != null) { + return tsFileProcessor.getCloseFuture(); + } - workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); - } else { - closingUnSequenceTsFileProcessor.add(tsFileProcessor); - future = tsFileProcessor.asyncClose(); - if (future.isDone()) { - closingUnSequenceTsFileProcessor.remove(tsFileProcessor); - } + Future<?> future; + Set<TsFileProcessor> closingTsFileProcessors = + sequence ? closingSequenceTsFileProcessor : closingUnSequenceTsFileProcessor; + TreeMap<Long, TsFileProcessor> workTsFileProcessors = + sequence ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors; - workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); + closingTsFileProcessors.add(tsFileProcessor); + future = tsFileProcessor.asyncClose(); + if (future.isDone()) { + closingTsFileProcessors.remove(tsFileProcessor); } + workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); + TsFileResource resource = tsFileProcessor.getTsFileResource(); logger.info( "Async close tsfile: {}, file start time: {}, file end time: {}", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index acdac61180b..e36c2fde77c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -210,6 +210,8 @@ public class TsFileProcessor { private int walEntryNum = 0; + private Future<?> closeFuture; + @SuppressWarnings("squid:S107") public TsFileProcessor( String dataRegionName, @@ -1250,10 +1252,13 @@ public class TsFileProcessor { logger.info( "Sync close file: {}, will firstly async close it", tsFileResource.getTsFile().getAbsolutePath()); - if (shouldClose) { - return; - } + try { + if (closeFuture != null) { + closeFuture.get(); + return; + } + asyncClose().get(); logger.info("Start to wait until file {} is closed", tsFileResource); // if this TsFileProcessor is closing, asyncClose().get() of this thread will return quickly, @@ -1273,6 +1278,10 @@ public class TsFileProcessor { flushQueryLock.writeLock().lock(); logFlushQueryWriteLocked(); try { + if (closeFuture != null) { + return closeFuture; + } + if (logger.isDebugEnabled()) { if (workMemTable != null) { logger.debug( @@ -1293,10 +1302,6 @@ public class TsFileProcessor { tsFileResource.getTsFileSize()); } } - - if (shouldClose) { - return CompletableFuture.completedFuture(null); - } // when a flush thread serves this TsFileProcessor (because the processor is submitted by // registerTsFileProcessor()), the thread will seal the corresponding TsFile and // execute other cleanup works if "shouldClose == true and flushingMemTables is empty". @@ -1315,6 +1320,7 @@ public class TsFileProcessor { // flushing memTable in System module. Future<?> future = addAMemtableIntoFlushingList(tmpMemTable); shouldClose = true; + closeFuture = future; return future; } catch (Exception e) { logger.error( @@ -2384,4 +2390,12 @@ public class TsFileProcessor { public String toString() { return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}'; } + + public Future<?> getCloseFuture() { + return closeFuture; + } + + public void setCloseFuture(Future<?> closeFuture) { + this.closeFuture = closeFuture; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index c3895a058c6..0cb7143e708 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -93,9 +93,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode; import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode; +import static org.junit.Assert.assertTrue; public class DataRegionTest { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -266,7 +270,7 @@ public class DataRegionTest { null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -314,7 +318,7 @@ public class DataRegionTest { Assert.assertEquals(1, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -362,7 +366,7 @@ public class DataRegionTest { Assert.assertEquals(1, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -451,7 +455,7 @@ public class DataRegionTest { Assert.assertEquals(2, queryDataSource.getSeqResources().size()); Assert.assertEquals(1, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -518,7 +522,7 @@ public class DataRegionTest { times.length); dataRegion.insertTablet(insertTabletNode2); - Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0); + assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0); dataRegion.syncDeleteDataFiles(); Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize()); @@ -603,7 +607,7 @@ public class DataRegionTest { Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -679,7 +683,7 @@ public class DataRegionTest { Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -706,10 +710,10 @@ public class DataRegionTest { Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(10, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -740,10 +744,10 @@ public class DataRegionTest { Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -773,10 +777,10 @@ public class DataRegionTest { Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(20, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultValue); @@ -855,7 +859,7 @@ public class DataRegionTest { Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultEnableDiscard); @@ -935,7 +939,7 @@ public class DataRegionTest { Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultEnableDiscard); @@ -1015,7 +1019,7 @@ public class DataRegionTest { Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultEnableDiscard); @@ -1055,7 +1059,7 @@ public class DataRegionTest { Assert.assertEquals(1, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } dataRegion1.syncDeleteDataFiles(); } @@ -1092,10 +1096,10 @@ public class DataRegionTest { Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } dataRegion1.syncDeleteDataFiles(); @@ -1160,10 +1164,10 @@ public class DataRegionTest { Collections.singletonList(nonAlignedFullPath), device, context, null, null); Assert.assertEquals(2, queryDataSource.getSeqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } IoTDBDescriptor.getInstance() .getConfig() @@ -1232,7 +1236,7 @@ public class DataRegionTest { + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX); Assert.assertFalse(logFile.exists()); Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly()); - Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction()); + assertTrue(dataRegion.getTsFileManager().isAllowCompaction()); } finally { new CompactionConfigRestorer().restoreCompactionConfig(); } @@ -1392,10 +1396,10 @@ public class DataRegionTest { for (int i = 0; i < dataRegion.getSequenceFileList().size(); i++) { TsFileResource resource = dataRegion.getSequenceFileList().get(i); if (i == 1) { - Assert.assertTrue(resource.anyModFileExists()); + assertTrue(resource.anyModFileExists()); Assert.assertEquals(2, resource.getAllModEntries().size()); } else if (i == 3) { - Assert.assertTrue(resource.anyModFileExists()); + assertTrue(resource.anyModFileExists()); Assert.assertEquals(1, resource.getAllModEntries().size()); } else { Assert.assertFalse(resource.anyModFileExists()); @@ -1489,7 +1493,7 @@ public class DataRegionTest { dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"), deleteDataNode4); dataRegion.syncCloseAllWorkingTsFileProcessors(); - Assert.assertTrue(tsFileResource.anyModFileExists()); + assertTrue(tsFileResource.anyModFileExists()); Assert.assertEquals(3, tsFileResource.getAllModEntries().size()); } @@ -1584,7 +1588,7 @@ public class DataRegionTest { dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"), deleteDataNode12); dataRegion.syncCloseAllWorkingTsFileProcessors(); - Assert.assertTrue(tsFileResource.anyModFileExists()); + assertTrue(tsFileResource.anyModFileExists()); Assert.assertEquals(3, tsFileResource.getAllModEntries().size()); } @@ -1686,7 +1690,7 @@ public class DataRegionTest { new DeleteDataNode(new PlanNodeId("1"), Collections.singletonList(path), 50, 100); deleteDataNode1.setSearchIndex(0); dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode1); - Assert.assertTrue(tsFileResource.getTsFile().exists()); + assertTrue(tsFileResource.getTsFile().exists()); Assert.assertFalse(tsFileResource.anyModFileExists()); dataRegion.syncCloseAllWorkingTsFileProcessors(); @@ -1696,8 +1700,8 @@ public class DataRegionTest { new DeleteDataNode(new PlanNodeId("2"), Collections.singletonList(path), 100, 120); deleteDataNode2.setSearchIndex(0); dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode2); - Assert.assertTrue(tsFileResource.getTsFile().exists()); - Assert.assertTrue(tsFileResource.anyModFileExists()); + assertTrue(tsFileResource.getTsFile().exists()); + assertTrue(tsFileResource.anyModFileExists()); // delete data in closed file, and time all match DeleteDataNode deleteDataNode3 = @@ -1727,8 +1731,8 @@ public class DataRegionTest { dataRegion.syncCloseWorkingTsFileProcessors(true); TsFileResource tsFileResourceUnSeq = dataRegion.getTsFileManager().getTsFileList(false).get(0); - Assert.assertTrue(tsFileResourceSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); + assertTrue(tsFileResourceSeq.getTsFile().exists()); + assertTrue(tsFileResourceUnSeq.getTsFile().exists()); // already closed, will have a mods file. MeasurementPath path = new MeasurementPath("root.vehicle.d0.**"); @@ -1743,9 +1747,9 @@ public class DataRegionTest { dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode2); // delete data in mem table, there is no mods - Assert.assertTrue(tsFileResourceSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceSeq.anyModFileExists()); + assertTrue(tsFileResourceSeq.getTsFile().exists()); + assertTrue(tsFileResourceUnSeq.getTsFile().exists()); + assertTrue(tsFileResourceSeq.anyModFileExists()); Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists()); dataRegion.syncCloseAllWorkingTsFileProcessors(); @@ -1753,8 +1757,8 @@ public class DataRegionTest { new DeleteDataNode(new PlanNodeId("3"), Collections.singletonList(path), 40, 80); deleteDataNode3.setSearchIndex(0); dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode3); - Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceUnSeq.anyModFileExists()); + assertTrue(tsFileResourceUnSeq.getTsFile().exists()); + assertTrue(tsFileResourceUnSeq.anyModFileExists()); // seq file and unseq file have data file and mod file now, // this deletion will remove data file and mod file. @@ -1772,4 +1776,21 @@ public class DataRegionTest { Assert.assertFalse(tsFileResourceSeq.anyModFileExists()); Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists()); } + + @Test + public void testFlushSpecifiedResource() + throws IllegalPathException, WriteProcessException, ExecutionException, InterruptedException { + for (int j = 100; j < 200; j++) { + TSRecord record = new TSRecord(deviceId, j); + record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); + dataRegion.insert(buildInsertRowNodeByTSRecord(record)); + } + TsFileResource tsFileResourceSeq = dataRegion.getTsFileManager().getTsFileList(true).get(0); + Future<?> future = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq); + Future<?> future2 = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq); + assertTrue(future == future2 || future2 instanceof CompletableFuture); + + future.get(); + assertTrue(tsFileResourceSeq.isClosed()); + } }
