This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch add_flush_interface
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 060f28e7c01a66528da6dc2d6de179c0c97ea237
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Feb 13 11:15:06 2026 +0800

    Add an interface to support flushing by TsFileResource
---
 .../db/storageengine/dataregion/DataRegion.java    | 44 +++++-----
 .../dataregion/memtable/TsFileProcessor.java       | 28 +++++--
 .../storageengine/dataregion/DataRegionTest.java   | 93 +++++++++++++---------
 3 files changed, 102 insertions(+), 63 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d95d51e390e..ebd6f2fcb4d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1939,6 +1939,15 @@ public class DataRegion implements IDataRegionForQuery {
     return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, 
unseqCompactionCnt);
   }
 
+  public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+    writeLock("asyncCloseOneTsFileProcessor");
+    try {
+      return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /**
    * close one tsfile processor, thread-safety should be ensured by caller
    *
@@ -1946,31 +1955,26 @@ public class DataRegion implements IDataRegionForQuery {
    * @param tsFileProcessor tsfile processor
    */
   public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, 
TsFileProcessor tsFileProcessor) {
-    // for sequence tsfile, we update the endTimeMap only when the file is 
prepared to be closed.
-    // for unsequence tsfile, we have maintained the endTimeMap when an 
insertion comes.
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
-        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
-        || tsFileProcessor.alreadyMarkedClosing()) {
+    if (tsFileProcessor == null) {
       return CompletableFuture.completedFuture(null);
     }
-    Future<?> future;
-    if (sequence) {
-      closingSequenceTsFileProcessor.add(tsFileProcessor);
-      future = tsFileProcessor.asyncClose();
-      if (future.isDone()) {
-        closingSequenceTsFileProcessor.remove(tsFileProcessor);
-      }
+    if (tsFileProcessor.getCloseFuture() != null) {
+      return tsFileProcessor.getCloseFuture();
+    }
 
-      workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
-    } else {
-      closingUnSequenceTsFileProcessor.add(tsFileProcessor);
-      future = tsFileProcessor.asyncClose();
-      if (future.isDone()) {
-        closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
-      }
+    Future<?> future;
+    Set<TsFileProcessor> closingTsFileProcessors =
+        sequence ? closingSequenceTsFileProcessor : 
closingUnSequenceTsFileProcessor;
+    TreeMap<Long, TsFileProcessor> workTsFileProcessors =
+        sequence ? workSequenceTsFileProcessors : 
workUnsequenceTsFileProcessors;
 
-      workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+    closingTsFileProcessors.add(tsFileProcessor);
+    future = tsFileProcessor.asyncClose();
+    if (future.isDone()) {
+      closingTsFileProcessors.remove(tsFileProcessor);
     }
+    workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+
     TsFileResource resource = tsFileProcessor.getTsFileResource();
     logger.info(
         "Async close tsfile: {}, file start time: {}, file end time: {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index acdac61180b..e36c2fde77c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -210,6 +210,8 @@ public class TsFileProcessor {
 
   private int walEntryNum = 0;
 
+  private Future<?> closeFuture;
+
   @SuppressWarnings("squid:S107")
   public TsFileProcessor(
       String dataRegionName,
@@ -1250,10 +1252,13 @@ public class TsFileProcessor {
     logger.info(
         "Sync close file: {}, will firstly async close it",
         tsFileResource.getTsFile().getAbsolutePath());
-    if (shouldClose) {
-      return;
-    }
+
     try {
+      if (closeFuture != null) {
+        closeFuture.get();
+        return;
+      }
+
       asyncClose().get();
       logger.info("Start to wait until file {} is closed", tsFileResource);
       // if this TsFileProcessor is closing, asyncClose().get() of this thread 
will return quickly,
@@ -1273,6 +1278,10 @@ public class TsFileProcessor {
     flushQueryLock.writeLock().lock();
     logFlushQueryWriteLocked();
     try {
+      if (closeFuture != null) {
+        return closeFuture;
+      }
+
       if (logger.isDebugEnabled()) {
         if (workMemTable != null) {
           logger.debug(
@@ -1293,10 +1302,6 @@ public class TsFileProcessor {
               tsFileResource.getTsFileSize());
         }
       }
-
-      if (shouldClose) {
-        return CompletableFuture.completedFuture(null);
-      }
       // when a flush thread serves this TsFileProcessor (because the 
processor is submitted by
       // registerTsFileProcessor()), the thread will seal the corresponding 
TsFile and
       // execute other cleanup works if "shouldClose == true and 
flushingMemTables is empty".
@@ -1315,6 +1320,7 @@ public class TsFileProcessor {
         // flushing memTable in System module.
         Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);
         shouldClose = true;
+        closeFuture = future;
         return future;
       } catch (Exception e) {
         logger.error(
@@ -2384,4 +2390,12 @@ public class TsFileProcessor {
   public String toString() {
     return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}';
   }
+
+  public Future<?> getCloseFuture() {
+    return closeFuture;
+  }
+
+  public void setCloseFuture(Future<?> closeFuture) {
+    this.closeFuture = closeFuture;
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index c3895a058c6..0cb7143e708 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -93,9 +93,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+import static org.junit.Assert.assertTrue;
 
 public class DataRegionTest {
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
@@ -266,7 +270,7 @@ public class DataRegionTest {
             null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -314,7 +318,7 @@ public class DataRegionTest {
     Assert.assertEquals(1, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -362,7 +366,7 @@ public class DataRegionTest {
     Assert.assertEquals(1, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -451,7 +455,7 @@ public class DataRegionTest {
     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
     Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -518,7 +522,7 @@ public class DataRegionTest {
             times.length);
 
     dataRegion.insertTablet(insertTabletNode2);
-    Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
+    assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
     dataRegion.syncDeleteDataFiles();
     Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize());
 
@@ -603,7 +607,7 @@ public class DataRegionTest {
     Assert.assertEquals(0, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -679,7 +683,7 @@ public class DataRegionTest {
     Assert.assertEquals(0, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -706,10 +710,10 @@ public class DataRegionTest {
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
     for (TsFileResource resource : queryDataSource.getUnseqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -740,10 +744,10 @@ public class DataRegionTest {
     Assert.assertEquals(0, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
     for (TsFileResource resource : queryDataSource.getUnseqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
   }
 
@@ -773,10 +777,10 @@ public class DataRegionTest {
     Assert.assertEquals(0, queryDataSource.getSeqResources().size());
     Assert.assertEquals(20, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
     for (TsFileResource resource : queryDataSource.getUnseqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
 
     config.setEnableSeparateData(defaultValue);
@@ -855,7 +859,7 @@ public class DataRegionTest {
     Assert.assertEquals(0, queryDataSource.getSeqResources().size());
     Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
 
     config.setEnableSeparateData(defaultEnableDiscard);
@@ -935,7 +939,7 @@ public class DataRegionTest {
     Assert.assertEquals(0, queryDataSource.getSeqResources().size());
     Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
 
     config.setEnableSeparateData(defaultEnableDiscard);
@@ -1015,7 +1019,7 @@ public class DataRegionTest {
     Assert.assertEquals(0, queryDataSource.getSeqResources().size());
     Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
 
     config.setEnableSeparateData(defaultEnableDiscard);
@@ -1055,7 +1059,7 @@ public class DataRegionTest {
     Assert.assertEquals(1, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
     dataRegion1.syncDeleteDataFiles();
   }
@@ -1092,10 +1096,10 @@ public class DataRegionTest {
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
     for (TsFileResource resource : queryDataSource.getUnseqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
 
     dataRegion1.syncDeleteDataFiles();
@@ -1160,10 +1164,10 @@ public class DataRegionTest {
             Collections.singletonList(nonAlignedFullPath), device, context, 
null, null);
     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
     for (TsFileResource resource : queryDataSource.getUnseqResources()) {
-      Assert.assertTrue(resource.isClosed());
+      assertTrue(resource.isClosed());
     }
     IoTDBDescriptor.getInstance()
         .getConfig()
@@ -1232,7 +1236,7 @@ public class DataRegionTest {
                   + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
       Assert.assertFalse(logFile.exists());
       
Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly());
-      Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
+      assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
     } finally {
       new CompactionConfigRestorer().restoreCompactionConfig();
     }
@@ -1392,10 +1396,10 @@ public class DataRegionTest {
     for (int i = 0; i < dataRegion.getSequenceFileList().size(); i++) {
       TsFileResource resource = dataRegion.getSequenceFileList().get(i);
       if (i == 1) {
-        Assert.assertTrue(resource.anyModFileExists());
+        assertTrue(resource.anyModFileExists());
         Assert.assertEquals(2, resource.getAllModEntries().size());
       } else if (i == 3) {
-        Assert.assertTrue(resource.anyModFileExists());
+        assertTrue(resource.anyModFileExists());
         Assert.assertEquals(1, resource.getAllModEntries().size());
       } else {
         Assert.assertFalse(resource.anyModFileExists());
@@ -1489,7 +1493,7 @@ public class DataRegionTest {
     dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"), 
deleteDataNode4);
 
     dataRegion.syncCloseAllWorkingTsFileProcessors();
-    Assert.assertTrue(tsFileResource.anyModFileExists());
+    assertTrue(tsFileResource.anyModFileExists());
     Assert.assertEquals(3, tsFileResource.getAllModEntries().size());
   }
 
@@ -1584,7 +1588,7 @@ public class DataRegionTest {
     dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"), 
deleteDataNode12);
 
     dataRegion.syncCloseAllWorkingTsFileProcessors();
-    Assert.assertTrue(tsFileResource.anyModFileExists());
+    assertTrue(tsFileResource.anyModFileExists());
     Assert.assertEquals(3, tsFileResource.getAllModEntries().size());
   }
 
@@ -1686,7 +1690,7 @@ public class DataRegionTest {
         new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 100);
     deleteDataNode1.setSearchIndex(0);
     dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), 
deleteDataNode1);
-    Assert.assertTrue(tsFileResource.getTsFile().exists());
+    assertTrue(tsFileResource.getTsFile().exists());
     Assert.assertFalse(tsFileResource.anyModFileExists());
 
     dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -1696,8 +1700,8 @@ public class DataRegionTest {
         new DeleteDataNode(new PlanNodeId("2"), 
Collections.singletonList(path), 100, 120);
     deleteDataNode2.setSearchIndex(0);
     dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), 
deleteDataNode2);
-    Assert.assertTrue(tsFileResource.getTsFile().exists());
-    Assert.assertTrue(tsFileResource.anyModFileExists());
+    assertTrue(tsFileResource.getTsFile().exists());
+    assertTrue(tsFileResource.anyModFileExists());
 
     // delete data in closed file, and time all match
     DeleteDataNode deleteDataNode3 =
@@ -1727,8 +1731,8 @@ public class DataRegionTest {
     dataRegion.syncCloseWorkingTsFileProcessors(true);
     TsFileResource tsFileResourceUnSeq = 
dataRegion.getTsFileManager().getTsFileList(false).get(0);
 
-    Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
-    Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+    assertTrue(tsFileResourceSeq.getTsFile().exists());
+    assertTrue(tsFileResourceUnSeq.getTsFile().exists());
 
     // already closed, will have a mods file.
     MeasurementPath path = new MeasurementPath("root.vehicle.d0.**");
@@ -1743,9 +1747,9 @@ public class DataRegionTest {
     dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), 
deleteDataNode2);
 
     // delete data in mem table, there is no mods
-    Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
-    Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
-    Assert.assertTrue(tsFileResourceSeq.anyModFileExists());
+    assertTrue(tsFileResourceSeq.getTsFile().exists());
+    assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+    assertTrue(tsFileResourceSeq.anyModFileExists());
     Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -1753,8 +1757,8 @@ public class DataRegionTest {
         new DeleteDataNode(new PlanNodeId("3"), 
Collections.singletonList(path), 40, 80);
     deleteDataNode3.setSearchIndex(0);
     dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), 
deleteDataNode3);
-    Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
-    Assert.assertTrue(tsFileResourceUnSeq.anyModFileExists());
+    assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+    assertTrue(tsFileResourceUnSeq.anyModFileExists());
 
     // seq file and unseq file have data file and mod file now,
     // this deletion will remove data file and mod file.
@@ -1772,4 +1776,21 @@ public class DataRegionTest {
     Assert.assertFalse(tsFileResourceSeq.anyModFileExists());
     Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
   }
+
+  @Test
+  public void testFlushSpecifiedResource()
+      throws IllegalPathException, WriteProcessException, ExecutionException, InterruptedException {
+    for (int j = 100; j < 200; j++) {
+      TSRecord record = new TSRecord(deviceId, j);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    }
+    TsFileResource tsFileResourceSeq = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+    Future<?> future = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+    Future<?> future2 = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+    assertTrue(future == future2 || future2 instanceof CompletableFuture);
+
+    future.get();
+    assertTrue(tsFileResourceSeq.isClosed());
+  }
 }

Reply via email to