This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch cp_speedup_recover in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de66e2602565642b06837b3691ec02c345e3363d Author: Haonan <[email protected]> AuthorDate: Thu Aug 29 10:31:25 2024 +0800 Speed up recover (#13068) * init * init * init * dev * dev * dev * dev reader * support read and write FileTimeIndexCache * support read and write FileTimeIndexCache * dev recover progress * update last flush time after async recover finished * fix package structure * finish compact logic * fix UT * Fix repair data error * adapt pipe * try to fix 1c3d IT * fixing recover from wal not recode fileTimeIndex * recover from wal need to recode file timeindex cache * fix sonar bug * update more * fix empty FileTimeIndexCache * batch serialize * control the thread number of recover * fix cannot start recover task * fix delete region * fix compile issue * fixing review * fix review * fix review and small issue * fix review and small issue --- .../iotdb/db/it/IoTDBPartialInsertionIT.java | 2 +- .../PipeHistoricalDataRegionTsFileExtractor.java | 3 + .../impl/DataNodeInternalRPCServiceImpl.java | 2 +- .../config/executor/ClusterConfigTaskExecutor.java | 2 +- .../java/org/apache/iotdb/db/service/DataNode.java | 40 ++- .../iotdb/db/storageengine/StorageEngine.java | 100 +++++-- .../db/storageengine/dataregion/DataRegion.java | 331 +++++++++++++-------- .../dataregion/DeviceLastFlushTime.java | 4 + .../dataregion/HashLastFlushTimeMap.java | 92 ++++-- .../dataregion/ILastFlushTimeMap.java | 11 +- .../schedule/CompactionScheduleTaskWorker.java | 2 +- .../compaction/schedule/TTLScheduleTask.java | 2 +- .../dataregion/memtable/TsFileProcessor.java | 2 + .../storageengine/dataregion/tsfile/TsFileID.java | 23 ++ .../dataregion/tsfile/TsFileManager.java | 27 +- .../dataregion/tsfile/TsFileResource.java | 15 + .../timeindex/FileTimeIndexCacheRecorder.java | 227 ++++++++++++++ .../FileTimeIndexCacheReader.java | 84 ++++++ .../FileTimeIndexCacheWriter.java | 103 +++++++ .../file/UnsealedTsFileRecoverPerformer.java | 2 + .../dataregion/LastFlushTimeMapTest.java | 6 
+- .../apache/iotdb/db/utils/EnvironmentUtils.java | 2 + .../iotdb/commons/concurrent/ThreadName.java | 2 + 23 files changed, 895 insertions(+), 189 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java index 074da6873ee..182167be41c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java @@ -108,7 +108,7 @@ public class IoTDBPartialInsertionIT { EnvironmentUtils.restartDaemon(); StorageEngine.getInstance().recover(); // wait for recover - while (!StorageEngine.getInstance().isAllSgReady()) { + while (!StorageEngine.getInstance().isReadyForReadAndWrite()) { Thread.sleep(500); time += 500; if (time > 10000) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index ca1888b0200..ce569ed2279 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -369,6 +369,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa @Override public synchronized void start() { + if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { + return; + } if (!shouldExtractInsertion) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 3604f0c00dd..c7f076a476e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1783,7 +1783,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus startRepairData() throws TException { - if (!storageEngine.isAllSgReady()) { + if (!storageEngine.isReadyForNonReadWriteFunctions()) { return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all sg is ready"); } IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 7ac6b553e5e..f758c1a9252 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -1105,7 +1105,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { future.setException(e); } } else { - if (!StorageEngine.getInstance().isAllSgReady()) { + if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { future.setException( new IoTDBException( "not all sg is ready", TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 31df10be7c5..e1f654391d5 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -246,6 +246,25 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { logger.info("IoTDB configuration: {}", config.getConfigMessage()); logger.info("Congratulations, IoTDB DataNode is set up successfully. Now, enjoy yourself!"); + if (isUsingPipeConsensus()) { + long dataRegionStartTime = System.currentTimeMillis(); + while (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { + try { + TimeUnit.MILLISECONDS.sleep(1000); + } catch (InterruptedException e) { + logger.warn("IoTDB DataNode failed to set up.", e); + Thread.currentThread().interrupt(); + return; + } + } + DataRegionConsensusImpl.getInstance().start(); + long dataRegionEndTime = System.currentTimeMillis(); + logger.info( + "DataRegion consensus start successfully, which takes {} ms.", + (dataRegionEndTime - dataRegionStartTime)); + dataRegionConsensusStarted = true; + } + } catch (StartupException | IOException e) { logger.error("Fail to start server", e); stop(); @@ -664,12 +683,14 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { "SchemaRegion consensus start successfully, which takes {} ms.", (schemaRegionEndTime - startTime)); schemaRegionConsensusStarted = true; - DataRegionConsensusImpl.getInstance().start(); - long dataRegionEndTime = System.currentTimeMillis(); - logger.info( - "DataRegion consensus start successfully, which takes {} ms.", - (dataRegionEndTime - schemaRegionEndTime)); - dataRegionConsensusStarted = true; + if (!isUsingPipeConsensus()) { + DataRegionConsensusImpl.getInstance().start(); + long dataRegionEndTime = System.currentTimeMillis(); + logger.info( + "DataRegion consensus start successfully, which takes {} ms.", + (dataRegionEndTime - schemaRegionEndTime)); + dataRegionConsensusStarted = true; + } } catch (IOException e) { throw new 
StartupException(e); } @@ -718,7 +739,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { logger.info( "IoTDB DataNode is setting up, some databases may not be ready now, please wait several seconds..."); long startTime = System.currentTimeMillis(); - while (!StorageEngine.getInstance().isAllSgReady()) { + while (!StorageEngine.getInstance().isReadyForReadAndWrite()) { try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { @@ -814,6 +835,11 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { return new TDataNodeConfiguration(location, resource); } + private boolean isUsingPipeConsensus() { + return config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2) + || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS); + } + private void registerUdfServices() throws StartupException { registerManager.register(TemporaryQueryDataFileService.getInstance()); registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 1e4f2953fe9..224a20dc196 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -120,7 +120,7 @@ public class StorageEngine implements IService { * a folder (system/databases/ by default) that persist system info. Each database will have a * subfolder under the systemDir. 
*/ - private final String systemDir = + private static final String systemDir = FilePathUtils.regularizePath(CONFIG.getSystemDir()) + "databases"; /** DataRegionId -> DataRegion */ @@ -134,19 +134,21 @@ public class StorageEngine implements IService { /** number of ready data region */ private AtomicInteger readyDataRegionNum; - private AtomicBoolean isAllSgReady = new AtomicBoolean(false); + private final AtomicBoolean isReadyForReadAndWrite = new AtomicBoolean(); + + private final AtomicBoolean isReadyForNonReadWriteFunctions = new AtomicBoolean(); private ScheduledExecutorService seqMemtableTimedFlushCheckThread; private ScheduledExecutorService unseqMemtableTimedFlushCheckThread; - private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy(); + private final TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy(); /** used to do short-lived asynchronous tasks */ private ExecutorService cachedThreadPool; // add customized listeners here for flush and close events - private List<CloseFileListener> customCloseFileListeners = new ArrayList<>(); - private List<FlushListener> customFlushListeners = new ArrayList<>(); + private final List<CloseFileListener> customCloseFileListeners = new ArrayList<>(); + private final List<FlushListener> customFlushListeners = new ArrayList<>(); private int recoverDataRegionNum = 0; private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager(); @@ -178,17 +180,19 @@ public class StorageEngine implements IService { } } - public boolean isAllSgReady() { - return isAllSgReady.get(); + public boolean isReadyForReadAndWrite() { + return isReadyForReadAndWrite.get(); } - public void setAllSgReady(boolean allSgReady) { - isAllSgReady.set(allSgReady); + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public boolean isReadyForNonReadWriteFunctions() { + return isReadyForNonReadWriteFunctions.get(); } - public void asyncRecover() throws StartupException { + private void asyncRecoverDataRegion() throws 
StartupException { long startRecoverTime = System.currentTimeMillis(); - setAllSgReady(false); + isReadyForNonReadWriteFunctions.set(false); + isReadyForReadAndWrite.set(false); cachedThreadPool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName()); @@ -209,8 +213,7 @@ public class StorageEngine implements IService { new Thread( () -> { checkResults(futures, "StorageEngine failed to recover."); - recoverRepairData(); - setAllSgReady(true); + isReadyForReadAndWrite.set(true); LOGGER.info( "Storage Engine recover cost: {}s.", (System.currentTimeMillis() - startRecoverTime) / 1000); @@ -287,11 +290,22 @@ public class StorageEngine implements IService { throw new StorageEngineFailureException(e); } - asyncRecover(); - - LOGGER.info("start ttl check thread successfully."); + asyncRecoverDataRegion(); startTimedService(); + + // wait here for dataRegionMap recovered + while (!isReadyForReadAndWrite.get()) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + LOGGER.warn("Storage engine failed to set up.", e); + Thread.currentThread().interrupt(); + return; + } + } + + asyncRecoverTsFileResource(); } private void startTimedService() { @@ -339,6 +353,36 @@ public class StorageEngine implements IService { } } + private void asyncRecoverTsFileResource() { + List<Future<Void>> futures = new LinkedList<>(); + for (DataRegion dataRegion : dataRegionMap.values()) { + if (dataRegion != null) { + List<Callable<Void>> asyncTsFileResourceRecoverTasks = + dataRegion.getAsyncTsFileResourceRecoverTaskList(); + if (asyncTsFileResourceRecoverTasks != null) { + Callable<Void> taskOfRegion = + () -> { + for (Callable<Void> task : asyncTsFileResourceRecoverTasks) { + task.call(); + } + dataRegion.initCompactionSchedule(); + return null; + }; + futures.add(cachedThreadPool.submit(taskOfRegion)); + } + } + } + Thread recoverEndTrigger = + new Thread( + () -> { + checkResults(futures, "async recover tsfile resource meets 
error."); + recoverRepairData(); + isReadyForNonReadWriteFunctions.set(true); + }, + ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName()); + recoverEndTrigger.start(); + } + @Override public void stop() { for (DataRegion dataRegion : dataRegionMap.values()) { @@ -649,8 +693,6 @@ public class StorageEngine implements IService { /** * Add a listener to listen flush start/end events. Notice that this addition only applies to * TsFileProcessors created afterwards. - * - * @param listener */ public void registerFlushListener(FlushListener listener) { customFlushListeners.add(listener); @@ -659,8 +701,6 @@ public class StorageEngine implements IService { /** * Add a listener to listen file close events. Notice that this addition only applies to * TsFileProcessors created afterwards. - * - * @param listener */ public void registerCloseFileListener(CloseFileListener listener) { customCloseFileListeners.add(listener); @@ -676,7 +716,7 @@ public class StorageEngine implements IService { } // When registering a new region, the coordinator needs to register the corresponding region with - // the local storageengine before adding the corresponding consensusGroup to the consensus layer + // the local storage before adding the corresponding consensusGroup to the consensus layer public DataRegion createDataRegion(DataRegionId regionId, String sg) throws DataRegionException { makeSureNoOldRegion(regionId); AtomicReference<DataRegionException> exceptionAtomicReference = new AtomicReference<>(null); @@ -953,6 +993,24 @@ public class StorageEngine implements IService { }); } + public static File getDataRegionSystemDir(String dataBaseName, String dataRegionId) { + return SystemFileFactory.INSTANCE.getFile( + systemDir + File.separator + dataBaseName, dataRegionId); + } + + public Runnable executeCompactFileTimeIndexCache() { + return () -> { + if (!isReadyForNonReadWriteFunctions()) { + return; + } + for (DataRegion dataRegion : dataRegionMap.values()) { + if (dataRegion != null) { + 
dataRegion.compactFileTimeIndexCache(); + } + } + }; + } + static class InstanceHolder { private static final StorageEngine INSTANCE = new StorageEngine(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index e30af047e2d..232e1dd09e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -89,11 +89,15 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.ClosedFileScanHandleImpl; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; +import org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheReader; import org.apache.iotdb.db.storageengine.dataregion.utils.validate.TsFileValidator; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; @@ -148,6 +152,7 @@ import 
java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -243,8 +248,8 @@ public class DataRegion implements IDataRegionForQuery { /** database name. */ private final String databaseName; - /** database system directory. */ - private File storageGroupSysDir; + /** data region system directory. */ + private File dataRegionSysDir; /** manage seqFileList and unSeqFileList. */ private final TsFileManager tsFileManager; @@ -280,6 +285,8 @@ public class DataRegion implements IDataRegionForQuery { /** whether it's ready from recovery. */ private boolean isReady = false; + private List<Callable<Void>> asyncTsFileResourceRecoverTaskList; + /** close file listeners. */ private List<CloseFileListener> customCloseFileListeners = Collections.emptyList(); @@ -320,21 +327,20 @@ public class DataRegion implements IDataRegionForQuery { this.fileFlushPolicy = fileFlushPolicy; acquireDirectBufferMemory(); - storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); - this.tsFileManager = - new TsFileManager(databaseName, dataRegionId, storageGroupSysDir.getPath()); - if (storageGroupSysDir.mkdirs()) { + dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); + this.tsFileManager = new TsFileManager(databaseName, dataRegionId, dataRegionSysDir.getPath()); + if (dataRegionSysDir.mkdirs()) { logger.info( - "Database system Directory {} doesn't exist, create it", storageGroupSysDir.getPath()); - } else if (!storageGroupSysDir.exists()) { - logger.error("create database system Directory {} failed", storageGroupSysDir.getPath()); + "Database system Directory {} doesn't exist, create it", dataRegionSysDir.getPath()); + } else if (!dataRegionSysDir.exists()) { + logger.error("create database system Directory {} failed", 
dataRegionSysDir.getPath()); } lastFlushTimeMap = new HashLastFlushTimeMap(); - // recover tsfiles unless consensus protocol is ratis and storage storageengine is not ready + // recover tsfiles unless consensus protocol is ratis and storage engine is not ready if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS) - && !StorageEngine.getInstance().isAllSgReady()) { + && !StorageEngine.getInstance().isReadyForReadAndWrite()) { logger.debug( "Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.", databaseName, @@ -356,6 +362,7 @@ public class DataRegion implements IDataRegionForQuery { } } } else { + asyncTsFileResourceRecoverTaskList = new ArrayList<>(); recover(); } @@ -380,17 +387,8 @@ public class DataRegion implements IDataRegionForQuery { return isReady; } - public void setReady(boolean ready) { - isReady = ready; - } - - private Map<Long, List<TsFileResource>> splitResourcesByPartition( - List<TsFileResource> resources) { - Map<Long, List<TsFileResource>> ret = new TreeMap<>(); - for (TsFileResource resource : resources) { - ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource); - } - return ret; + public List<Callable<Void>> getAsyncTsFileResourceRecoverTaskList() { + return asyncTsFileResourceRecoverTaskList; } /** this class is used to store recovering context. 
*/ @@ -451,19 +449,16 @@ public class DataRegion implements IDataRegionForQuery { try { // collect candidate TsFiles from sequential and unsequential data directory - List<TsFileResource> tmpSeqTsFiles = - getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders()); - List<TsFileResource> tmpUnseqTsFiles = - getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders()); - // split by partition so that we can find the last file of each partition and decide to // close it or not - DataRegionRecoveryContext dataRegionRecoveryContext = - new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size()); Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = - splitResourcesByPartition(tmpSeqTsFiles); + getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders()); Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = - splitResourcesByPartition(tmpUnseqTsFiles); + getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders()); + DataRegionRecoveryContext dataRegionRecoveryContext = + new DataRegionRecoveryContext( + partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum() + + partitionTmpUnseqTsFiles.values().stream().mapToLong(List::size).sum()); // submit unsealed TsFiles to recover List<WALRecoverListener> recoverListeners = new ArrayList<>(); for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) { @@ -544,13 +539,27 @@ public class DataRegion implements IDataRegionForQuery { ((TreeMap<Long, List<TsFileResource>>) partitionTmpUnseqTsFiles).lastKey()); } for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) { - recoverFilesInPartition( - partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), true); + Callable<Void> asyncRecoverTask = + recoverFilesInPartition( + partitionFiles.getKey(), + dataRegionRecoveryContext, + partitionFiles.getValue(), + true); + if (asyncRecoverTask != null) { + 
asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); + } } for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpUnseqTsFiles.entrySet()) { - recoverFilesInPartition( - partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), false); + Callable<Void> asyncRecoverTask = + recoverFilesInPartition( + partitionFiles.getKey(), + dataRegionRecoveryContext, + partitionFiles.getValue(), + false); + if (asyncRecoverTask != null) { + asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); + } } if (config.isEnableSeparateData()) { TimePartitionManager.getInstance() @@ -599,16 +608,25 @@ public class DataRegion implements IDataRegionForQuery { throw new DataRegionException(e); } - initCompactionSchedule(); + if (asyncTsFileResourceRecoverTaskList.isEmpty()) { + initCompactionSchedule(); + } - if (StorageEngine.getInstance().isAllSgReady()) { + if (StorageEngine.getInstance().isReadyForReadAndWrite()) { logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId); } else { logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId); } } - private void updateLastFlushTime(TsFileResource resource, boolean isSeq) { + private void updatePartitionLastFlushTime(TsFileResource resource) { + if (config.isEnableSeparateData()) { + lastFlushTimeMap.updatePartitionFlushedTime( + resource.getTimePartition(), resource.getTimeIndex().getMaxEndTime()); + } + } + + protected void updateDeviceLastFlushTime(TsFileResource resource) { long timePartitionId = resource.getTimePartition(); Map<IDeviceID, Long> endTimeMap = new HashMap<>(); for (IDeviceID deviceId : resource.getDevices()) { @@ -623,6 +641,23 @@ public class DataRegion implements IDataRegionForQuery { } } + protected void upgradeAndUpdateDeviceLastFlushTime( + long timePartitionId, List<TsFileResource> resources) { + Map<IDeviceID, Long> endTimeMap = new HashMap<>(); + for (TsFileResource resource : resources) { + for (IDeviceID 
deviceId : resource.getDevices()) { + long endTime = resource.getEndTime(deviceId); + endTimeMap.put(deviceId, endTime); + } + } + if (config.isEnableSeparateData()) { + lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, endTimeMap); + } + if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { + lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap); + } + } + public void initCompactionSchedule() { if (!config.isEnableSeqSpaceCompaction() && !config.isEnableUnseqSpaceCompaction() @@ -646,7 +681,7 @@ public class DataRegion implements IDataRegionForQuery { } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private List<TsFileResource> getAllFiles(List<String> folders) + private Map<Long, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException, DataRegionException { // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition Map<String, File> tsFilePartitionPath2File = new HashMap<>(); @@ -684,10 +719,12 @@ public class DataRegion implements IDataRegionForQuery { sortedFiles.sort(this::compareFileName); long currentTime = System.currentTimeMillis(); - List<TsFileResource> ret = new ArrayList<>(); + Map<Long, List<TsFileResource>> ret = new TreeMap<>(); for (File f : sortedFiles) { checkTsFileTime(f, currentTime); - ret.add(new TsFileResource(f)); + TsFileResource resource = new TsFileResource(f); + ret.computeIfAbsent(resource.getTsFileID().timePartitionId, l -> new ArrayList<>()) + .add(resource); } return ret; } @@ -743,7 +780,7 @@ public class DataRegion implements IDataRegionForQuery { tsFileResource.remove(); return; } - updateLastFlushTime(tsFileResource, isSeq); + updateDeviceLastFlushTime(tsFileResource); tsFileResourceManager.registerSealedTsFileResource(tsFileResource); FileMetrics.getInstance() .addTsFile( @@ -821,7 +858,102 @@ public class DataRegion implements IDataRegionForQuery { } } - private void recoverFilesInPartition( 
+ private Callable<Void> recoverFilesInPartition( + long partitionId, + DataRegionRecoveryContext context, + List<TsFileResource> resourceList, + boolean isSeq) { + + File partitionSysDir = + SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); + File logFile = SystemFileFactory.INSTANCE.getFile(partitionSysDir, "FileTimeIndexCache_0"); + if (logFile.exists()) { + Map<TsFileID, FileTimeIndex> fileTimeIndexMap; + try { + FileTimeIndexCacheReader logReader = + new FileTimeIndexCacheReader(logFile, dataRegionId, partitionId); + fileTimeIndexMap = logReader.read(); + } catch (Exception e) { + throw new RuntimeException(e); + } + List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>(); + List<TsFileResource> resourceListForSyncRecover = new ArrayList<>(); + Callable<Void> asyncRecoverTask = null; + for (TsFileResource tsFileResource : resourceList) { + if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) { + tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID())); + tsFileResource.setStatus(TsFileResourceStatus.NORMAL); + tsFileManager.add(tsFileResource, isSeq); + resourceListForAsyncRecover.add(tsFileResource); + } else { + resourceListForSyncRecover.add(tsFileResource); + } + } + if (!resourceListForAsyncRecover.isEmpty()) { + asyncRecoverTask = + asyncRecoverFilesInPartition(partitionId, context, resourceListForAsyncRecover, isSeq); + } + if (!resourceListForSyncRecover.isEmpty()) { + syncRecoverFilesInPartition(partitionId, context, resourceListForSyncRecover, isSeq); + } + return asyncRecoverTask; + } else { + syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq); + return null; + } + } + + private Callable<Void> asyncRecoverFilesInPartition( + long partitionId, + DataRegionRecoveryContext context, + List<TsFileResource> resourceList, + boolean isSeq) { + if (config.isEnableSeparateData()) { + if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId, false)) 
{ + TimePartitionManager.getInstance() + .registerTimePartitionInfo( + new TimePartitionInfo( + new DataRegionId(Integer.parseInt(dataRegionId)), + partitionId, + false, + Long.MAX_VALUE, + lastFlushTimeMap.getMemSize(partitionId))); + } + for (TsFileResource tsFileResource : resourceList) { + updatePartitionLastFlushTime(tsFileResource); + } + TimePartitionManager.getInstance() + .updateAfterFlushing( + new DataRegionId(Integer.parseInt(dataRegionId)), + partitionId, + System.currentTimeMillis(), + lastFlushTimeMap.getMemSize(partitionId), + false); + } + return () -> { + for (TsFileResource tsFileResource : resourceList) { + try (SealedTsFileRecoverPerformer recoverPerformer = + new SealedTsFileRecoverPerformer(tsFileResource)) { + recoverPerformer.recover(); + tsFileResourceManager.registerSealedTsFileResource(tsFileResource); + } catch (Throwable e) { + logger.error( + "Fail to recover sealed TsFile {}, skip it.", tsFileResource.getTsFilePath(), e); + } finally { + // update recovery context + context.incrementRecoveredFilesNum(); + } + } + // After recover, replace partition last flush time with device last flush time + if (config.isEnableSeparateData()) { + upgradeAndUpdateDeviceLastFlushTime(partitionId, resourceList); + } + + return null; + }; + } + + private void syncRecoverFilesInPartition( long partitionId, DataRegionRecoveryContext context, List<TsFileResource> resourceList, @@ -829,8 +961,10 @@ public class DataRegion implements IDataRegionForQuery { for (TsFileResource tsFileResource : resourceList) { recoverSealedTsFiles(tsFileResource, context, isSeq); } + FileTimeIndexCacheRecorder.getInstance() + .logFileTimeIndex(resourceList.toArray(new TsFileResource[0])); if (config.isEnableSeparateData()) { - if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId)) { + if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId, true)) { TimePartitionManager.getInstance() .registerTimePartitionInfo( new TimePartitionInfo( @@ -841,7 
+975,7 @@ public class DataRegion implements IDataRegionForQuery { lastFlushTimeMap.getMemSize(partitionId))); } for (TsFileResource tsFileResource : resourceList) { - updateLastFlushTime(tsFileResource, isSeq); + updateDeviceLastFlushTime(tsFileResource); } TimePartitionManager.getInstance() .updateAfterFlushing( @@ -890,19 +1024,7 @@ public class DataRegion implements IDataRegionForQuery { } // init map long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); - - if (config.isEnableSeparateData() - && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) { - TimePartitionManager.getInstance() - .registerTimePartitionInfo( - new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), - timePartitionId, - true, - Long.MAX_VALUE, - 0)); - } - + initFlushTimeMap(timePartitionId); boolean isSequence = config.isEnableSeparateData() && insertRowNode.getTime() @@ -988,18 +1110,7 @@ public class DataRegion implements IDataRegionForQuery { long beforeTimePartition = TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]); // init map - - if (config.isEnableSeparateData() - && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) { - TimePartitionManager.getInstance() - .registerTimePartitionInfo( - new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), - beforeTimePartition, - true, - Long.MAX_VALUE, - 0)); - } + initFlushTimeMap(beforeTimePartition); long lastFlushTime = config.isEnableSeparateData() @@ -1059,6 +1170,20 @@ public class DataRegion implements IDataRegionForQuery { return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() - time) <= dataTTL; } + private void initFlushTimeMap(long timePartitionId) { + if (config.isEnableSeparateData() + && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) { + TimePartitionManager.getInstance() + .registerTimePartitionInfo( + new TimePartitionInfo( + new 
DataRegionId(Integer.parseInt(dataRegionId)), + timePartitionId, + true, + Long.MAX_VALUE, + 0)); + } + } + /** * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be * inserted are in the range [start, end) Null value in each column values will be replaced by the @@ -1554,6 +1679,8 @@ public class DataRegion implements IDataRegionForQuery { "{} will close all files for deleting data folder {}", databaseName + "-" + dataRegionId, systemDir); + FileTimeIndexCacheRecorder.getInstance() + .removeFileTimeIndexCache(Integer.parseInt(dataRegionId)); writeLock("deleteFolder"); try { File dataRegionSystemFolder = @@ -2769,17 +2896,8 @@ public class DataRegion implements IDataRegionForQuery { if (config.isEnableSeparateData()) { final DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionId)); final long timePartitionId = newTsFileResource.getTimePartition(); - if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) { - TimePartitionManager.getInstance() - .registerTimePartitionInfo( - new TimePartitionInfo( - dataRegionId, - timePartitionId, - false, - Long.MAX_VALUE, - lastFlushTimeMap.getMemSize(timePartitionId))); - } - updateLastFlushTime(newTsFileResource); + initFlushTimeMap(timePartitionId); + updateDeviceLastFlushTime(newTsFileResource); TimePartitionManager.getInstance() .updateAfterFlushing( dataRegionId, @@ -2905,23 +3023,6 @@ public class DataRegion implements IDataRegionForQuery { return version; } - /** - * Update latest time in latestTimeForEachDevice and - * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module. 
- */ - protected void updateLastFlushTime(TsFileResource newTsFileResource) { - for (IDeviceID device : newTsFileResource.getDevices()) { - long endTime = newTsFileResource.getEndTime(device); - long timePartitionId = TimePartitionUtils.getTimePartitionId(endTime); - if (config.isEnableSeparateData()) { - lastFlushTimeMap.updateOneDeviceFlushedTime(timePartitionId, device, endTime); - } - if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { - lastFlushTimeMap.updateOneDeviceGlobalFlushedTime(device, endTime); - } - } - } - /** * Execute the loading process by the type. * @@ -3292,17 +3393,7 @@ public class DataRegion implements IDataRegionForQuery { // init map long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); - if (config.isEnableSeparateData() - && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) { - TimePartitionManager.getInstance() - .registerTimePartitionInfo( - new TimePartitionInfo( - new DataRegionId(Integer.valueOf(dataRegionId)), - timePartitionId, - true, - Long.MAX_VALUE, - 0)); - } + initFlushTimeMap(timePartitionId); boolean isSequence = config.isEnableSeparateData() @@ -3407,17 +3498,7 @@ public class DataRegion implements IDataRegionForQuery { // init map timePartitionIds[i] = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); - if (config.isEnableSeparateData() - && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i])) { - TimePartitionManager.getInstance() - .registerTimePartitionInfo( - new TimePartitionInfo( - new DataRegionId(Integer.parseInt(dataRegionId)), - timePartitionIds[i], - true, - Long.MAX_VALUE, - 0)); - } + initFlushTimeMap(timePartitionIds[i]); areSequence[i] = config.isEnableSeparateData() && insertRowNode.getTime() @@ -3516,6 +3597,10 @@ public class DataRegion implements IDataRegionForQuery { } } + public File getDataRegionSysDir() { + return dataRegionSysDir; + } + public void addSettleFilesToList( List<TsFileResource> 
seqResourcesToBeSettled, List<TsFileResource> unseqResourcesToBeSettled, @@ -3694,6 +3779,12 @@ public class DataRegion implements IDataRegionForQuery { } } + public void compactFileTimeIndexCache() { + for (long timePartition : partitionMaxFileVersions.keySet()) { + tsFileManager.compactFileTimeIndexCache(timePartition); + } + } + @TestOnly public ILastFlushTimeMap getLastFlushTimeMap() { return lastFlushTimeMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java index fe62176469b..f02044b0414 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java @@ -49,4 +49,8 @@ public class DeviceLastFlushTime implements ILastFlushTime { } return new PartitionLastFlushTime(maxTime); } + + Map<IDeviceID, Long> getDeviceLastFlushTimeMap() { + return deviceLastFlushTimeMap; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java index 4060eb97bb2..3f0abfd3e24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion; +import org.apache.iotdb.db.storageengine.StorageEngine; + import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,22 +61,7 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { /** record memory cost of map for each partitionId */ private final 
Map<Long, Long> memCostForEachPartition = new ConcurrentHashMap<>(); - // For load - @Override - public void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID deviceId, long time) { - ILastFlushTime flushTimeMapForPartition = - partitionLatestFlushedTime.computeIfAbsent( - timePartitionId, id -> new DeviceLastFlushTime()); - long lastFlushTime = flushTimeMapForPartition.getLastFlushTime(deviceId); - if (lastFlushTime == Long.MIN_VALUE) { - long memCost = HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed(); - memCostForEachPartition.compute( - timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost); - } - flushTimeMapForPartition.updateLastFlushTime(deviceId, time); - } - - // For recover + // For sync recover resource without fileTimeIndexCache and load @Override public void updateMultiDeviceFlushedTime( long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) { @@ -82,7 +69,7 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { partitionLatestFlushedTime.computeIfAbsent( timePartitionId, id -> new DeviceLastFlushTime()); - long memIncr = 0; + long memIncr = 0L; for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) { if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) == Long.MIN_VALUE) { memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed(); @@ -94,10 +81,60 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + finalMemIncr); } + // For async recover resource with fileTimeIndexCache @Override - public void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time) { - globalLatestFlushedTimeForEachDevice.compute( - path, (k, v) -> v == null ? 
time : Math.max(v, time)); + public void upgradeAndUpdateMultiDeviceFlushedTime( + long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) { + ILastFlushTime flushTimeMapForPartition = + partitionLatestFlushedTime.computeIfAbsent( + timePartitionId, id -> new DeviceLastFlushTime()); + // upgrade PartitionLastFlushTime to DeviceLastFlushTime + if (flushTimeMapForPartition instanceof PartitionLastFlushTime) { + long maxFlushTime = flushTimeMapForPartition.getLastFlushTime(null); + ILastFlushTime newDeviceLastFlushTime = new DeviceLastFlushTime(); + long memIncr = 0; + for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) { + memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed(); + newDeviceLastFlushTime.updateLastFlushTime(entry.getKey(), entry.getValue()); + maxFlushTime = Math.max(maxFlushTime, entry.getValue()); + } + long finalMemIncr = memIncr; + memCostForEachPartition.compute( + timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + finalMemIncr); + } else { + // go here when DeviceLastFlushTime was recovered by wal recovery + long memIncr = 0; + for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) { + if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) == Long.MIN_VALUE) { + memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed(); + } + flushTimeMapForPartition.updateLastFlushTime(entry.getKey(), entry.getValue()); + } + long finalMemIncr = memIncr; + memCostForEachPartition.compute( + timePartitionId, (k1, v1) -> v1 == null ?
finalMemIncr : v1 + finalMemIncr); + } + } + + // For fileTimeIndexCache recovered before the async resource recover start + @Override + public void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime) { + ILastFlushTime flushTimeMapForPartition = + partitionLatestFlushedTime.computeIfAbsent( + timePartitionId, id -> new PartitionLastFlushTime(maxFlushedTime)); + + if (flushTimeMapForPartition instanceof PartitionLastFlushTime) { + long memIncr = Long.BYTES; + flushTimeMapForPartition.updateLastFlushTime(null, maxFlushedTime); + memCostForEachPartition.putIfAbsent(timePartitionId, memIncr); + } else { + // go here when DeviceLastFlushTime was recovered by wal recovery + DeviceLastFlushTime deviceLastFlushTime = (DeviceLastFlushTime) flushTimeMapForPartition; + Map<IDeviceID, Long> flushedTimeMap = deviceLastFlushTime.getDeviceLastFlushTimeMap(); + for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) { + flushTimeMapForPartition.updateLastFlushTime(entry.getKey(), entry.getValue()); + } + } } @Override @@ -108,9 +145,14 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { } @Override - public boolean checkAndCreateFlushedTimePartition(long timePartitionId) { + public boolean checkAndCreateFlushedTimePartition( + long timePartitionId, boolean usingDeviceFlushTime) { if (!partitionLatestFlushedTime.containsKey(timePartitionId)) { - partitionLatestFlushedTime.put(timePartitionId, new DeviceLastFlushTime()); + partitionLatestFlushedTime.put( + timePartitionId, + usingDeviceFlushTime + ? 
new DeviceLastFlushTime() + : new PartitionLastFlushTime(Long.MIN_VALUE)); return false; } return true; @@ -135,8 +177,14 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { return partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId); } + // This method is for creating last cache entry when insert @Override public long getGlobalFlushedTime(IDeviceID path) { + // If TsFileResource is not fully recovered, we should return Long.MAX_VALUE + // to avoid create Last cache entry + if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { + return Long.MAX_VALUE; + } return globalLatestFlushedTimeForEachDevice.getOrDefault(path, Long.MIN_VALUE); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java index e809730cc0e..7bdd141bf6b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java @@ -27,23 +27,22 @@ import java.util.Map; public interface ILastFlushTimeMap { // region update - /** Update partitionLatestFlushedTime. */ - void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID deviceId, long time); - void updateMultiDeviceFlushedTime(long timePartitionId, Map<IDeviceID, Long> flushedTimeMap); - /** Update globalLatestFlushedTimeForEachDevice. */ - void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time); + void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime); void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> globalFlushedTimeMap); + void upgradeAndUpdateMultiDeviceFlushedTime( + long timePartitionId, Map<IDeviceID, Long> flushedTimeMap); + /** Update both partitionLatestFlushedTime and globalLatestFlushedTimeForEachDevice. 
*/ void updateLatestFlushTime(long partitionId, Map<IDeviceID, Long> updateMap); // endregion // region ensure - boolean checkAndCreateFlushedTimePartition(long timePartitionId); + boolean checkAndCreateFlushedTimePartition(long timePartitionId, boolean usingDeviceFlushTime); // endregion diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java index 3469a0a6dec..17ad0dd4334 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java @@ -53,7 +53,7 @@ public class CompactionScheduleTaskWorker implements Callable<Void> { while (true) { try { Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getCompactionScheduleIntervalInMs()); - if (!StorageEngine.getInstance().isAllSgReady()) { + if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { continue; } List<DataRegion> dataRegionListSnapshot = new ArrayList<>(dataRegionList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java index f7341876ec5..d7757bb3ff9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java @@ -53,7 +53,7 @@ public class TTLScheduleTask implements Callable<Void> { while (true) { try { Thread.sleep(ttlCheckInterval); - if 
(!StorageEngine.getInstance().isAllSgReady()) { + if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { continue; } List<DataRegion> dataRegionListSnapshot = new ArrayList<>(dataRegionList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index f30dd1799d4..bc69d29d10d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -64,6 +64,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskAlign import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskChunkHandleImpl; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.UnclosedFileScanHandleImpl; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; @@ -1583,6 +1584,7 @@ public class TsFileProcessor { } writer.endFile(); tsFileResource.serialize(); + FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource); if (logger.isDebugEnabled()) { logger.debug("Ended file {}", tsFileResource); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java index e3bb6adc748..c9656382e3d 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; +import java.util.Objects; + import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR; import static org.apache.tsfile.utils.FilePathUtils.splitTsFilePath; @@ -104,6 +106,27 @@ public class TsFileID { return versionArray; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TsFileID that = (TsFileID) o; + return regionId == that.regionId + && timePartitionId == that.timePartitionId + && timestamp == that.timestamp + && fileVersion == that.fileVersion + && compactionVersion == that.compactionVersion; + } + + @Override + public int hashCode() { + return Objects.hash(regionId, timePartitionId, timestamp, fileVersion, compactionVersion); + } + public long getTimestamp() { return timestamp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index dc242858bce..06409cf3cf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; import org.apache.tsfile.read.filter.basic.Filter; @@ -39,7 +40,7 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock; public class TsFileManager { private final String storageGroupName; private String dataRegionId; - private final String storageGroupDir; + private final String dataRegionSysDir; /** Serialize queries, delete resource files, compaction cleanup files */ private final ReadWriteLock resourceListLock = new ReentrantReadWriteLock(); @@ -52,9 +53,9 @@ public class TsFileManager { private volatile boolean allowCompaction = true; private final AtomicLong currentCompactionTaskSerialId = new AtomicLong(0); - public TsFileManager(String storageGroupName, String dataRegionId, String storageGroupDir) { + public TsFileManager(String storageGroupName, String dataRegionId, String dataRegionSysDir) { this.storageGroupName = storageGroupName; - this.storageGroupDir = storageGroupDir; + this.dataRegionSysDir = dataRegionSysDir; this.dataRegionId = dataRegionId; } @@ -260,6 +261,7 @@ public class TsFileManager { .computeIfAbsent(timePartition, t -> new TsFileResourceList()) .keepOrderInsert(resource); } + FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(resource); } } } finally { @@ -339,8 +341,8 @@ public class TsFileManager { return storageGroupName; } - public String getStorageGroupDir() { - return storageGroupDir; + public String getDataRegionSysDir() { + return dataRegionSysDir; } public Set<Long> getTimePartitions() { @@ -389,4 +391,19 @@ public class TsFileManager { return (sequenceFiles.higherKey(timePartitionId) == null && unsequenceFiles.higherKey(timePartitionId) == null); } + + public void compactFileTimeIndexCache(long timePartition) { + readLock(); + try { + FileTimeIndexCacheRecorder.getInstance() + .compactFileTimeIndexIfNeeded( + storageGroupName, + Integer.parseInt(dataRegionId), + timePartition, + sequenceFiles.get(timePartition), + unsequenceFiles.get(timePartition)); + } finally { + readUnlock(); + } + } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 6159149d9f4..bc24dec1049 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -60,6 +60,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -292,6 +293,20 @@ public class TsFileResource { } } + public static int getFileTimeIndexSerializedSize() { + // 5 * 8 Byte means 5 long numbers of tsFileID.timestamp, tsFileID.fileVersion, + // tsFileID.compactionVersion, timeIndex.getMinStartTime(), timeIndex.getMaxEndTime() + return 5 * Long.BYTES; + } + + public void serializeFileTimeIndexToByteBuffer(ByteBuffer buffer) { + buffer.putLong(tsFileID.timestamp); + buffer.putLong(tsFileID.fileVersion); + buffer.putLong(tsFileID.compactionVersion); + buffer.putLong(timeIndex.getMinStartTime()); + buffer.putLong(timeIndex.getMaxEndTime()); + } + public void updateStartTime(IDeviceID device, long time) { timeIndex.updateStartTime(device, time); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java new file mode 100644 index 00000000000..b0c805bba6b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList; +import org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent; +import static 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize; + +public class FileTimeIndexCacheRecorder { + + private static final Logger LOGGER = LoggerFactory.getLogger(FileTimeIndexCacheRecorder.class); + + private static final int VERSION = 0; + + protected static final String FILE_NAME = "FileTimeIndexCache_" + VERSION; + + private final ScheduledExecutorService recordFileIndexThread; + + private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); + + private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap = + new ConcurrentHashMap<>(); + + private FileTimeIndexCacheRecorder() { + recordFileIndexThread = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.FILE_TIME_INDEX_RECORD.getName()); + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + recordFileIndexThread, this::executeTasks, 100, 100, TimeUnit.MILLISECONDS); + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + recordFileIndexThread, + StorageEngine.getInstance().executeCompactFileTimeIndexCache(), + 120_000L, + 120_000L, + TimeUnit.MILLISECONDS); + } + + private void executeTasks() { + Runnable task; + while ((task = taskQueue.poll()) != null) { + recordFileIndexThread.submit(task); + } + } + + public void logFileTimeIndex(TsFileResource... 
tsFileResources) { + if (tsFileResources != null && tsFileResources.length > 0) { + TsFileResource firstResource = tsFileResources[0]; + TsFileID tsFileID = firstResource.getTsFileID(); + int dataRegionId = tsFileID.regionId; + long partitionId = tsFileID.timePartitionId; + File dataRegionSysDir = + StorageEngine.getDataRegionSystemDir( + firstResource.getDatabaseName(), firstResource.getDataRegionId()); + FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId, dataRegionSysDir); + boolean result = + taskQueue.offer( + () -> { + try { + ByteBuffer buffer = + ByteBuffer.allocate( + getFileTimeIndexSerializedSize() * tsFileResources.length); + for (TsFileResource tsFileResource : tsFileResources) { + tsFileResource.serializeFileTimeIndexToByteBuffer(buffer); + } + buffer.flip(); + writer.write(buffer); + } catch (IOException e) { + LOGGER.warn("Meet error when record FileTimeIndexCache: {}", e.getMessage()); + } + }); + if (!result) { + LOGGER.warn("Meet error when record FileTimeIndexCache"); + } + } + } + + public void compactFileTimeIndexIfNeeded( + String dataBaseName, + int dataRegionId, + long partitionId, + TsFileResourceList sequenceFiles, + TsFileResourceList unsequenceFiles) { + FileTimeIndexCacheWriter writer = + getWriter( + dataRegionId, + partitionId, + StorageEngine.getDataRegionSystemDir(dataBaseName, String.valueOf(dataRegionId))); + + int currentResourceCount = + (sequenceFiles == null ? 0 : sequenceFiles.size()) + + (unsequenceFiles == null ? 
0 : unsequenceFiles.size()); + if (writer.getLogFile().length() + > currentResourceCount * getFileTimeIndexSerializedSize() * 100L) { + + boolean result = + taskQueue.offer( + () -> { + try { + writer.clearFile(); + if (sequenceFiles != null && !sequenceFiles.isEmpty()) { + ByteBuffer buffer = + ByteBuffer.allocate( + getFileTimeIndexSerializedSize() * sequenceFiles.size()); + for (TsFileResource tsFileResource : sequenceFiles) { + tsFileResource.serializeFileTimeIndexToByteBuffer(buffer); + } + buffer.flip(); + writer.write(buffer); + } + if (unsequenceFiles != null && !unsequenceFiles.isEmpty()) { + ByteBuffer buffer = + ByteBuffer.allocate( + getFileTimeIndexSerializedSize() * unsequenceFiles.size()); + for (TsFileResource tsFileResource : unsequenceFiles) { + tsFileResource.serializeFileTimeIndexToByteBuffer(buffer); + } + buffer.flip(); + writer.write(buffer); + } + } catch (IOException e) { + LOGGER.warn("Meet error when compact FileTimeIndexCache: {}", e.getMessage()); + } + }); + if (!result) { + LOGGER.warn("Meet error when compact FileTimeIndexCache"); + } + } + } + + private FileTimeIndexCacheWriter getWriter( + int dataRegionId, long partitionId, File dataRegionSysDir) { + return writerMap + .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) + .computeIfAbsent( + partitionId, + k -> { + File partitionDir = + SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); + File logFile = SystemFileFactory.INSTANCE.getFile(partitionDir, FILE_NAME); + try { + if (!partitionDir.exists() && !partitionDir.mkdirs()) { + LOGGER.debug( + "Partition directory has existed,filePath:{}", + partitionDir.getAbsolutePath()); + } + if (!logFile.createNewFile()) { + LOGGER.debug( + "Partition log file has existed,filePath:{}", logFile.getAbsolutePath()); + } + return new FileTimeIndexCacheWriter(logFile, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + public void close() throws IOException { + for 
(Map<Long, FileTimeIndexCacheWriter> partitionWriterMap : writerMap.values()) { + for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) { + writer.close(); + } + } + } + + public void removeFileTimeIndexCache(int dataRegionId) { + Map<Long, FileTimeIndexCacheWriter> partitionWriterMap = writerMap.get(dataRegionId); + if (partitionWriterMap != null) { + for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) { + try { + writer.close(); + deleteDirectoryAndEmptyParent(writer.getLogFile()); + } catch (IOException e) { + LOGGER.warn("Meet error when close FileTimeIndexCache: {}", e.getMessage()); + } + } + } + } + + public static FileTimeIndexCacheRecorder getInstance() { + return FileTimeIndexCacheRecorder.InstanceHolder.INSTANCE; + } + + private static class InstanceHolder { + private InstanceHolder() {} + + private static final FileTimeIndexCacheRecorder INSTANCE = new FileTimeIndexCacheRecorder(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java new file mode 100644 index 00000000000..35682bd8020 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
+
+/**
+ * Reads a FileTimeIndexCache log file and rebuilds the {@link FileTimeIndex} of every TsFile
+ * recorded in it. The data region id and partition id given to the constructor are combined with
+ * the values stored in each record to form the {@link TsFileID} keys of the result.
+ */
+public class FileTimeIndexCacheReader {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileTimeIndexCacheReader.class);
+
+  private final File logFile;
+  // length of the log file at the moment this reader was created
+  private final long fileLength;
+  private final int dataRegionId;
+  private final long partitionId;
+
+  /**
+   * @param logFile the cache log file to read
+   * @param dataRegionId data region id as a decimal string, parsed with {@link Integer#parseInt}
+   * @param partitionId partition id put into every {@link TsFileID} built by this reader
+   * @throws NumberFormatException if {@code dataRegionId} is not a parsable integer
+   */
+  public FileTimeIndexCacheReader(File logFile, String dataRegionId, long partitionId) {
+    this.logFile = logFile;
+    this.fileLength = logFile.length();
+    this.dataRegionId = Integer.parseInt(dataRegionId);
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Parses every complete record of the log file.
+   *
+   * <p>A record is five longs in this order: timestamp, file version, compaction version, min
+   * start time, max end time. Bytes that follow the last complete record (or everything after the
+   * position where reading failed) are cut off the file before returning.
+   *
+   * @return the parsed entries; empty if no complete record could be read
+   * @throws IOException if truncating the unparsed remainder of the file fails
+   */
+  public Map<TsFileID, FileTimeIndex> read() throws IOException {
+    Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+    final long recordSize = getFileTimeIndexSerializedSize();
+    long readLength = 0L;
+    try (DataInputStream logStream =
+        new DataInputStream(new BufferedInputStream(Files.newInputStream(logFile.toPath())))) {
+      // Only whole records are consumed, so a torn tail ends the loop instead of surfacing as an
+      // EOFException that would have to be swallowed.
+      while (fileLength - readLength >= recordSize) {
+        long timestamp = logStream.readLong();
+        long fileVersion = logStream.readLong();
+        long compactionVersion = logStream.readLong();
+        long minStartTime = logStream.readLong();
+        long maxEndTime = logStream.readLong();
+        TsFileID tsFileID =
+            new TsFileID(dataRegionId, partitionId, timestamp, fileVersion, compactionVersion);
+        FileTimeIndex fileTimeIndex = new FileTimeIndex(minStartTime, maxEndTime);
+        fileTimeIndexMap.put(tsFileID, fileTimeIndex);
+        readLength += recordSize;
+      }
+    } catch (IOException e) {
+      // Best effort: the entries parsed so far are still returned, but the failure is no longer
+      // silent.
+      LOGGER.warn(
+          "Failed to read FileTimeIndexCache file {} after {} bytes, the remaining content is dropped",
+          logFile,
+          readLength,
+          e);
+    }
+    if (readLength != fileLength) {
+      // The file is incomplete (or unreadable) past readLength, so cut it back to the last
+      // complete record. NOTE(review): presumably this keeps records appended later aligned to
+      // the record size - confirm against the writer.
+      try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE)) {
+        channel.truncate(readLength);
+      }
+    }
+    return fileTimeIndexMap;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
new file mode 100644
index 00000000000..dce4f8b1103
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache;
+
+import org.apache.iotdb.db.utils.writelog.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+
+/**
+ * Appends already serialized buffers to a FileTimeIndexCache log file through a {@link
+ * FileChannel} that is opened in append mode.
+ */
+public class FileTimeIndexCacheWriter implements ILogWriter {
+  private static final Logger logger = LoggerFactory.getLogger(FileTimeIndexCacheWriter.class);
+
+  private final File logFile;
+  // both are null while the writer is closed, see close() and clearFile()
+  private FileOutputStream fileOutputStream;
+  private FileChannel channel;
+  private final boolean forceEachWrite;
+
+  /**
+   * @param logFile file to append to
+   * @param forceEachWrite if true, every {@link #write(ByteBuffer)} is followed by {@code
+   *     channel.force(true)}
+   * @throws FileNotFoundException if the file cannot be opened for appending
+   */
+  public FileTimeIndexCacheWriter(File logFile, boolean forceEachWrite)
+      throws FileNotFoundException {
+    this.logFile = logFile;
+    this.forceEachWrite = forceEachWrite;
+
+    fileOutputStream = new FileOutputStream(logFile, true);
+    channel = fileOutputStream.getChannel();
+  }
+
+  /**
+   * Appends the remaining bytes of {@code logBuffer} to the log file. The write is skipped with a
+   * warning if the writer has been closed or the channel is closed underneath it.
+   *
+   * @param logBuffer buffer positioned at the first byte to write
+   * @throws IOException if writing or forcing fails for a reason other than a closed channel
+   */
+  @Override
+  public void write(ByteBuffer logBuffer) throws IOException {
+    if (channel == null) {
+      // Same null guard as force() and close(); without it a write after close() throws
+      // NullPointerException instead of being skipped like a write to a closed channel.
+      logger.warn("FileTimeIndexCache writer of {} has been closed, skip the write", logFile);
+      return;
+    }
+    try {
+      // a single write call is not required to drain the buffer
+      while (logBuffer.hasRemaining()) {
+        channel.write(logBuffer);
+      }
+      if (this.forceEachWrite) {
+        channel.force(true);
+      }
+    } catch (ClosedChannelException ignored) {
+      logger.warn("someone interrupt current thread, so no need to do write for io safety");
+    }
+  }
+
+  /** Forces content and metadata to the storage device if the channel is open. */
+  @Override
+  public void force() throws IOException {
+    if (channel != null && channel.isOpen()) {
+      channel.force(true);
+    }
+  }
+
+  /**
+   * Forces pending content to the storage device and releases the stream and its channel. Calling
+   * it on an already closed writer does nothing.
+   *
+   * @throws IOException if forcing or closing fails; the writer is marked closed in any case
+   */
+  @Override
+  public void close() throws IOException {
+    if (channel == null) {
+      return;
+    }
+    // try-with-resources closes both resources even when force() throws, so nothing leaks
+    try (FileOutputStream stream = fileOutputStream;
+        FileChannel ch = channel) {
+      if (ch.isOpen()) {
+        ch.force(false);
+      }
+    } finally {
+      fileOutputStream = null;
+      channel = null;
+    }
+  }
+
+  /**
+   * Closes the writer, replaces the log file by an empty one and reopens the writer on it.
+   *
+   * @throws IOException if the file cannot be deleted, recreated or reopened
+   */
+  public void clearFile() throws IOException {
+    close();
+    // A file that is already gone must not abort the reset and leave this writer closed.
+    Files.deleteIfExists(this.logFile.toPath());
+    if (!logFile.createNewFile()) {
+      logger.warn("Partition log file has existed,filePath:{}", logFile.getAbsolutePath());
+    }
+    fileOutputStream = new FileOutputStream(logFile, true);
+    channel = fileOutputStream.getChannel();
+  }
+
+  @Override
+  public String toString() {
+    return "LogWriter{" + "logFile=" + logFile + '}';
+  }
+
+  public File getLogFile() {
+    return logFile;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index 72b4237c868..c7af6e2e518 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALRecoverException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
@@ -285,6 +286,7 @@ public class UnsealedTsFileRecoverPerformer
extends AbstractTsFileRecoverPerform // currently, we close this file anyway writer.endFile(); tsFileResource.serialize(); + FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource); } catch (IOException | ExecutionException e) { throw new WALRecoverException(e); } catch (InterruptedException e) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java index 880a5732bc7..20b3dfe378a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java @@ -205,7 +205,7 @@ public class LastFlushTimeMapTest { unseqResource1.setFile(unseqResourceFile1); unseqResource1.updateStartTime(device, 1); unseqResource1.updateEndTime(device, 100); - dataRegion.updateLastFlushTime(unseqResource1); + dataRegion.updateDeviceLastFlushTime(unseqResource1); File unseqResourceFile2 = new File(unseqDirPath + File.separator + "5-5-0-0.tsfile.resource"); TsFileResource unseqResource2 = new TsFileResource(); @@ -213,7 +213,7 @@ public class LastFlushTimeMapTest { unseqResource2.setFile(unseqResourceFile2); unseqResource2.updateStartTime(device, 1); unseqResource2.updateEndTime(device, 10); - dataRegion.updateLastFlushTime(unseqResource2); + dataRegion.updateDeviceLastFlushTime(unseqResource2); File unseqResourceFile3 = new File(unseqDirPath + File.separator + "6-6-0-0.tsfile.resource"); TsFileResource unseqResource3 = new TsFileResource(); @@ -221,7 +221,7 @@ public class LastFlushTimeMapTest { unseqResource3.setFile(unseqResourceFile3); unseqResource3.updateStartTime(device, 1); unseqResource3.updateEndTime(device, 70); - dataRegion.updateLastFlushTime(unseqResource3); + dataRegion.updateDeviceLastFlushTime(unseqResource3); Assert.assertEquals(100, 
dataRegion.getLastFlushTimeMap().getFlushedTime(0, device)); dataRegion.getLastFlushTimeMap().degradeLastFlushTime(0); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index 04443ba99fe..1f160e07758 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager; import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; @@ -224,6 +225,7 @@ public class EnvironmentUtils { for (String path : tierManager.getAllLocalUnSequenceFileFolders()) { cleanDir(path); } + FileTimeIndexCacheRecorder.getInstance().close(); // delete system info cleanDir(config.getSystemDir()); // delete query diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index e731f50628c..102022417e9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -183,6 +183,8 @@ public enum ThreadName { REGION_MIGRATE("Region-Migrate-Pool"), 
STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"), REPAIR_DATA("RepairData"), + FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"), + // the unknown thread name is used for metrics UNKOWN("UNKNOWN");
