This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e331eadbeaf Fix non-latest partition last flush time cannot recover (#11999)
e331eadbeaf is described below
commit e331eadbeaf4281eb493c54fc9f4e1d5929caa2c
Author: Haonan <[email protected]>
AuthorDate: Mon Jan 29 12:33:39 2024 +0800
Fix non-latest partition last flush time cannot recover (#11999)
---
.../db/storageengine/dataregion/DataRegion.java | 68 +++++-----------------
.../dataregion/LastFlushTimeMapTest.java | 37 ++++++++++++
2 files changed, 51 insertions(+), 54 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 5bda2c80bf3..73a24f63c13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -524,20 +524,12 @@ public class DataRegion implements IDataRegionForQuery {
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpSeqTsFiles.entrySet()) {
recoverFilesInPartition(
- partitionFiles.getKey(),
- dataRegionRecoveryContext,
- partitionFiles.getValue(),
- true,
- partitionFiles.getKey() == latestPartitionId);
+ partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), true);
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpUnseqTsFiles.entrySet()) {
recoverFilesInPartition(
- partitionFiles.getKey(),
- dataRegionRecoveryContext,
- partitionFiles.getValue(),
- false,
- partitionFiles.getKey() == latestPartitionId);
+ partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), false);
}
if (config.isEnableSeparateData()) {
TimePartitionManager.getInstance()
@@ -763,7 +755,7 @@ public class DataRegion implements IDataRegionForQuery {
dataRegionInfo,
tsFileResource,
this::closeUnsealedTsFileProcessorCallBack,
- isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback,
+ this::flushCallback,
isSeq,
writer);
if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId())
== null
@@ -825,12 +817,11 @@ public class DataRegion implements IDataRegionForQuery {
long partitionId,
DataRegionRecoveryContext context,
List<TsFileResource> resourceList,
- boolean isSeq,
- boolean isLatestPartition) {
+ boolean isSeq) {
for (TsFileResource tsFileResource : resourceList) {
recoverSealedTsFiles(tsFileResource, context, isSeq);
}
- if (isLatestPartition && config.isEnableSeparateData()) {
+ if (config.isEnableSeparateData()) {
lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
for (TsFileResource tsFileResource : resourceList) {
updateLastFlushTime(tsFileResource, isSeq);
@@ -1405,26 +1396,14 @@ public class DataRegion implements IDataRegionForQuery {
private TsFileProcessor getTsFileProcessor(
boolean sequence, String filePath, long timePartitionId) throws IOException {
- TsFileProcessor tsFileProcessor;
- if (sequence) {
- tsFileProcessor =
- new TsFileProcessor(
- databaseName + FILE_NAME_SEPARATOR + dataRegionId,
- fsFactory.getFileWithParent(filePath),
- dataRegionInfo,
- this::closeUnsealedTsFileProcessorCallBack,
- this::sequenceFlushCallback,
- true);
- } else {
- tsFileProcessor =
- new TsFileProcessor(
- databaseName + FILE_NAME_SEPARATOR + dataRegionId,
- fsFactory.getFileWithParent(filePath),
- dataRegionInfo,
- this::closeUnsealedTsFileProcessorCallBack,
- this::unsequenceFlushCallback,
- false);
- }
+ TsFileProcessor tsFileProcessor =
+ new TsFileProcessor(
+ databaseName + FILE_NAME_SEPARATOR + dataRegionId,
+ fsFactory.getFileWithParent(filePath),
+ dataRegionInfo,
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::flushCallback,
+ sequence);
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
@@ -2338,26 +2317,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private void unsequenceFlushCallback(
- TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
- if (!config.isEnableSeparateData()
- && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- // update globalLastFlushTime if and only if isEnableSeparateData is false and
- // isLastCacheEnable is true
- lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(updateMap);
- }
- if (config.isEnableSeparateData()) {
- TimePartitionManager.getInstance()
- .updateAfterFlushing(
- new DataRegionId(Integer.valueOf(dataRegionId)),
- processor.getTimeRangeId(),
- systemFlushTime,
- lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
- workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
- }
- }
-
- private void sequenceFlushCallback(
+ private void flushCallback(
TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
if (config.isEnableSeparateData()
&& CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
index b7d4766a4a8..6e99cf77fad 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
@@ -129,6 +130,42 @@ public class LastFlushTimeMapTest {
10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0, "root.vehicle.d1"));
}
+ @Test
+ public void testRecoverLastFlushTimeMap()
+ throws IOException, IllegalPathException, WriteProcessException, DataRegionException {
+ TSRecord record = new TSRecord(604_800_000, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ record = new TSRecord(604_799_999, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ for (int j = 1; j <= 10; j++) {
+ record = new TSRecord(j, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ }
+
+ for (TsFileProcessor tsfileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
+ tsfileProcessor.syncFlush();
+ }
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ Assert.assertEquals(
+ 604_800_000, dataRegion.getLastFlushTimeMap().getFlushedTime(1, "root.vehicle.d0"));
+ Assert.assertEquals(
+ 604_799_999, dataRegion.getLastFlushTimeMap().getFlushedTime(0, "root.vehicle.d0"));
+
+ // recover from disk
+ dataRegion = new DataRegionTest.DummyDataRegion(systemDir, storageGroup);
+ Assert.assertEquals(
+ 604_800_000, dataRegion.getLastFlushTimeMap().getFlushedTime(1, "root.vehicle.d0"));
+ Assert.assertEquals(
+ 604_799_999, dataRegion.getLastFlushTimeMap().getFlushedTime(0, "root.vehicle.d0"));
+ }
+
@Test
public void testLastFlushedTimeWhenLargestTimestampInUnSeqSpace()
throws IllegalPathException, WriteProcessException {