This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e331eadbeaf Fix non-latest partition last flush time cannot recover (#11999)
e331eadbeaf is described below

commit e331eadbeaf4281eb493c54fc9f4e1d5929caa2c
Author: Haonan <[email protected]>
AuthorDate: Mon Jan 29 12:33:39 2024 +0800

    Fix non-latest partition last flush time cannot recover (#11999)
---
 .../db/storageengine/dataregion/DataRegion.java    | 68 +++++-----------------
 .../dataregion/LastFlushTimeMapTest.java           | 37 ++++++++++++
 2 files changed, 51 insertions(+), 54 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 5bda2c80bf3..73a24f63c13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -524,20 +524,12 @@ public class DataRegion implements IDataRegionForQuery {
         }
         for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
           recoverFilesInPartition(
-              partitionFiles.getKey(),
-              dataRegionRecoveryContext,
-              partitionFiles.getValue(),
-              true,
-              partitionFiles.getKey() == latestPartitionId);
+              partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), true);
         }
         for (Entry<Long, List<TsFileResource>> partitionFiles :
             partitionTmpUnseqTsFiles.entrySet()) {
           recoverFilesInPartition(
-              partitionFiles.getKey(),
-              dataRegionRecoveryContext,
-              partitionFiles.getValue(),
-              false,
-              partitionFiles.getKey() == latestPartitionId);
+              partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), false);
         }
         if (config.isEnableSeparateData()) {
           TimePartitionManager.getInstance()
@@ -763,7 +755,7 @@ public class DataRegion implements IDataRegionForQuery {
                 dataRegionInfo,
                 tsFileResource,
                 this::closeUnsealedTsFileProcessorCallBack,
-                isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback,
+                this::flushCallback,
                 isSeq,
                 writer);
         if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null
@@ -825,12 +817,11 @@ public class DataRegion implements IDataRegionForQuery {
       long partitionId,
       DataRegionRecoveryContext context,
       List<TsFileResource> resourceList,
-      boolean isSeq,
-      boolean isLatestPartition) {
+      boolean isSeq) {
     for (TsFileResource tsFileResource : resourceList) {
       recoverSealedTsFiles(tsFileResource, context, isSeq);
     }
-    if (isLatestPartition && config.isEnableSeparateData()) {
+    if (config.isEnableSeparateData()) {
       lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
       for (TsFileResource tsFileResource : resourceList) {
         updateLastFlushTime(tsFileResource, isSeq);
@@ -1405,26 +1396,14 @@ public class DataRegion implements IDataRegionForQuery {
 
   private TsFileProcessor getTsFileProcessor(
       boolean sequence, String filePath, long timePartitionId) throws IOException {
-    TsFileProcessor tsFileProcessor;
-    if (sequence) {
-      tsFileProcessor =
-          new TsFileProcessor(
-              databaseName + FILE_NAME_SEPARATOR + dataRegionId,
-              fsFactory.getFileWithParent(filePath),
-              dataRegionInfo,
-              this::closeUnsealedTsFileProcessorCallBack,
-              this::sequenceFlushCallback,
-              true);
-    } else {
-      tsFileProcessor =
-          new TsFileProcessor(
-              databaseName + FILE_NAME_SEPARATOR + dataRegionId,
-              fsFactory.getFileWithParent(filePath),
-              dataRegionInfo,
-              this::closeUnsealedTsFileProcessorCallBack,
-              this::unsequenceFlushCallback,
-              false);
-    }
+    TsFileProcessor tsFileProcessor =
+        new TsFileProcessor(
+            databaseName + FILE_NAME_SEPARATOR + dataRegionId,
+            fsFactory.getFileWithParent(filePath),
+            dataRegionInfo,
+            this::closeUnsealedTsFileProcessorCallBack,
+            this::flushCallback,
+            sequence);
 
     if (enableMemControl) {
       TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
@@ -2338,26 +2317,7 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  private void unsequenceFlushCallback(
-      TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
-    if (!config.isEnableSeparateData()
-        && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      // update globalLastFlushTime if and only if isEnableSeparateData is false and
-      // isLastCacheEnable is true
-      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(updateMap);
-    }
-    if (config.isEnableSeparateData()) {
-      TimePartitionManager.getInstance()
-          .updateAfterFlushing(
-              new DataRegionId(Integer.valueOf(dataRegionId)),
-              processor.getTimeRangeId(),
-              systemFlushTime,
-              lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
-              workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
-    }
-  }
-
-  private void sequenceFlushCallback(
+  private void flushCallback(
       TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
     if (config.isEnableSeparateData()
         && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
index b7d4766a4a8..6e99cf77fad 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
@@ -129,6 +130,42 @@ public class LastFlushTimeMapTest {
         10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0, "root.vehicle.d1"));
   }
 
+  @Test
+  public void testRecoverLastFlushTimeMap()
+      throws IOException, IllegalPathException, WriteProcessException, DataRegionException {
+    TSRecord record = new TSRecord(604_800_000, "root.vehicle.d0");
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    record = new TSRecord(604_799_999, "root.vehicle.d0");
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    for (int j = 1; j <= 10; j++) {
+      record = new TSRecord(j, "root.vehicle.d0");
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+    }
+
+    for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+      tsfileProcessor.syncFlush();
+    }
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertEquals(
+        604_800_000, dataRegion.getLastFlushTimeMap().getFlushedTime(1, "root.vehicle.d0"));
+    Assert.assertEquals(
+        604_799_999, dataRegion.getLastFlushTimeMap().getFlushedTime(0, "root.vehicle.d0"));
+
+    // recover from disk
+    dataRegion = new DataRegionTest.DummyDataRegion(systemDir, storageGroup);
+    Assert.assertEquals(
+        604_800_000, dataRegion.getLastFlushTimeMap().getFlushedTime(1, "root.vehicle.d0"));
+    Assert.assertEquals(
+        604_799_999, dataRegion.getLastFlushTimeMap().getFlushedTime(0, "root.vehicle.d0"));
+  }
+
   @Test
   public void testLastFlushedTimeWhenLargestTimestampInUnSeqSpace()
       throws IllegalPathException, WriteProcessException {

Reply via email to