This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b2246339211 Fix TsFileProcessorTest checking the result before the
TsFile is closed (#17232)
b2246339211 is described below
commit b22463392112b0cddcdbcc0858eefe69093a906c
Author: Jiang Tian <[email protected]>
AuthorDate: Mon Mar 2 10:48:32 2026 +0800
Fix TsFileProcessorTest checking the result before the TsFile is closed
(#17232)
---
.../dataregion/memtable/TsFileProcessor.java | 49 +------
.../dataregion/memtable/TsFileProcessorTest.java | 158 ---------------------
2 files changed, 1 insertion(+), 206 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 518df5322a9..22e6498b837 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1336,53 +1336,6 @@ public class TsFileProcessor {
return CompletableFuture.completedFuture(null);
}
- /**
- * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never
wakeup Tips: I am
- * trying to solve this issue by checking whether the table exist before
wait()
- */
- @TestOnly
- public void syncFlush() throws IOException {
- IMemTable tmpMemTable;
- flushQueryLock.writeLock().lock();
- logFlushQueryWriteLocked();
- try {
- tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() :
workMemTable;
- if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
- logger.debug(
- "{}: {} add a signal memtable into flushing memtable list when
sync flush",
- dataRegionName,
- tsFileResource.getTsFile().getName());
- }
- addAMemtableIntoFlushingList(tmpMemTable);
- } finally {
- flushQueryLock.writeLock().unlock();
- logFlushQueryWriteUnlocked();
- }
-
- synchronized (flushingMemTables) {
- try {
- long startWait = System.currentTimeMillis();
- while (flushingMemTables.contains(tmpMemTable)) {
- flushingMemTables.wait(1000);
-
- if ((System.currentTimeMillis() - startWait) > 60_000) {
- logger.warn(
- "has waited for synced flushing a memtable in {} for 60
seconds.",
- this.tsFileResource.getTsFile().getAbsolutePath());
- startWait = System.currentTimeMillis();
- }
- }
- } catch (InterruptedException e) {
- logger.error(
- "{}: {} wait flush finished meets error",
- dataRegionName,
- tsFileResource.getTsFile().getName(),
- e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
/** Put the working memtable into flushing list and set the working memtable
to null */
public void asyncFlush() {
flushQueryLock.writeLock().lock();
@@ -1393,7 +1346,7 @@ public class TsFileProcessor {
}
logger.info(
"Async flush a memtable to tsfile: {}",
tsFileResource.getTsFile().getAbsolutePath());
- addAMemtableIntoFlushingList(workMemTable);
+ closeFuture = addAMemtableIntoFlushingList(workMemTable);
} catch (Exception e) {
logger.error(
"{}: {} add a memtable into flushing list failed",
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index 3d1e779514d..86c10932e04 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.external.commons.io.FileUtils;
-import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -60,7 +59,6 @@ import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.datapoint.DataPoint;
import org.apache.tsfile.write.schema.MeasurementSchema;
-import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -183,12 +181,6 @@ public class TsFileProcessorTest {
}
// flush synchronously
- processor.syncFlush();
-
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
- assertEquals(1,
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
processor.syncClose();
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -258,12 +250,6 @@ public class TsFileProcessorTest {
}
// flush synchronously
- processor.syncFlush();
-
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
- assertEquals(3,
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
processor.syncClose();
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -337,12 +323,6 @@ public class TsFileProcessorTest {
}
// flush synchronously
- processor.syncFlush();
-
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
- assertEquals(3,
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
processor.syncClose();
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -423,12 +403,6 @@ public class TsFileProcessorTest {
}
// flush synchronously
- processor.syncFlush();
-
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
- assertEquals(3,
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
processor.syncClose();
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -512,12 +486,6 @@ public class TsFileProcessorTest {
}
// flush synchronously
- processor.syncFlush();
-
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
- assertEquals(3,
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
processor.syncClose();
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -537,132 +505,6 @@ public class TsFileProcessorTest {
}
}
- @Test
- public void testWriteAndRestoreMetadata()
- throws IOException, WriteProcessException, MetadataException,
ExecutionException {
- logger.info("testWriteAndRestoreMetadata begin..");
- processor =
- new TsFileProcessor(
- storageGroup,
- SystemFileFactory.INSTANCE.getFile(filePath),
- sgInfo,
- this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> {},
- true);
-
- TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
- processor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.sgInfo.initTsFileProcessorInfo(processor);
- SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
- List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
- NonAlignedFullPath fullPath =
- new NonAlignedFullPath(
- IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
- new MeasurementSchema(
- measurementId, dataType, encoding,
CompressionType.UNCOMPRESSED, props));
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
- assertTrue(tsfileResourcesForQuery.isEmpty());
-
- for (int i = 1; i <= 100; i++) {
- TSRecord record = new TSRecord(deviceId, i);
- record.addTuple(DataPoint.getDataPoint(dataType, measurementId,
String.valueOf(i)));
- processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]);
- }
-
- // query data in memory
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
-
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
- int num = 1;
- List<ReadOnlyMemChunk> memChunks =
tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
- for (ReadOnlyMemChunk chunk : memChunks) {
- IPointReader iterator = chunk.getPointReader();
- for (; num <= 100; num++) {
- iterator.hasNextTimeValuePair();
- TimeValuePair timeValuePair = iterator.nextTimeValuePair();
- assertEquals(num, timeValuePair.getTimestamp());
- assertEquals(num, timeValuePair.getValue().getInt());
- }
- }
- logger.info("syncFlush..");
- // flush synchronously
- processor.syncFlush();
-
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-
- RestorableTsFileIOWriter tsFileIOWriter = processor.getWriter();
- Map<IDeviceID, List<ChunkMetadata>> chunkMetaDataListInChunkGroups =
- tsFileIOWriter.getDeviceChunkMetadataMap();
- RestorableTsFileIOWriter restorableTsFileIOWriter =
- new
RestorableTsFileIOWriter(SystemFileFactory.INSTANCE.getFile(filePath));
- Map<IDeviceID, List<ChunkMetadata>> restoredChunkMetaDataListInChunkGroups
=
- restorableTsFileIOWriter.getDeviceChunkMetadataMap();
- assertEquals(
- chunkMetaDataListInChunkGroups.size(),
restoredChunkMetaDataListInChunkGroups.size());
- for (Map.Entry<IDeviceID, List<ChunkMetadata>> entry1 :
- chunkMetaDataListInChunkGroups.entrySet()) {
- for (Map.Entry<IDeviceID, List<ChunkMetadata>> entry2 :
- restoredChunkMetaDataListInChunkGroups.entrySet()) {
- assertEquals(entry1.getKey(), entry2.getKey());
- assertEquals(entry1.getValue().size(), entry2.getValue().size());
- for (int i = 0; i < entry1.getValue().size(); i++) {
- ChunkMetadata chunkMetaData = entry1.getValue().get(i);
- chunkMetaData.setVersion(0);
- ChunkMetadata chunkMetadataRestore = entry2.getValue().get(i);
- chunkMetadataRestore.setVersion(0);
- }
- }
- }
- restorableTsFileIOWriter.close();
- logger.info("syncClose..");
- processor.syncClose();
- // we need to close the tsfile writer first and then reopen it.
- }
-
- @Test
- public void testMultiFlush()
- throws IOException, WriteProcessException, MetadataException,
ExecutionException {
- processor =
- new TsFileProcessor(
- storageGroup,
- SystemFileFactory.INSTANCE.getFile(filePath),
- sgInfo,
- this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> {},
- true);
-
- TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
- processor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.sgInfo.initTsFileProcessorInfo(processor);
- SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
- List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
- NonAlignedFullPath fullPath =
- new NonAlignedFullPath(
- IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
- new MeasurementSchema(
- measurementId, dataType, encoding,
CompressionType.UNCOMPRESSED, props));
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
- assertTrue(tsfileResourcesForQuery.isEmpty());
-
- for (int flushId = 0; flushId < 10; flushId++) {
- for (int i = 1; i <= 10; i++) {
- TSRecord record = new TSRecord(deviceId, i);
- record.addTuple(DataPoint.getDataPoint(dataType, measurementId,
String.valueOf(i)));
- processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]);
- }
- processor.asyncFlush();
- }
- processor.syncFlush();
-
- tsfileResourcesForQuery.clear();
- processor.query(Collections.singletonList(fullPath), context,
tsfileResourcesForQuery, null);
- assertFalse(tsfileResourcesForQuery.isEmpty());
-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
- processor.syncClose();
- }
-
@Test
public void alignedTvListRamCostTest()
throws MetadataException, WriteProcessException, IOException {