This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch AccelerateRestart in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1173629c183940dcab405077bff3bb74bad652fb Author: JackieTien97 <[email protected]> AuthorDate: Thu Sep 29 11:36:12 2022 +0800 Accelerate restart process --- .../iotdb/db/engine/storagegroup/DataRegion.java | 33 +++++++++++++--------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 37395e1207..2c50eee338 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -535,19 +535,6 @@ public class DataRegion { throw new DataRegionException(e); } - List<TsFileResource> seqTsFileResources = tsFileManager.getTsFileList(true); - for (TsFileResource resource : seqTsFileResources) { - long timePartitionId = resource.getTimePartition(); - Map<String, Long> endTimeMap = new HashMap<>(); - for (String deviceId : resource.getDevices()) { - long endTime = resource.getEndTime(deviceId); - endTimeMap.put(deviceId.intern(), endTime); - } - lastFlushTimeManager.setMultiDeviceLastTime(timePartitionId, endTimeMap); - lastFlushTimeManager.setMultiDeviceFlushedTime(timePartitionId, endTimeMap); - lastFlushTimeManager.setMultiDeviceGlobalFlushedTime(endTimeMap); - } - // recover and start timed compaction thread initCompaction(); @@ -561,6 +548,21 @@ public class DataRegion { } } + private void updateLastFlushTime(TsFileResource resource, boolean isSeq) { + // only update flush time when it is a seq file + if (isSeq) { + long timePartitionId = resource.getTimePartition(); + Map<String, Long> endTimeMap = new HashMap<>(); + for (String deviceId : resource.getDevices()) { + long endTime = resource.getEndTime(deviceId); + endTimeMap.put(deviceId.intern(), endTime); + } + lastFlushTimeManager.setMultiDeviceLastTime(timePartitionId, endTimeMap); + 
lastFlushTimeManager.setMultiDeviceFlushedTime(timePartitionId, endTimeMap); + lastFlushTimeManager.setMultiDeviceGlobalFlushedTime(endTimeMap); + } + } + private void initCompaction() { if (!config.isEnableSeqSpaceCompaction() && !config.isEnableUnseqSpaceCompaction() @@ -740,6 +742,7 @@ public class DataRegion { private void callbackAfterUnsealedTsFileRecovered( UnsealedTsFileRecoverPerformer recoverPerformer) { TsFileResource tsFileResource = recoverPerformer.getTsFileResource(); + boolean isSeq = recoverPerformer.isSequence(); if (!recoverPerformer.canWrite()) { // cannot write, just close it for (ISyncManager syncManager : @@ -751,6 +754,7 @@ public class DataRegion { } catch (IOException e) { logger.error("Fail to close TsFile {} when recovering", tsFileResource.getTsFile(), e); } + updateLastFlushTime(tsFileResource, isSeq); tsFileResourceManager.registerSealedTsFileResource(tsFileResource); TsFileMetricManager.getInstance() .addFile(tsFileResource.getTsFile().length(), recoverPerformer.isSequence()); @@ -758,7 +762,6 @@ public class DataRegion { // the last file is not closed, continue writing to it RestorableTsFileIOWriter writer = recoverPerformer.getWriter(); long timePartitionId = tsFileResource.getTimePartition(); - boolean isSeq = recoverPerformer.isSequence(); TsFileProcessor tsFileProcessor = new TsFileProcessor( dataRegionId, @@ -792,6 +795,7 @@ public class DataRegion { } tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize); } + updateLastFlushTime(tsFileResource, isSeq); } tsFileManager.add(tsFileResource, recoverPerformer.isSequence()); } @@ -815,6 +819,7 @@ public class DataRegion { } sealedTsFile.close(); tsFileManager.add(sealedTsFile, isSeq); + updateLastFlushTime(sealedTsFile, isSeq); tsFileResourceManager.registerSealedTsFileResource(sealedTsFile); } catch (DataRegionException | IOException e) { logger.error("Fail to recover sealed TsFile {}, skip it.", sealedTsFile.getTsFilePath(), e);
