This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7e0837b28a0 Add an interface to support flushing by TsFileResource (#17203)
7e0837b28a0 is described below
commit 7e0837b28a03d2df3e95c55a570b6c9ef67423b2
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Feb 13 16:51:22 2026 +0800
Add an interface to support flushing by TsFileResource (#17203)
* Add an interface to support flushing by TsFileResource
* Fix review
* spotless
---
.../db/storageengine/dataregion/DataRegion.java | 50 +++++++-----
.../dataregion/memtable/TsFileProcessor.java | 31 ++++++--
.../storageengine/dataregion/DataRegionTest.java | 93 +++++++++++++---------
3 files changed, 111 insertions(+), 63 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d95d51e390e..ef4a4910e1b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1939,6 +1939,21 @@ public class DataRegion implements IDataRegionForQuery {
return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt,
unseqCompactionCnt);
}
+ /**
+ * close the TsFile represented by the given resource, thread-safe
+ *
+ * @param tsFileResource TsFile to be closed
+ * @return a future related to the close task
+ */
+ public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
+ writeLock("asyncCloseOneTsFileProcessor");
+ try {
+ return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* close one tsfile processor, thread-safety should be ensured by caller
*
@@ -1946,31 +1961,26 @@ public class DataRegion implements IDataRegionForQuery {
* @param tsFileProcessor tsfile processor
*/
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence,
TsFileProcessor tsFileProcessor) {
- // for sequence tsfile, we update the endTimeMap only when the file is
prepared to be closed.
- // for unsequence tsfile, we have maintained the endTimeMap when an
insertion comes.
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
- || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
- || tsFileProcessor.alreadyMarkedClosing()) {
+ if (tsFileProcessor == null) {
return CompletableFuture.completedFuture(null);
}
- Future<?> future;
- if (sequence) {
- closingSequenceTsFileProcessor.add(tsFileProcessor);
- future = tsFileProcessor.asyncClose();
- if (future.isDone()) {
- closingSequenceTsFileProcessor.remove(tsFileProcessor);
- }
+ if (tsFileProcessor.getCloseFuture() != null) {
+ return tsFileProcessor.getCloseFuture();
+ }
- workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
- } else {
- closingUnSequenceTsFileProcessor.add(tsFileProcessor);
- future = tsFileProcessor.asyncClose();
- if (future.isDone()) {
- closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
- }
+ Future<?> future;
+ Set<TsFileProcessor> closingTsFileProcessors =
+ sequence ? closingSequenceTsFileProcessor :
closingUnSequenceTsFileProcessor;
+ TreeMap<Long, TsFileProcessor> workTsFileProcessors =
+ sequence ? workSequenceTsFileProcessors :
workUnsequenceTsFileProcessors;
- workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+ closingTsFileProcessors.add(tsFileProcessor);
+ future = tsFileProcessor.asyncClose();
+ if (future.isDone()) {
+ closingTsFileProcessors.remove(tsFileProcessor);
}
+ workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+
TsFileResource resource = tsFileProcessor.getTsFileResource();
logger.info(
"Async close tsfile: {}, file start time: {}, file end time: {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index acdac61180b..78b33c02c0b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -210,6 +210,8 @@ public class TsFileProcessor {
private int walEntryNum = 0;
+ private volatile Future<?> closeFuture;
+
@SuppressWarnings("squid:S107")
public TsFileProcessor(
String dataRegionName,
@@ -1250,10 +1252,13 @@ public class TsFileProcessor {
logger.info(
"Sync close file: {}, will firstly async close it",
tsFileResource.getTsFile().getAbsolutePath());
- if (shouldClose) {
- return;
- }
+
try {
+ if (closeFuture != null) {
+ closeFuture.get();
+ return;
+ }
+
asyncClose().get();
logger.info("Start to wait until file {} is closed", tsFileResource);
// if this TsFileProcessor is closing, asyncClose().get() of this thread
will return quickly,
@@ -1273,6 +1278,10 @@ public class TsFileProcessor {
flushQueryLock.writeLock().lock();
logFlushQueryWriteLocked();
try {
+ if (closeFuture != null) {
+ return closeFuture;
+ }
+
if (logger.isDebugEnabled()) {
if (workMemTable != null) {
logger.debug(
@@ -1293,10 +1302,6 @@ public class TsFileProcessor {
tsFileResource.getTsFileSize());
}
}
-
- if (shouldClose) {
- return CompletableFuture.completedFuture(null);
- }
// when a flush thread serves this TsFileProcessor (because the
processor is submitted by
// registerTsFileProcessor()), the thread will seal the corresponding
TsFile and
// execute other cleanup works if "shouldClose == true and
flushingMemTables is empty".
@@ -1315,6 +1320,7 @@ public class TsFileProcessor {
// flushing memTable in System module.
Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);
shouldClose = true;
+ closeFuture = future;
return future;
} catch (Exception e) {
logger.error(
@@ -1794,6 +1800,9 @@ public class TsFileProcessor {
public void setManagedByFlushManager(boolean managedByFlushManager) {
this.managedByFlushManager = managedByFlushManager;
+ if (!managedByFlushManager) {
+ closeFuture = CompletableFuture.completedFuture(null);
+ }
}
/** Close this tsfile */
@@ -2384,4 +2393,12 @@ public class TsFileProcessor {
public String toString() {
return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}';
}
+
+ public Future<?> getCloseFuture() {
+ return closeFuture;
+ }
+
+ public void setCloseFuture(Future<?> closeFuture) {
+ this.closeFuture = closeFuture;
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index c3895a058c6..0cb7143e708 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -93,9 +93,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
+import static org.junit.Assert.assertTrue;
public class DataRegionTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -266,7 +270,7 @@ public class DataRegionTest {
null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -314,7 +318,7 @@ public class DataRegionTest {
Assert.assertEquals(1, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -362,7 +366,7 @@ public class DataRegionTest {
Assert.assertEquals(1, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -451,7 +455,7 @@ public class DataRegionTest {
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -518,7 +522,7 @@ public class DataRegionTest {
times.length);
dataRegion.insertTablet(insertTabletNode2);
- Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
+ assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
dataRegion.syncDeleteDataFiles();
Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize());
@@ -603,7 +607,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -679,7 +683,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -706,10 +710,10 @@ public class DataRegionTest {
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -740,10 +744,10 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
}
@@ -773,10 +777,10 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(20, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultValue);
@@ -855,7 +859,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultEnableDiscard);
@@ -935,7 +939,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultEnableDiscard);
@@ -1015,7 +1019,7 @@ public class DataRegionTest {
Assert.assertEquals(0, queryDataSource.getSeqResources().size());
Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
config.setEnableSeparateData(defaultEnableDiscard);
@@ -1055,7 +1059,7 @@ public class DataRegionTest {
Assert.assertEquals(1, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
dataRegion1.syncDeleteDataFiles();
}
@@ -1092,10 +1096,10 @@ public class DataRegionTest {
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
dataRegion1.syncDeleteDataFiles();
@@ -1160,10 +1164,10 @@ public class DataRegionTest {
Collections.singletonList(nonAlignedFullPath), device, context,
null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
- Assert.assertTrue(resource.isClosed());
+ assertTrue(resource.isClosed());
}
IoTDBDescriptor.getInstance()
.getConfig()
@@ -1232,7 +1236,7 @@ public class DataRegionTest {
+ CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX);
Assert.assertFalse(logFile.exists());
Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly());
- Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
+ assertTrue(dataRegion.getTsFileManager().isAllowCompaction());
} finally {
new CompactionConfigRestorer().restoreCompactionConfig();
}
@@ -1392,10 +1396,10 @@ public class DataRegionTest {
for (int i = 0; i < dataRegion.getSequenceFileList().size(); i++) {
TsFileResource resource = dataRegion.getSequenceFileList().get(i);
if (i == 1) {
- Assert.assertTrue(resource.anyModFileExists());
+ assertTrue(resource.anyModFileExists());
Assert.assertEquals(2, resource.getAllModEntries().size());
} else if (i == 3) {
- Assert.assertTrue(resource.anyModFileExists());
+ assertTrue(resource.anyModFileExists());
Assert.assertEquals(1, resource.getAllModEntries().size());
} else {
Assert.assertFalse(resource.anyModFileExists());
@@ -1489,7 +1493,7 @@ public class DataRegionTest {
dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"),
deleteDataNode4);
dataRegion.syncCloseAllWorkingTsFileProcessors();
- Assert.assertTrue(tsFileResource.anyModFileExists());
+ assertTrue(tsFileResource.anyModFileExists());
Assert.assertEquals(3, tsFileResource.getAllModEntries().size());
}
@@ -1584,7 +1588,7 @@ public class DataRegionTest {
dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"),
deleteDataNode12);
dataRegion.syncCloseAllWorkingTsFileProcessors();
- Assert.assertTrue(tsFileResource.anyModFileExists());
+ assertTrue(tsFileResource.anyModFileExists());
Assert.assertEquals(3, tsFileResource.getAllModEntries().size());
}
@@ -1686,7 +1690,7 @@ public class DataRegionTest {
new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 100);
deleteDataNode1.setSearchIndex(0);
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode1);
- Assert.assertTrue(tsFileResource.getTsFile().exists());
+ assertTrue(tsFileResource.getTsFile().exists());
Assert.assertFalse(tsFileResource.anyModFileExists());
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -1696,8 +1700,8 @@ public class DataRegionTest {
new DeleteDataNode(new PlanNodeId("2"),
Collections.singletonList(path), 100, 120);
deleteDataNode2.setSearchIndex(0);
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode2);
- Assert.assertTrue(tsFileResource.getTsFile().exists());
- Assert.assertTrue(tsFileResource.anyModFileExists());
+ assertTrue(tsFileResource.getTsFile().exists());
+ assertTrue(tsFileResource.anyModFileExists());
// delete data in closed file, and time all match
DeleteDataNode deleteDataNode3 =
@@ -1727,8 +1731,8 @@ public class DataRegionTest {
dataRegion.syncCloseWorkingTsFileProcessors(true);
TsFileResource tsFileResourceUnSeq =
dataRegion.getTsFileManager().getTsFileList(false).get(0);
- Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ assertTrue(tsFileResourceSeq.getTsFile().exists());
+ assertTrue(tsFileResourceUnSeq.getTsFile().exists());
// already closed, will have a mods file.
MeasurementPath path = new MeasurementPath("root.vehicle.d0.**");
@@ -1743,9 +1747,9 @@ public class DataRegionTest {
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode2);
// delete data in mem table, there is no mods
- Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceSeq.anyModFileExists());
+ assertTrue(tsFileResourceSeq.getTsFile().exists());
+ assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ assertTrue(tsFileResourceSeq.anyModFileExists());
Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -1753,8 +1757,8 @@ public class DataRegionTest {
new DeleteDataNode(new PlanNodeId("3"),
Collections.singletonList(path), 40, 80);
deleteDataNode3.setSearchIndex(0);
dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"),
deleteDataNode3);
- Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
- Assert.assertTrue(tsFileResourceUnSeq.anyModFileExists());
+ assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ assertTrue(tsFileResourceUnSeq.anyModFileExists());
// seq file and unseq file have data file and mod file now,
// this deletion will remove data file and mod file.
@@ -1772,4 +1776,21 @@ public class DataRegionTest {
Assert.assertFalse(tsFileResourceSeq.anyModFileExists());
Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists());
}
+
+ @Test
+ public void testFlushSpecifiedResource()
+ throws IllegalPathException, WriteProcessException, ExecutionException,
InterruptedException {
+ for (int j = 100; j < 200; j++) {
+ TSRecord record = new TSRecord(deviceId, j);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+ }
+ TsFileResource tsFileResourceSeq =
dataRegion.getTsFileManager().getTsFileList(true).get(0);
+ Future<?> future =
dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+ Future<?> future2 =
dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq);
+ assertTrue(future == future2 || future2 instanceof CompletableFuture);
+
+ future.get();
+ assertTrue(tsFileResourceSeq.isClosed());
+ }
}