This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch speed_up_recover in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3e6580abccf4885e741c3740b011fbf4a1745b49 Author: HTHou <[email protected]> AuthorDate: Wed Jul 24 10:28:25 2024 +0800 dev --- .../db/storageengine/dataregion/DataRegion.java | 68 +++++++------ .../dataregion/memtable/TsFileProcessor.java | 3 + .../dataregion/tsfile/TsFileResource.java | 18 +++- .../tsfile/timeindex/FileTimeIndexCache.java | 50 ---------- .../tsfile/timeindex/PartitionLogRecorder.java | 109 +++++++++++++++++++++ 5 files changed, 167 insertions(+), 81 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 1396ab501f4..c1f9575f680 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion; +import java.io.FileNotFoundException; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -112,6 +113,7 @@ import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; import org.apache.iotdb.db.utils.DateTimeUtils; +import org.apache.iotdb.db.utils.writelog.PartitionLogReader; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -243,8 +245,8 @@ public class DataRegion implements IDataRegionForQuery { /** database name. */ private final String databaseName; - /** database system directory. */ - private File storageGroupSysDir; + /** data region system directory. */ + private File dataRegionSysDir; /** manage seqFileList and unSeqFileList. */ private final TsFileManager tsFileManager; @@ -320,14 +322,13 @@ public class DataRegion implements IDataRegionForQuery { this.fileFlushPolicy = fileFlushPolicy; acquireDirectBufferMemory(); - storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); - this.tsFileManager = - new TsFileManager(databaseName, dataRegionId, storageGroupSysDir.getPath()); - if (storageGroupSysDir.mkdirs()) { + dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); + this.tsFileManager = new TsFileManager(databaseName, dataRegionId, dataRegionSysDir.getPath()); + if (dataRegionSysDir.mkdirs()) { logger.info( - "Database system Directory {} doesn't exist, create it", storageGroupSysDir.getPath()); - } else if (!storageGroupSysDir.exists()) { - logger.error("create database system Directory {} failed", storageGroupSysDir.getPath()); + "Database system Directory {} doesn't exist, create it", dataRegionSysDir.getPath()); + } else if (!dataRegionSysDir.exists()) { + logger.error("create database system Directory {} failed", dataRegionSysDir.getPath()); } lastFlushTimeMap = new HashLastFlushTimeMap(); @@ -384,15 +385,6 @@ public class DataRegion implements IDataRegionForQuery { isReady = ready; } - private Map<Long, List<TsFileResource>> splitResourcesByPartition( - List<TsFileResource> resources) { - Map<Long, List<TsFileResource>> ret = new TreeMap<>(); - for (TsFileResource resource : resources) { - ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource); - } - return ret; - } - /** this class is used to store recovering context. */ private class DataRegionRecoveryContext { /** number of files to be recovered. */ @@ -451,19 +443,16 @@ public class DataRegion implements IDataRegionForQuery { try { // collect candidate TsFiles from sequential and unsequential data directory - List<TsFileResource> tmpSeqTsFiles = - getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders()); - List<TsFileResource> tmpUnseqTsFiles = - getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders()); - // split by partition so that we can find the last file of each partition and decide to // close it or not - DataRegionRecoveryContext dataRegionRecoveryContext = - new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size()); Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = - splitResourcesByPartition(tmpSeqTsFiles); + getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders()); Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = - splitResourcesByPartition(tmpUnseqTsFiles); + getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders()); + DataRegionRecoveryContext dataRegionRecoveryContext = + new DataRegionRecoveryContext( + partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum() + + partitionTmpUnseqTsFiles.values().stream().mapToLong(List::size).sum()); // submit unsealed TsFiles to recover List<WALRecoverListener> recoverListeners = new ArrayList<>(); for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) { @@ -646,7 +635,7 @@ public class DataRegion implements IDataRegionForQuery { } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private List<TsFileResource> getAllFiles(List<String> folders) + private Map<Long, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException, DataRegionException { // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition Map<String, File> tsFilePartitionPath2File = new HashMap<>(); @@ -684,10 +673,12 @@ public class DataRegion implements IDataRegionForQuery { sortedFiles.sort(this::compareFileName); long currentTime = System.currentTimeMillis(); - List<TsFileResource> ret = new ArrayList<>(); + Map<Long, List<TsFileResource>> ret = new TreeMap<>(); for (File f : sortedFiles) { checkTsFileTime(f, currentTime); - ret.add(new TsFileResource(f)); + TsFileResource resource = new TsFileResource(f); + ret.computeIfAbsent(resource.getTsFileID().timePartitionId, l -> new ArrayList<>()) + .add(resource); } return ret; } @@ -826,6 +817,19 @@ public class DataRegion implements IDataRegionForQuery { DataRegionRecoveryContext context, List<TsFileResource> resourceList, boolean isSeq) { + + // TODO: read partition file + File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); + if (logFile.exists()) { + try { + PartitionLogReader logReader = new PartitionLogReader(logFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + for (TsFileResource tsFileResource : resourceList) { recoverSealedTsFiles(tsFileResource, context, isSeq); } @@ -3506,6 +3510,10 @@ public class DataRegion implements IDataRegionForQuery { } } + public File getDataRegionSysDir() { + return dataRegionSysDir; + } + public void addSettleFilesToList( List<TsFileResource> seqResourcesToBeSettled, List<TsFileResource> unseqResourcesToBeSettled, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index ae974b17519..e178290f45c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -64,6 +64,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskAlign import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskChunkHandleImpl; import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.UnclosedFileScanHandleImpl; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.PartitionLogRecorder; import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; @@ -1581,6 +1582,8 @@ public class TsFileProcessor { } writer.endFile(); tsFileResource.serialize(); + PartitionLogRecorder.getInstance() + .submitTask(dataRegionInfo.getDataRegion().getDataRegionSysDir(), tsFileResource); if (logger.isDebugEnabled()) { logger.debug("Ended file {}", tsFileResource); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 384ee410c54..0727d7e770f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; @@ -49,6 +50,7 @@ import org.apache.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.FilePathUtils; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.writer.TsFileIOWriter; @@ -56,10 +58,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -232,7 +236,6 @@ public class TsFileResource { File dest = fsFactory.getFile(file + RESOURCE_SUFFIX); fsFactory.deleteIfExists(dest); fsFactory.moveFile(src, dest); - } private void serializeTo(BufferedOutputStream outputStream) throws IOException { @@ -293,6 +296,19 @@ public class TsFileResource { } } + public ByteBuffer serializeFileTimeIndexToByteBuffer() { + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + outputStream.writeLong(tsFileID.fileVersion); + outputStream.writeLong(tsFileID.compactionVersion); + outputStream.writeLong(timeIndex.getMinStartTime()); + outputStream.writeLong(timeIndex.getMaxEndTime()); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } catch (IOException e) { + throw new SerializationRunTimeException(e); + } + } + public void updateStartTime(IDeviceID device, long time) { timeIndex.updateStartTime(device, time); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCache.java deleted file mode 100644 index ca9118d7d78..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCache.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex; - -import java.util.concurrent.TimeUnit; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ScheduledExecutorService; - -public class FileTimeIndexCache { - - private ScheduledExecutorService recordFileIndexThread; - - private final BlockingQueue<TsFileResource> resourceQueue = new ArrayBlockingQueue<>(100); - - public void add(TsFileResource resource) { - resourceQueue.add(resource); - } - -// public void recordTsFileResource() { -// recordFileIndexThread = -// IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( -// ThreadName.FILE_TIMEINDEX_RECORD.getName()); -// ScheduledExecutorUtil.safelyScheduleWithFixedDelay( -// recordFileIndexThread, this::deleteOutdatedFiles, initDelayMs, periodMs, -// TimeUnit.MILLISECONDS); -// } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java new file mode 100644 index 00000000000..9ba45ca2f15 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PartitionLogRecorder.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.writelog.LogWriter; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.iotdb.db.utils.writelog.PartitionLogWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PartitionLogRecorder { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionLogRecorder.class); + + private final ScheduledExecutorService recordFileIndexThread; + + private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); + + private final Map<Integer, Map<Long, PartitionLogWriter>> writerMap = new HashMap<>(); + + private PartitionLogRecorder() { + recordFileIndexThread = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.FILE_TIMEINDEX_RECORD.getName()); + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + recordFileIndexThread, this::executeTasks, 0, 1, TimeUnit.SECONDS); + } + + private void executeTasks() { + Runnable task; + while ((task = taskQueue.poll()) != null) { + recordFileIndexThread.submit(task); + } + } + + public void submitTask(File dataRegionSysDir, TsFileResource tsFileResource) { + TsFileID tsFileID = tsFileResource.getTsFileID(); + int dataRegionId = tsFileID.regionId; + long partitionId = tsFileID.timePartitionId; + + PartitionLogWriter writer = + writerMap + .computeIfAbsent(dataRegionId, k -> new HashMap<>()) + .computeIfAbsent( + partitionId, + k -> { + try { + File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, String.valueOf(partitionId)); + if (!logFile.createNewFile()) { + LOGGER.warn( + "Partition log file has existed,filePath:{}", + logFile.getAbsolutePath()); + } + return new PartitionLogWriter(logFile, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + taskQueue.offer( + () -> { + try { + writer.write(tsFileResource.serializeFileTimeIndexToByteBuffer()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + public static PartitionLogRecorder getInstance() { + return PartitionLogRecorder.InstanceHolder.INSTANCE; + } + + private static class InstanceHolder { + private InstanceHolder() {} + + private static final PartitionLogRecorder INSTANCE = new PartitionLogRecorder(); + } +}
