This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_last_flush_time_recover_bug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 253145a783ac26537239507e23e0d65174860273 Author: HTHou <[email protected]> AuthorDate: Mon Jan 29 10:36:53 2024 +0800 Fix non-latest partition last flush time cannot recover --- .../db/storageengine/dataregion/DataRegion.java | 68 +++++----------------- 1 file changed, 14 insertions(+), 54 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 5bda2c80bf3..73a24f63c13 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -524,20 +524,12 @@ public class DataRegion implements IDataRegionForQuery { } for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) { recoverFilesInPartition( - partitionFiles.getKey(), - dataRegionRecoveryContext, - partitionFiles.getValue(), - true, - partitionFiles.getKey() == latestPartitionId); + partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), true); } for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpUnseqTsFiles.entrySet()) { recoverFilesInPartition( - partitionFiles.getKey(), - dataRegionRecoveryContext, - partitionFiles.getValue(), - false, - partitionFiles.getKey() == latestPartitionId); + partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), false); } if (config.isEnableSeparateData()) { TimePartitionManager.getInstance() @@ -763,7 +755,7 @@ public class DataRegion implements IDataRegionForQuery { dataRegionInfo, tsFileResource, this::closeUnsealedTsFileProcessorCallBack, - isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback, + this::flushCallback, isSeq, writer); if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null @@ -825,12 +817,11 @@ public class DataRegion implements IDataRegionForQuery { long partitionId, DataRegionRecoveryContext context, List<TsFileResource> resourceList, - boolean isSeq, - boolean isLatestPartition) { + boolean isSeq) { for (TsFileResource tsFileResource : resourceList) { recoverSealedTsFiles(tsFileResource, context, isSeq); } - if (isLatestPartition && config.isEnableSeparateData()) { + if (config.isEnableSeparateData()) { lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId); for (TsFileResource tsFileResource : resourceList) { updateLastFlushTime(tsFileResource, isSeq); @@ -1405,26 +1396,14 @@ public class DataRegion implements IDataRegionForQuery { private TsFileProcessor getTsFileProcessor( boolean sequence, String filePath, long timePartitionId) throws IOException { - TsFileProcessor tsFileProcessor; - if (sequence) { - tsFileProcessor = - new TsFileProcessor( - databaseName + FILE_NAME_SEPARATOR + dataRegionId, - fsFactory.getFileWithParent(filePath), - dataRegionInfo, - this::closeUnsealedTsFileProcessorCallBack, - this::sequenceFlushCallback, - true); - } else { - tsFileProcessor = - new TsFileProcessor( - databaseName + FILE_NAME_SEPARATOR + dataRegionId, - fsFactory.getFileWithParent(filePath), - dataRegionInfo, - this::closeUnsealedTsFileProcessorCallBack, - this::unsequenceFlushCallback, - false); - } + TsFileProcessor tsFileProcessor = + new TsFileProcessor( + databaseName + FILE_NAME_SEPARATOR + dataRegionId, + fsFactory.getFileWithParent(filePath), + dataRegionInfo, + this::closeUnsealedTsFileProcessorCallBack, + this::flushCallback, + sequence); if (enableMemControl) { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo); @@ -2338,26 +2317,7 @@ public class DataRegion implements IDataRegionForQuery { } } - private void unsequenceFlushCallback( - TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) { - if (!config.isEnableSeparateData() - && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { - // update globalLastFlushTime if and only if isEnableSeparateData is false and - // isLastCacheEnable is true - lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(updateMap); - } - if (config.isEnableSeparateData()) { - TimePartitionManager.getInstance() - .updateAfterFlushing( - new DataRegionId(Integer.valueOf(dataRegionId)), - processor.getTimeRangeId(), - systemFlushTime, - lastFlushTimeMap.getMemSize(processor.getTimeRangeId()), - workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null); - } - } - - private void sequenceFlushCallback( + private void flushCallback( TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) { if (config.isEnableSeparateData() && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
