This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b2246339211 Fixed TsFileProcessorTest may check the result before the TsFile is closed (#17232)
b2246339211 is described below

commit b22463392112b0cddcdbcc0858eefe69093a906c
Author: Jiang Tian <[email protected]>
AuthorDate: Mon Mar 2 10:48:32 2026 +0800

    Fixed TsFileProcessorTest may check the result before the TsFile is closed (#17232)
---
 .../dataregion/memtable/TsFileProcessor.java       |  49 +------
 .../dataregion/memtable/TsFileProcessorTest.java   | 158 ---------------------
 2 files changed, 1 insertion(+), 206 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 518df5322a9..22e6498b837 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1336,53 +1336,6 @@ public class TsFileProcessor {
     return CompletableFuture.completedFuture(null);
   }
 
-  /**
-   * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never 
wakeup Tips: I am
-   * trying to solve this issue by checking whether the table exist before 
wait()
-   */
-  @TestOnly
-  public void syncFlush() throws IOException {
-    IMemTable tmpMemTable;
-    flushQueryLock.writeLock().lock();
-    logFlushQueryWriteLocked();
-    try {
-      tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : 
workMemTable;
-      if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
-        logger.debug(
-            "{}: {} add a signal memtable into flushing memtable list when 
sync flush",
-            dataRegionName,
-            tsFileResource.getTsFile().getName());
-      }
-      addAMemtableIntoFlushingList(tmpMemTable);
-    } finally {
-      flushQueryLock.writeLock().unlock();
-      logFlushQueryWriteUnlocked();
-    }
-
-    synchronized (flushingMemTables) {
-      try {
-        long startWait = System.currentTimeMillis();
-        while (flushingMemTables.contains(tmpMemTable)) {
-          flushingMemTables.wait(1000);
-
-          if ((System.currentTimeMillis() - startWait) > 60_000) {
-            logger.warn(
-                "has waited for synced flushing a memtable in {} for 60 
seconds.",
-                this.tsFileResource.getTsFile().getAbsolutePath());
-            startWait = System.currentTimeMillis();
-          }
-        }
-      } catch (InterruptedException e) {
-        logger.error(
-            "{}: {} wait flush finished meets error",
-            dataRegionName,
-            tsFileResource.getTsFile().getName(),
-            e);
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
   /** Put the working memtable into flushing list and set the working memtable 
to null */
   public void asyncFlush() {
     flushQueryLock.writeLock().lock();
@@ -1393,7 +1346,7 @@ public class TsFileProcessor {
       }
       logger.info(
           "Async flush a memtable to tsfile: {}", 
tsFileResource.getTsFile().getAbsolutePath());
-      addAMemtableIntoFlushingList(workMemTable);
+      closeFuture = addAMemtableIntoFlushingList(workMemTable);
     } catch (Exception e) {
       logger.error(
           "{}: {} add a memtable into flushing list failed",
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index 3d1e779514d..86c10932e04 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.db.utils.constant.TestConstant;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.external.commons.io.FileUtils;
-import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -60,7 +59,6 @@ import org.apache.tsfile.read.reader.IPointReader;
 import org.apache.tsfile.write.record.TSRecord;
 import org.apache.tsfile.write.record.datapoint.DataPoint;
 import org.apache.tsfile.write.schema.MeasurementSchema;
-import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -183,12 +181,6 @@ public class TsFileProcessorTest {
     }
 
     // flush synchronously
-    processor.syncFlush();
-
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-    assertEquals(1, 
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
     processor.syncClose();
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -258,12 +250,6 @@ public class TsFileProcessorTest {
     }
 
     // flush synchronously
-    processor.syncFlush();
-
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-    assertEquals(3, 
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
     processor.syncClose();
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -337,12 +323,6 @@ public class TsFileProcessorTest {
     }
 
     // flush synchronously
-    processor.syncFlush();
-
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-    assertEquals(3, 
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
     processor.syncClose();
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -423,12 +403,6 @@ public class TsFileProcessorTest {
     }
 
     // flush synchronously
-    processor.syncFlush();
-
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-    assertEquals(3, 
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
     processor.syncClose();
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -512,12 +486,6 @@ public class TsFileProcessorTest {
     }
 
     // flush synchronously
-    processor.syncFlush();
-
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-    assertEquals(3, 
tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
     processor.syncClose();
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -537,132 +505,6 @@ public class TsFileProcessorTest {
     }
   }
 
-  @Test
-  public void testWriteAndRestoreMetadata()
-      throws IOException, WriteProcessException, MetadataException, 
ExecutionException {
-    logger.info("testWriteAndRestoreMetadata begin..");
-    processor =
-        new TsFileProcessor(
-            storageGroup,
-            SystemFileFactory.INSTANCE.getFile(filePath),
-            sgInfo,
-            this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> {},
-            true);
-
-    TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
-    processor.setTsFileProcessorInfo(tsFileProcessorInfo);
-    this.sgInfo.initTsFileProcessorInfo(processor);
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
-    List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
-    NonAlignedFullPath fullPath =
-        new NonAlignedFullPath(
-            IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
-            new MeasurementSchema(
-                measurementId, dataType, encoding, 
CompressionType.UNCOMPRESSED, props));
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    assertTrue(tsfileResourcesForQuery.isEmpty());
-
-    for (int i = 1; i <= 100; i++) {
-      TSRecord record = new TSRecord(deviceId, i);
-      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
-      processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]);
-    }
-
-    // query data in memory
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-    int num = 1;
-    List<ReadOnlyMemChunk> memChunks = 
tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
-    for (ReadOnlyMemChunk chunk : memChunks) {
-      IPointReader iterator = chunk.getPointReader();
-      for (; num <= 100; num++) {
-        iterator.hasNextTimeValuePair();
-        TimeValuePair timeValuePair = iterator.nextTimeValuePair();
-        assertEquals(num, timeValuePair.getTimestamp());
-        assertEquals(num, timeValuePair.getValue().getInt());
-      }
-    }
-    logger.info("syncFlush..");
-    // flush synchronously
-    processor.syncFlush();
-
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-
-    RestorableTsFileIOWriter tsFileIOWriter = processor.getWriter();
-    Map<IDeviceID, List<ChunkMetadata>> chunkMetaDataListInChunkGroups =
-        tsFileIOWriter.getDeviceChunkMetadataMap();
-    RestorableTsFileIOWriter restorableTsFileIOWriter =
-        new 
RestorableTsFileIOWriter(SystemFileFactory.INSTANCE.getFile(filePath));
-    Map<IDeviceID, List<ChunkMetadata>> restoredChunkMetaDataListInChunkGroups 
=
-        restorableTsFileIOWriter.getDeviceChunkMetadataMap();
-    assertEquals(
-        chunkMetaDataListInChunkGroups.size(), 
restoredChunkMetaDataListInChunkGroups.size());
-    for (Map.Entry<IDeviceID, List<ChunkMetadata>> entry1 :
-        chunkMetaDataListInChunkGroups.entrySet()) {
-      for (Map.Entry<IDeviceID, List<ChunkMetadata>> entry2 :
-          restoredChunkMetaDataListInChunkGroups.entrySet()) {
-        assertEquals(entry1.getKey(), entry2.getKey());
-        assertEquals(entry1.getValue().size(), entry2.getValue().size());
-        for (int i = 0; i < entry1.getValue().size(); i++) {
-          ChunkMetadata chunkMetaData = entry1.getValue().get(i);
-          chunkMetaData.setVersion(0);
-          ChunkMetadata chunkMetadataRestore = entry2.getValue().get(i);
-          chunkMetadataRestore.setVersion(0);
-        }
-      }
-    }
-    restorableTsFileIOWriter.close();
-    logger.info("syncClose..");
-    processor.syncClose();
-    // we need to close the tsfile writer first and then reopen it.
-  }
-
-  @Test
-  public void testMultiFlush()
-      throws IOException, WriteProcessException, MetadataException, 
ExecutionException {
-    processor =
-        new TsFileProcessor(
-            storageGroup,
-            SystemFileFactory.INSTANCE.getFile(filePath),
-            sgInfo,
-            this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> {},
-            true);
-
-    TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
-    processor.setTsFileProcessorInfo(tsFileProcessorInfo);
-    this.sgInfo.initTsFileProcessorInfo(processor);
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
-    List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
-    NonAlignedFullPath fullPath =
-        new NonAlignedFullPath(
-            IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
-            new MeasurementSchema(
-                measurementId, dataType, encoding, 
CompressionType.UNCOMPRESSED, props));
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    assertTrue(tsfileResourcesForQuery.isEmpty());
-
-    for (int flushId = 0; flushId < 10; flushId++) {
-      for (int i = 1; i <= 10; i++) {
-        TSRecord record = new TSRecord(deviceId, i);
-        record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
-        processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]);
-      }
-      processor.asyncFlush();
-    }
-    processor.syncFlush();
-
-    tsfileResourcesForQuery.clear();
-    processor.query(Collections.singletonList(fullPath), context, 
tsfileResourcesForQuery, null);
-    assertFalse(tsfileResourcesForQuery.isEmpty());
-    
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
-    processor.syncClose();
-  }
-
   @Test
   public void alignedTvListRamCostTest()
       throws MetadataException, WriteProcessException, IOException {

Reply via email to