This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1fed2c65a15 Speed up recover (#13068)
1fed2c65a15 is described below
commit 1fed2c65a1539f24b9d15fc35085c277a16372da
Author: Haonan <[email protected]>
AuthorDate: Thu Aug 29 10:31:25 2024 +0800
Speed up recover (#13068)
* init
* init
* init
* dev
* dev
* dev
* dev reader
* support read and write FileTimeIndexCache
* support read and write FileTimeIndexCache
* dev recover progress
* update last flush time after async recover finished
* fix package structure
* finish compact logic
* fix UT
* Fix repair data error
* adapt pipe
* try to fix 1c3d IT
* fixing recover from wal not recording fileTimeIndex
* recover from wal needs to record file timeindex cache
* fix sonar bug
* update more
* fix empty FileTimeIndexCache
* batch serialize
* control the thread number of recover
* fix cannot start recover task
* fix delete region
* fix compile issue
* fixing review
* fix review
* fix review and small issue
* fix review and small issue
---
.../iotdb/db/it/IoTDBPartialInsertionIT.java | 2 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 3 +
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 2 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 40 +++-
.../iotdb/db/storageengine/StorageEngine.java | 100 ++++++--
.../db/storageengine/dataregion/DataRegion.java | 264 +++++++++++++++------
.../dataregion/DeviceLastFlushTime.java | 4 +
.../dataregion/HashLastFlushTimeMap.java | 92 +++++--
.../dataregion/ILastFlushTimeMap.java | 11 +-
.../schedule/CompactionScheduleTaskWorker.java | 2 +-
.../compaction/schedule/TTLScheduleTask.java | 2 +-
.../dataregion/memtable/TsFileProcessor.java | 2 +
.../storageengine/dataregion/tsfile/TsFileID.java | 23 ++
.../dataregion/tsfile/TsFileManager.java | 27 ++-
.../dataregion/tsfile/TsFileResource.java | 15 ++
.../timeindex/FileTimeIndexCacheRecorder.java | 227 ++++++++++++++++++
.../tsfile/timeindex/PlainDeviceTimeIndex.java | 23 +-
.../FileTimeIndexCacheReader.java | 84 +++++++
.../FileTimeIndexCacheWriter.java | 103 ++++++++
.../file/UnsealedTsFileRecoverPerformer.java | 2 +
.../dataregion/LastFlushTimeMapTest.java | 6 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +
.../iotdb/commons/concurrent/ThreadName.java | 2 +
24 files changed, 896 insertions(+), 144 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
index 0ac7867d893..2e62d837b07 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
@@ -109,7 +109,7 @@ public class IoTDBPartialInsertionIT {
EnvironmentUtils.restartDaemon();
StorageEngine.getInstance().recover();
// wait for recover
- while (!StorageEngine.getInstance().isAllSgReady()) {
+ while (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
Thread.sleep(500);
time += 500;
if (time > 10000) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 8120c8364e2..5d3be05fbad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -368,6 +368,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
@Override
public synchronized void start() {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ return;
+ }
if (!shouldExtractInsertion) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2764bb51586..fdf25118261 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1829,7 +1829,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus startRepairData() throws TException {
- if (!storageEngine.isAllSgReady()) {
+ if (!storageEngine.isReadyForNonReadWriteFunctions()) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all
sg is ready");
}
IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 9262a24ee66..feffd0905ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1145,7 +1145,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(e);
}
} else {
- if (!StorageEngine.getInstance().isAllSgReady()) {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
future.setException(
new IoTDBException(
"not all sg is ready",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index b715a7c6ca7..13d408ceb99 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -247,6 +247,25 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
logger.info("IoTDB configuration: {}", config.getConfigMessage());
logger.info("Congratulations, IoTDB DataNode is set up successfully.
Now, enjoy yourself!");
+ if (isUsingPipeConsensus()) {
+ long dataRegionStartTime = System.currentTimeMillis();
+ while (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions())
{
+ try {
+ TimeUnit.MILLISECONDS.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.warn("IoTDB DataNode failed to set up.", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ DataRegionConsensusImpl.getInstance().start();
+ long dataRegionEndTime = System.currentTimeMillis();
+ logger.info(
+ "DataRegion consensus start successfully, which takes {} ms.",
+ (dataRegionEndTime - dataRegionStartTime));
+ dataRegionConsensusStarted = true;
+ }
+
} catch (StartupException | IOException e) {
logger.error("Fail to start server", e);
stop();
@@ -668,12 +687,14 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
"SchemaRegion consensus start successfully, which takes {} ms.",
(schemaRegionEndTime - startTime));
schemaRegionConsensusStarted = true;
- DataRegionConsensusImpl.getInstance().start();
- long dataRegionEndTime = System.currentTimeMillis();
- logger.info(
- "DataRegion consensus start successfully, which takes {} ms.",
- (dataRegionEndTime - schemaRegionEndTime));
- dataRegionConsensusStarted = true;
+ if (!isUsingPipeConsensus()) {
+ DataRegionConsensusImpl.getInstance().start();
+ long dataRegionEndTime = System.currentTimeMillis();
+ logger.info(
+ "DataRegion consensus start successfully, which takes {} ms.",
+ (dataRegionEndTime - schemaRegionEndTime));
+ dataRegionConsensusStarted = true;
+ }
} catch (IOException e) {
throw new StartupException(e);
}
@@ -722,7 +743,7 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
logger.info(
"IoTDB DataNode is setting up, some databases may not be ready now,
please wait several seconds...");
long startTime = System.currentTimeMillis();
- while (!StorageEngine.getInstance().isAllSgReady()) {
+ while (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
@@ -818,6 +839,11 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
return new TDataNodeConfiguration(location, resource);
}
+ private boolean isUsingPipeConsensus() {
+ return
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)
+ ||
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS);
+ }
+
private void registerUdfServices() throws StartupException {
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 5b4a0d3bd19..208fac15061 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -120,7 +120,7 @@ public class StorageEngine implements IService {
* a folder (system/databases/ by default) that persist system info. Each
database will have a
* subfolder under the systemDir.
*/
- private final String systemDir =
+ private static final String systemDir =
FilePathUtils.regularizePath(CONFIG.getSystemDir()) + "databases";
/** DataRegionId -> DataRegion */
@@ -134,19 +134,21 @@ public class StorageEngine implements IService {
/** number of ready data region */
private AtomicInteger readyDataRegionNum;
- private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
+ private final AtomicBoolean isReadyForReadAndWrite = new AtomicBoolean();
+
+ private final AtomicBoolean isReadyForNonReadWriteFunctions = new
AtomicBoolean();
private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
- private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
+ private final TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
/** used to do short-lived asynchronous tasks */
private ExecutorService cachedThreadPool;
// add customized listeners here for flush and close events
- private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
- private List<FlushListener> customFlushListeners = new ArrayList<>();
+ private final List<CloseFileListener> customCloseFileListeners = new
ArrayList<>();
+ private final List<FlushListener> customFlushListeners = new ArrayList<>();
private int recoverDataRegionNum = 0;
private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
@@ -178,17 +180,19 @@ public class StorageEngine implements IService {
}
}
- public boolean isAllSgReady() {
- return isAllSgReady.get();
+ public boolean isReadyForReadAndWrite() {
+ return isReadyForReadAndWrite.get();
}
- public void setAllSgReady(boolean allSgReady) {
- isAllSgReady.set(allSgReady);
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ public boolean isReadyForNonReadWriteFunctions() {
+ return isReadyForNonReadWriteFunctions.get();
}
- public void asyncRecover() throws StartupException {
+ private void asyncRecoverDataRegion() throws StartupException {
long startRecoverTime = System.currentTimeMillis();
- setAllSgReady(false);
+ isReadyForNonReadWriteFunctions.set(false);
+ isReadyForReadAndWrite.set(false);
cachedThreadPool =
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
@@ -209,8 +213,7 @@ public class StorageEngine implements IService {
new Thread(
() -> {
checkResults(futures, "StorageEngine failed to recover.");
- recoverRepairData();
- setAllSgReady(true);
+ isReadyForReadAndWrite.set(true);
LOGGER.info(
"Storage Engine recover cost: {}s.",
(System.currentTimeMillis() - startRecoverTime) / 1000);
@@ -287,11 +290,22 @@ public class StorageEngine implements IService {
throw new StorageEngineFailureException(e);
}
- asyncRecover();
-
- LOGGER.info("start ttl check thread successfully.");
+ asyncRecoverDataRegion();
startTimedService();
+
+ // wait here for dataRegionMap recovered
+ while (!isReadyForReadAndWrite.get()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Storage engine failed to set up.", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+
+ asyncRecoverTsFileResource();
}
private void startTimedService() {
@@ -339,6 +353,36 @@ public class StorageEngine implements IService {
}
}
+ private void asyncRecoverTsFileResource() {
+ List<Future<Void>> futures = new LinkedList<>();
+ for (DataRegion dataRegion : dataRegionMap.values()) {
+ if (dataRegion != null) {
+ List<Callable<Void>> asyncTsFileResourceRecoverTasks =
+ dataRegion.getAsyncTsFileResourceRecoverTaskList();
+ if (asyncTsFileResourceRecoverTasks != null) {
+ Callable<Void> taskOfRegion =
+ () -> {
+ for (Callable<Void> task : asyncTsFileResourceRecoverTasks) {
+ task.call();
+ }
+ dataRegion.initCompactionSchedule();
+ return null;
+ };
+ futures.add(cachedThreadPool.submit(taskOfRegion));
+ }
+ }
+ }
+ Thread recoverEndTrigger =
+ new Thread(
+ () -> {
+ checkResults(futures, "async recover tsfile resource meets
error.");
+ recoverRepairData();
+ isReadyForNonReadWriteFunctions.set(true);
+ },
+ ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
+ recoverEndTrigger.start();
+ }
+
@Override
public void stop() {
for (DataRegion dataRegion : dataRegionMap.values()) {
@@ -649,8 +693,6 @@ public class StorageEngine implements IService {
/**
* Add a listener to listen flush start/end events. Notice that this
addition only applies to
* TsFileProcessors created afterwards.
- *
- * @param listener
*/
public void registerFlushListener(FlushListener listener) {
customFlushListeners.add(listener);
@@ -659,8 +701,6 @@ public class StorageEngine implements IService {
/**
* Add a listener to listen file close events. Notice that this addition
only applies to
* TsFileProcessors created afterwards.
- *
- * @param listener
*/
public void registerCloseFileListener(CloseFileListener listener) {
customCloseFileListeners.add(listener);
@@ -676,7 +716,7 @@ public class StorageEngine implements IService {
}
// When registering a new region, the coordinator needs to register the
corresponding region with
- // the local storageengine before adding the corresponding consensusGroup to
the consensus layer
+ // the local storage before adding the corresponding consensusGroup to the
consensus layer
public DataRegion createDataRegion(DataRegionId regionId, String sg) throws
DataRegionException {
makeSureNoOldRegion(regionId);
AtomicReference<DataRegionException> exceptionAtomicReference = new
AtomicReference<>(null);
@@ -953,6 +993,24 @@ public class StorageEngine implements IService {
});
}
+ public static File getDataRegionSystemDir(String dataBaseName, String
dataRegionId) {
+ return SystemFileFactory.INSTANCE.getFile(
+ systemDir + File.separator + dataBaseName, dataRegionId);
+ }
+
+ public Runnable executeCompactFileTimeIndexCache() {
+ return () -> {
+ if (!isReadyForNonReadWriteFunctions()) {
+ return;
+ }
+ for (DataRegion dataRegion : dataRegionMap.values()) {
+ if (dataRegion != null) {
+ dataRegion.compactFileTimeIndexCache();
+ }
+ }
+ };
+ }
+
static class InstanceHolder {
private static final StorageEngine INSTANCE = new StorageEngine();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 99c6bcf5dc0..3953fd8d1a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -95,11 +95,15 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle;
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.ClosedFileScanHandleImpl;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheReader;
import
org.apache.iotdb.db.storageengine.dataregion.utils.validate.TsFileValidator;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
@@ -154,6 +158,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -250,8 +255,8 @@ public class DataRegion implements IDataRegionForQuery {
/** database name. */
private final String databaseName;
- /** database system directory. */
- private File storageGroupSysDir;
+ /** data region system directory. */
+ private File dataRegionSysDir;
/** manage seqFileList and unSeqFileList. */
private final TsFileManager tsFileManager;
@@ -287,6 +292,8 @@ public class DataRegion implements IDataRegionForQuery {
/** whether it's ready from recovery. */
private boolean isReady = false;
+ private List<Callable<Void>> asyncTsFileResourceRecoverTaskList;
+
/** close file listeners. */
private List<CloseFileListener> customCloseFileListeners =
Collections.emptyList();
@@ -327,21 +334,20 @@ public class DataRegion implements IDataRegionForQuery {
this.fileFlushPolicy = fileFlushPolicy;
acquireDirectBufferMemory();
- storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir,
dataRegionId);
- this.tsFileManager =
- new TsFileManager(databaseName, dataRegionId,
storageGroupSysDir.getPath());
- if (storageGroupSysDir.mkdirs()) {
+ dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir,
dataRegionId);
+ this.tsFileManager = new TsFileManager(databaseName, dataRegionId,
dataRegionSysDir.getPath());
+ if (dataRegionSysDir.mkdirs()) {
logger.info(
- "Database system Directory {} doesn't exist, create it",
storageGroupSysDir.getPath());
- } else if (!storageGroupSysDir.exists()) {
- logger.error("create database system Directory {} failed",
storageGroupSysDir.getPath());
+ "Database system Directory {} doesn't exist, create it",
dataRegionSysDir.getPath());
+ } else if (!dataRegionSysDir.exists()) {
+ logger.error("create database system Directory {} failed",
dataRegionSysDir.getPath());
}
lastFlushTimeMap = new HashLastFlushTimeMap();
- // recover tsfiles unless consensus protocol is ratis and storage
storageengine is not ready
+ // recover tsfiles unless consensus protocol is ratis and storage engine
is not ready
if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
- && !StorageEngine.getInstance().isAllSgReady()) {
+ && !StorageEngine.getInstance().isReadyForReadAndWrite()) {
logger.debug(
"Skip recovering data region {}[{}] when consensus protocol is ratis
and storage engine is not ready.",
databaseName,
@@ -363,6 +369,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
} else {
+ asyncTsFileResourceRecoverTaskList = new ArrayList<>();
recover();
}
@@ -387,17 +394,8 @@ public class DataRegion implements IDataRegionForQuery {
return isReady;
}
- public void setReady(boolean ready) {
- isReady = ready;
- }
-
- private Map<Long, List<TsFileResource>> splitResourcesByPartition(
- List<TsFileResource> resources) {
- Map<Long, List<TsFileResource>> ret = new TreeMap<>();
- for (TsFileResource resource : resources) {
- ret.computeIfAbsent(resource.getTimePartition(), l -> new
ArrayList<>()).add(resource);
- }
- return ret;
+ public List<Callable<Void>> getAsyncTsFileResourceRecoverTaskList() {
+ return asyncTsFileResourceRecoverTaskList;
}
/** this class is used to store recovering context. */
@@ -459,19 +457,16 @@ public class DataRegion implements IDataRegionForQuery {
try {
// collect candidate TsFiles from sequential and unsequential data
directory
- List<TsFileResource> tmpSeqTsFiles =
-
getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
- List<TsFileResource> tmpUnseqTsFiles =
-
getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
-
// split by partition so that we can find the last file of each
partition and decide to
// close it or not
- DataRegionRecoveryContext dataRegionRecoveryContext =
- new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() +
tmpUnseqTsFiles.size());
Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
- splitResourcesByPartition(tmpSeqTsFiles);
+
getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
- splitResourcesByPartition(tmpUnseqTsFiles);
+
getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
+ DataRegionRecoveryContext dataRegionRecoveryContext =
+ new DataRegionRecoveryContext(
+
partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
+ +
partitionTmpUnseqTsFiles.values().stream().mapToLong(List::size).sum());
// submit unsealed TsFiles to recover
List<WALRecoverListener> recoverListeners = new ArrayList<>();
for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
@@ -552,13 +547,27 @@ public class DataRegion implements IDataRegionForQuery {
((TreeMap<Long, List<TsFileResource>>)
partitionTmpUnseqTsFiles).lastKey());
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpSeqTsFiles.entrySet()) {
- recoverFilesInPartition(
- partitionFiles.getKey(), dataRegionRecoveryContext,
partitionFiles.getValue(), true);
+ Callable<Void> asyncRecoverTask =
+ recoverFilesInPartition(
+ partitionFiles.getKey(),
+ dataRegionRecoveryContext,
+ partitionFiles.getValue(),
+ true);
+ if (asyncRecoverTask != null) {
+ asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
+ }
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpUnseqTsFiles.entrySet()) {
- recoverFilesInPartition(
- partitionFiles.getKey(), dataRegionRecoveryContext,
partitionFiles.getValue(), false);
+ Callable<Void> asyncRecoverTask =
+ recoverFilesInPartition(
+ partitionFiles.getKey(),
+ dataRegionRecoveryContext,
+ partitionFiles.getValue(),
+ false);
+ if (asyncRecoverTask != null) {
+ asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
+ }
}
if (config.isEnableSeparateData()) {
TimePartitionManager.getInstance()
@@ -607,16 +616,25 @@ public class DataRegion implements IDataRegionForQuery {
throw new DataRegionException(e);
}
- initCompactionSchedule();
+ if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
+ initCompactionSchedule();
+ }
- if (StorageEngine.getInstance().isAllSgReady()) {
+ if (StorageEngine.getInstance().isReadyForReadAndWrite()) {
logger.info("The data region {}[{}] is created successfully",
databaseName, dataRegionId);
} else {
logger.info("The data region {}[{}] is recovered successfully",
databaseName, dataRegionId);
}
}
- private void updateLastFlushTime(TsFileResource resource, boolean isSeq) {
+ private void updatePartitionLastFlushTime(TsFileResource resource) {
+ if (config.isEnableSeparateData()) {
+ lastFlushTimeMap.updatePartitionFlushedTime(
+ resource.getTimePartition(),
resource.getTimeIndex().getMaxEndTime());
+ }
+ }
+
+ protected void updateDeviceLastFlushTime(TsFileResource resource) {
long timePartitionId = resource.getTimePartition();
Map<IDeviceID, Long> endTimeMap = new HashMap<>();
for (IDeviceID deviceId : resource.getDevices()) {
@@ -631,6 +649,23 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ protected void upgradeAndUpdateDeviceLastFlushTime(
+ long timePartitionId, List<TsFileResource> resources) {
+ Map<IDeviceID, Long> endTimeMap = new HashMap<>();
+ for (TsFileResource resource : resources) {
+ for (IDeviceID deviceId : resource.getDevices()) {
+ long endTime = resource.getEndTime(deviceId);
+ endTimeMap.put(deviceId, endTime);
+ }
+ }
+ if (config.isEnableSeparateData()) {
+ lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId,
endTimeMap);
+ }
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
+ }
+ }
+
public void initCompactionSchedule() {
if (!config.isEnableSeqSpaceCompaction()
&& !config.isEnableUnseqSpaceCompaction()
@@ -654,7 +689,7 @@ public class DataRegion implements IDataRegionForQuery {
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- private List<TsFileResource> getAllFiles(List<String> folders)
+ private Map<Long, List<TsFileResource>> getAllFiles(List<String> folders)
throws IOException, DataRegionException {
// "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files
in one time partition
Map<String, File> tsFilePartitionPath2File = new HashMap<>();
@@ -692,10 +727,12 @@ public class DataRegion implements IDataRegionForQuery {
sortedFiles.sort(this::compareFileName);
long currentTime = System.currentTimeMillis();
- List<TsFileResource> ret = new ArrayList<>();
+ Map<Long, List<TsFileResource>> ret = new TreeMap<>();
for (File f : sortedFiles) {
checkTsFileTime(f, currentTime);
- ret.add(new TsFileResource(f));
+ TsFileResource resource = new TsFileResource(f);
+ ret.computeIfAbsent(resource.getTsFileID().timePartitionId, l -> new
ArrayList<>())
+ .add(resource);
}
return ret;
}
@@ -751,7 +788,7 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResource.remove();
return;
}
- updateLastFlushTime(tsFileResource, isSeq);
+ updateDeviceLastFlushTime(tsFileResource);
tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
FileMetrics.getInstance()
.addTsFile(
@@ -829,7 +866,102 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private void recoverFilesInPartition(
+ private Callable<Void> recoverFilesInPartition(
+ long partitionId,
+ DataRegionRecoveryContext context,
+ List<TsFileResource> resourceList,
+ boolean isSeq) {
+
+ File partitionSysDir =
+ SystemFileFactory.INSTANCE.getFile(dataRegionSysDir,
String.valueOf(partitionId));
+ File logFile = SystemFileFactory.INSTANCE.getFile(partitionSysDir,
"FileTimeIndexCache_0");
+ if (logFile.exists()) {
+ Map<TsFileID, FileTimeIndex> fileTimeIndexMap;
+ try {
+ FileTimeIndexCacheReader logReader =
+ new FileTimeIndexCacheReader(logFile, dataRegionId, partitionId);
+ fileTimeIndexMap = logReader.read();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
+ List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
+ Callable<Void> asyncRecoverTask = null;
+ for (TsFileResource tsFileResource : resourceList) {
+ if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
+
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+ tsFileManager.add(tsFileResource, isSeq);
+ resourceListForAsyncRecover.add(tsFileResource);
+ } else {
+ resourceListForSyncRecover.add(tsFileResource);
+ }
+ }
+ if (!resourceListForAsyncRecover.isEmpty()) {
+ asyncRecoverTask =
+ asyncRecoverFilesInPartition(partitionId, context,
resourceListForAsyncRecover, isSeq);
+ }
+ if (!resourceListForSyncRecover.isEmpty()) {
+ syncRecoverFilesInPartition(partitionId, context,
resourceListForSyncRecover, isSeq);
+ }
+ return asyncRecoverTask;
+ } else {
+ syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq);
+ return null;
+ }
+ }
+
+ private Callable<Void> asyncRecoverFilesInPartition(
+ long partitionId,
+ DataRegionRecoveryContext context,
+ List<TsFileResource> resourceList,
+ boolean isSeq) {
+ if (config.isEnableSeparateData()) {
+ if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId,
false)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ partitionId,
+ false,
+ Long.MAX_VALUE,
+ lastFlushTimeMap.getMemSize(partitionId)));
+ }
+ for (TsFileResource tsFileResource : resourceList) {
+ updatePartitionLastFlushTime(tsFileResource);
+ }
+ TimePartitionManager.getInstance()
+ .updateAfterFlushing(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ partitionId,
+ System.currentTimeMillis(),
+ lastFlushTimeMap.getMemSize(partitionId),
+ false);
+ }
+ return () -> {
+ for (TsFileResource tsFileResource : resourceList) {
+ try (SealedTsFileRecoverPerformer recoverPerformer =
+ new SealedTsFileRecoverPerformer(tsFileResource)) {
+ recoverPerformer.recover();
+ tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+ } catch (Throwable e) {
+ logger.error(
+ "Fail to recover sealed TsFile {}, skip it.",
tsFileResource.getTsFilePath(), e);
+ } finally {
+ // update recovery context
+ context.incrementRecoveredFilesNum();
+ }
+ }
+ // After recover, replace partition last flush time with device last
flush time
+ if (config.isEnableSeparateData()) {
+ upgradeAndUpdateDeviceLastFlushTime(partitionId, resourceList);
+ }
+
+ return null;
+ };
+ }
+
+ private void syncRecoverFilesInPartition(
long partitionId,
DataRegionRecoveryContext context,
List<TsFileResource> resourceList,
@@ -837,8 +969,10 @@ public class DataRegion implements IDataRegionForQuery {
for (TsFileResource tsFileResource : resourceList) {
recoverSealedTsFiles(tsFileResource, context, isSeq);
}
+ FileTimeIndexCacheRecorder.getInstance()
+ .logFileTimeIndex(resourceList.toArray(new TsFileResource[0]));
if (config.isEnableSeparateData()) {
- if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId)) {
+ if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId,
true)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -849,7 +983,7 @@ public class DataRegion implements IDataRegionForQuery {
lastFlushTimeMap.getMemSize(partitionId)));
}
for (TsFileResource tsFileResource : resourceList) {
- updateLastFlushTime(tsFileResource, isSeq);
+ updateDeviceLastFlushTime(tsFileResource);
}
TimePartitionManager.getInstance()
.updateAfterFlushing(
@@ -1061,7 +1195,6 @@ public class DataRegion implements IDataRegionForQuery {
insertTabletNode.checkTTL(
results, i ->
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getDeviceID(i)));
noFailure = loc == 0;
-
noFailure = noFailure && splitAndInsert(insertTabletNode, loc, results);
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
@@ -1084,7 +1217,7 @@ public class DataRegion implements IDataRegionForQuery {
private void initFlushTimeMap(long timePartitionId) {
if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -1657,6 +1790,8 @@ public class DataRegion implements IDataRegionForQuery {
"{} will close all files for deleting data folder {}",
databaseName + "-" + dataRegionId,
systemDir);
+ FileTimeIndexCacheRecorder.getInstance()
+ .removeFileTimeIndexCache(Integer.parseInt(dataRegionId));
writeLock("deleteFolder");
try {
File dataRegionSystemFolder =
@@ -2867,7 +3002,7 @@ public class DataRegion implements IDataRegionForQuery {
if (config.isEnableSeparateData()) {
final DataRegionId dataRegionId = new
DataRegionId(Integer.parseInt(this.dataRegionId));
final long timePartitionId = newTsFileResource.getTimePartition();
- if
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ if
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -2877,7 +3012,7 @@ public class DataRegion implements IDataRegionForQuery {
Long.MAX_VALUE,
lastFlushTimeMap.getMemSize(timePartitionId)));
}
- updateLastFlushTime(newTsFileResource);
+ updateDeviceLastFlushTime(newTsFileResource);
TimePartitionManager.getInstance()
.updateAfterFlushing(
dataRegionId,
@@ -3003,23 +3138,6 @@ public class DataRegion implements IDataRegionForQuery {
return version;
}
- /**
- * Update latest time in latestTimeForEachDevice and
- * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
- */
- protected void updateLastFlushTime(TsFileResource newTsFileResource) {
- for (IDeviceID device : newTsFileResource.getDevices()) {
- long endTime = newTsFileResource.getEndTime(device);
- long timePartitionId = TimePartitionUtils.getTimePartitionId(endTime);
- if (config.isEnableSeparateData()) {
- lastFlushTimeMap.updateOneDeviceFlushedTime(timePartitionId, device,
endTime);
- }
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- lastFlushTimeMap.updateOneDeviceGlobalFlushedTime(device, endTime);
- }
- }
- }
-
/**
* Execute the loading process by the type.
*
@@ -3391,7 +3509,7 @@ public class DataRegion implements IDataRegionForQuery {
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -3510,7 +3628,7 @@ public class DataRegion implements IDataRegionForQuery {
timePartitionIds[i] =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i])) {
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i],
true)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -3618,6 +3736,10 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ /** Returns the {@code dataRegionSysDir} field of this data region. */
+ public File getDataRegionSysDir() {
+ return dataRegionSysDir;
+ }
+
public void addSettleFilesToList(
List<TsFileResource> seqResourcesToBeSettled,
List<TsFileResource> unseqResourcesToBeSettled,
@@ -3796,6 +3918,12 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ /**
+  * Runs {@code tsFileManager.compactFileTimeIndexCache} once for every time partition that is a
+  * key of {@code partitionMaxFileVersions}.
+  */
+ public void compactFileTimeIndexCache() {
+   partitionMaxFileVersions
+       .keySet()
+       .forEach(partition -> tsFileManager.compactFileTimeIndexCache(partition));
+ }
+
@TestOnly
public ILastFlushTimeMap getLastFlushTimeMap() {
return lastFlushTimeMap;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
index fe62176469b..f02044b0414 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
@@ -49,4 +49,8 @@ public class DeviceLastFlushTime implements ILastFlushTime {
}
return new PartitionLastFlushTime(maxTime);
}
+
+ /**
+ * Returns the {@code deviceLastFlushTimeMap} field itself (not a copy), so changes made through
+ * the returned map are visible to this object.
+ */
+ Map<IDeviceID, Long> getDeviceLastFlushTimeMap() {
+ return deviceLastFlushTimeMap;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 4060eb97bb2..3f0abfd3e24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,22 +61,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
/** record memory cost of map for each partitionId */
private final Map<Long, Long> memCostForEachPartition = new
ConcurrentHashMap<>();
- // For load
- @Override
- public void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID
deviceId, long time) {
- ILastFlushTime flushTimeMapForPartition =
- partitionLatestFlushedTime.computeIfAbsent(
- timePartitionId, id -> new DeviceLastFlushTime());
- long lastFlushTime = flushTimeMapForPartition.getLastFlushTime(deviceId);
- if (lastFlushTime == Long.MIN_VALUE) {
- long memCost = HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed();
- memCostForEachPartition.compute(
- timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
- }
- flushTimeMapForPartition.updateLastFlushTime(deviceId, time);
- }
-
- // For recover
+ // For sync recover resource without fileTimeIndexCache and load
@Override
public void updateMultiDeviceFlushedTime(
long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
@@ -82,7 +69,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
partitionLatestFlushedTime.computeIfAbsent(
timePartitionId, id -> new DeviceLastFlushTime());
- long memIncr = 0;
+ long memIncr = 0L;
for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
@@ -94,10 +81,60 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
}
+ // For async recover resource with fileTimeIndexCache
@Override
- public void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time) {
- globalLatestFlushedTimeForEachDevice.compute(
- path, (k, v) -> v == null ? time : Math.max(v, time));
+ public void upgradeAndUpdateMultiDeviceFlushedTime(
+     long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
+   // Applies the device-level flush times in flushedTimeMap to the entry of timePartitionId.
+   // A partition-level entry is replaced by a new device-level entry; an existing device-level
+   // entry is updated in place. The memory cost recorded for the partition grows by
+   // HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed() for every device that is newly tracked.
+   ILastFlushTime flushTimeMapForPartition =
+       partitionLatestFlushedTime.computeIfAbsent(
+           timePartitionId, id -> new DeviceLastFlushTime());
+   long memIncr = 0L;
+   if (flushTimeMapForPartition instanceof PartitionLastFlushTime) {
+     // upgrade PartitionLastFlushTime to DeviceLastFlushTime
+     ILastFlushTime newDeviceLastFlushTime = new DeviceLastFlushTime();
+     for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+       memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+       newDeviceLastFlushTime.updateLastFlushTime(entry.getKey(), entry.getValue());
+     }
+     // Install the upgraded entry. Without this put the freshly built device-level entry would
+     // be discarded and the partition would stay partition-level while its memory cost grew.
+     partitionLatestFlushedTime.put(timePartitionId, newDeviceLastFlushTime);
+   } else {
+     // go here when DeviceLastFlushTime was recovered by wal recovery
+     for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+       // only devices that have no flush time yet add to the memory cost
+       if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) == Long.MIN_VALUE) {
+         memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+       }
+       flushTimeMapForPartition.updateLastFlushTime(entry.getKey(), entry.getValue());
+     }
+   }
+   long finalMemIncr = memIncr;
+   memCostForEachPartition.compute(
+       timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + finalMemIncr);
+ }
+
+ // For fileTimeIndexCache recovered before the async resource recover start
+ // Records maxFlushedTime as the partition-level flush time of timePartitionId, creating a
+ // PartitionLastFlushTime entry when the partition has no entry yet.
+ @Override
+ public void updatePartitionFlushedTime(long timePartitionId, long
maxFlushedTime) {
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new PartitionLastFlushTime(maxFlushedTime));
+
+ if (flushTimeMapForPartition instanceof PartitionLastFlushTime) {
+ long memIncr = Long.BYTES;
+ flushTimeMapForPartition.updateLastFlushTime(null, maxFlushedTime);
+ // the memory cost is only recorded when the partition has no recorded cost yet
+ memCostForEachPartition.putIfAbsent(timePartitionId, memIncr);
+ } else {
+ // go here when DeviceLastFlushTime was recovered by wal recovery
+ // NOTE(review): every device is re-submitted with its own current value and maxFlushedTime
+ // is not used in this branch, so this loop looks like a no-op - confirm the intended update.
+ DeviceLastFlushTime deviceLastFlushTime = (DeviceLastFlushTime)
flushTimeMapForPartition;
+ Map<IDeviceID, Long> flushedTimeMap =
deviceLastFlushTime.getDeviceLastFlushTimeMap();
+ for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ flushTimeMapForPartition.updateLastFlushTime(entry.getKey(),
entry.getValue());
+ }
+ }
}
@Override
@@ -108,9 +145,14 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
@Override
- public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
+ public boolean checkAndCreateFlushedTimePartition(
+ long timePartitionId, boolean usingDeviceFlushTime) {
if (!partitionLatestFlushedTime.containsKey(timePartitionId)) {
- partitionLatestFlushedTime.put(timePartitionId, new
DeviceLastFlushTime());
+ partitionLatestFlushedTime.put(
+ timePartitionId,
+ usingDeviceFlushTime
+ ? new DeviceLastFlushTime()
+ : new PartitionLastFlushTime(Long.MIN_VALUE));
return false;
}
return true;
@@ -135,8 +177,14 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
return
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
}
+ // This method is for creating last cache entry when insert
@Override
public long getGlobalFlushedTime(IDeviceID path) {
+ // If TsFileResource is not fully recovered, we should return Long.MAX_VALUE
+ // to avoid creating a Last cache entry
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ return Long.MAX_VALUE;
+ }
return globalLatestFlushedTimeForEachDevice.getOrDefault(path,
Long.MIN_VALUE);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index e809730cc0e..7bdd141bf6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -27,23 +27,22 @@ import java.util.Map;
public interface ILastFlushTimeMap {
// region update
- /** Update partitionLatestFlushedTime. */
- void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID deviceId,
long time);
-
void updateMultiDeviceFlushedTime(long timePartitionId, Map<IDeviceID, Long>
flushedTimeMap);
- /** Update globalLatestFlushedTimeForEachDevice. */
- void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time);
+ void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap);
+ void upgradeAndUpdateMultiDeviceFlushedTime(
+ long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
+
/** Update both partitionLatestFlushedTime and
globalLatestFlushedTimeForEachDevice. */
void updateLatestFlushTime(long partitionId, Map<IDeviceID, Long> updateMap);
// endregion
// region ensure
- boolean checkAndCreateFlushedTimePartition(long timePartitionId);
+ boolean checkAndCreateFlushedTimePartition(long timePartitionId, boolean
usingDeviceFlushTime);
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index 3469a0a6dec..17ad0dd4334 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -53,7 +53,7 @@ public class CompactionScheduleTaskWorker implements
Callable<Void> {
while (true) {
try {
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getCompactionScheduleIntervalInMs());
- if (!StorageEngine.getInstance().isAllSgReady()) {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
continue;
}
List<DataRegion> dataRegionListSnapshot = new
ArrayList<>(dataRegionList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index f7341876ec5..d7757bb3ff9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -53,7 +53,7 @@ public class TTLScheduleTask implements Callable<Void> {
while (true) {
try {
Thread.sleep(ttlCheckInterval);
- if (!StorageEngine.getInstance().isAllSgReady()) {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
continue;
}
List<DataRegion> dataRegionListSnapshot = new
ArrayList<>(dataRegionList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index b7ee09aec1a..b4ceea6dfda 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -65,6 +65,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskAlign
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskChunkHandleImpl;
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.UnclosedFileScanHandleImpl;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
@@ -1708,6 +1709,7 @@ public class TsFileProcessor {
}
writer.endFile();
tsFileResource.serialize();
+ FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource);
if (logger.isDebugEnabled()) {
logger.debug("Ended file {}", tsFileResource);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
index e3bb6adc748..c9656382e3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
+import java.util.Objects;
+
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.tsfile.utils.FilePathUtils.splitTsFilePath;
@@ -104,6 +106,27 @@ public class TsFileID {
return versionArray;
}
+ /**
+  * Two ids are equal when they have the same runtime class and the same regionId,
+  * timePartitionId, timestamp, fileVersion and compactionVersion.
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (o == this) {
+     return true;
+   }
+   if (o == null || o.getClass() != getClass()) {
+     return false;
+   }
+   final TsFileID other = (TsFileID) o;
+   if (regionId != other.regionId || timePartitionId != other.timePartitionId) {
+     return false;
+   }
+   if (timestamp != other.timestamp || fileVersion != other.fileVersion) {
+     return false;
+   }
+   return compactionVersion == other.compactionVersion;
+ }
+
+ /** Hashes the same five fields that {@code equals} compares, in the same order. */
+ @Override
+ public int hashCode() {
+   return Objects.hash(
+       regionId, timePartitionId, timestamp, fileVersion, compactionVersion);
+ }
+
public long getTimestamp() {
return timestamp;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index dc242858bce..06409cf3cf8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import org.apache.tsfile.read.filter.basic.Filter;
@@ -39,7 +40,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TsFileManager {
private final String storageGroupName;
private String dataRegionId;
- private final String storageGroupDir;
+ private final String dataRegionSysDir;
/** Serialize queries, delete resource files, compaction cleanup files */
private final ReadWriteLock resourceListLock = new ReentrantReadWriteLock();
@@ -52,9 +53,9 @@ public class TsFileManager {
private volatile boolean allowCompaction = true;
private final AtomicLong currentCompactionTaskSerialId = new AtomicLong(0);
- public TsFileManager(String storageGroupName, String dataRegionId, String
storageGroupDir) {
+ public TsFileManager(String storageGroupName, String dataRegionId, String
dataRegionSysDir) {
this.storageGroupName = storageGroupName;
- this.storageGroupDir = storageGroupDir;
+ this.dataRegionSysDir = dataRegionSysDir;
this.dataRegionId = dataRegionId;
}
@@ -260,6 +261,7 @@ public class TsFileManager {
.computeIfAbsent(timePartition, t -> new TsFileResourceList())
.keepOrderInsert(resource);
}
+ FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(resource);
}
}
} finally {
@@ -339,8 +341,8 @@ public class TsFileManager {
return storageGroupName;
}
- public String getStorageGroupDir() {
- return storageGroupDir;
+ public String getDataRegionSysDir() {
+ return dataRegionSysDir;
}
public Set<Long> getTimePartitions() {
@@ -389,4 +391,19 @@ public class TsFileManager {
return (sequenceFiles.higherKey(timePartitionId) == null
&& unsequenceFiles.higherKey(timePartitionId) == null);
}
+
+ /**
+  * Hands the sequence and unsequence resource lists of {@code timePartition} to
+  * {@code FileTimeIndexCacheRecorder.compactFileTimeIndexIfNeeded}. The lists are looked up and
+  * passed on while the read lock is held; either list may be {@code null} when the partition
+  * has no entry in the corresponding map.
+  */
+ public void compactFileTimeIndexCache(long timePartition) {
+   readLock();
+   try {
+     final FileTimeIndexCacheRecorder recorder = FileTimeIndexCacheRecorder.getInstance();
+     final int regionId = Integer.parseInt(dataRegionId);
+     recorder.compactFileTimeIndexIfNeeded(
+         storageGroupName,
+         regionId,
+         timePartition,
+         sequenceFiles.get(timePartition),
+         unsequenceFiles.get(timePartition));
+   } finally {
+     readUnlock();
+   }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index e67364b732e..97dcada9984 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -62,6 +62,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -294,6 +295,20 @@ public class TsFileResource {
}
}
+ /**
+ * Returns the number of bytes that serializeFileTimeIndexToByteBuffer writes for one resource.
+ */
+ public static int getFileTimeIndexSerializedSize() {
+ // 5 * 8 Byte means 5 long numbers of tsFileID.timestamp, tsFileID.fileVersion,
+ // tsFileID.compactionVersion, timeIndex.getMinStartTime(), timeIndex.getMaxEndTime()
+ return 5 * Long.BYTES;
+ }
+
+ /**
+  * Appends this resource's file time index to {@code buffer} as five consecutive longs:
+  * tsFileID.timestamp, tsFileID.fileVersion, tsFileID.compactionVersion, then the time index's
+  * min start time and max end time.
+  *
+  * @param buffer destination buffer; the values are written at its current position
+  */
+ public void serializeFileTimeIndexToByteBuffer(ByteBuffer buffer) {
+   buffer
+       .putLong(tsFileID.timestamp)
+       .putLong(tsFileID.fileVersion)
+       .putLong(tsFileID.compactionVersion)
+       .putLong(timeIndex.getMinStartTime())
+       .putLong(timeIndex.getMaxEndTime());
+ }
+
public void updateStartTime(IDeviceID device, long time) {
timeIndex.updateStartTime(device, time);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
new file mode 100644
index 00000000000..b0c805bba6b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent;
+import static
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
+
+ /**
+  * Appends the file time index of TsFileResources to FileTimeIndexCache log files.
+  *
+  * <p>One {@link FileTimeIndexCacheWriter} is kept per (data region id, time partition id); its
+  * log file is {@code <dataRegionSysDir>/<partitionId>/FileTimeIndexCache_<VERSION>}. Write and
+  * compaction work is queued in {@code taskQueue} and handed to the single
+  * {@code recordFileIndexThread} by a periodic drain.
+  */
+ public class FileTimeIndexCacheRecorder {
+
+   private static final Logger LOGGER = LoggerFactory.getLogger(FileTimeIndexCacheRecorder.class);
+
+   private static final int VERSION = 0;
+
+   protected static final String FILE_NAME = "FileTimeIndexCache_" + VERSION;
+
+   private final ScheduledExecutorService recordFileIndexThread;
+
+   private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
+
+   /** data region id -> (time partition id -> writer of that partition's log file). */
+   private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
+       new ConcurrentHashMap<>();
+
+   /**
+    * Creates the recorder thread and schedules two fixed-delay jobs on it: draining the task
+    * queue every 100 ms and the compaction job supplied by the StorageEngine every 120 s.
+    */
+   private FileTimeIndexCacheRecorder() {
+     recordFileIndexThread =
+         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+             ThreadName.FILE_TIME_INDEX_RECORD.getName());
+     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+         recordFileIndexThread, this::executeTasks, 100, 100, TimeUnit.MILLISECONDS);
+     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+         recordFileIndexThread,
+         StorageEngine.getInstance().executeCompactFileTimeIndexCache(),
+         120_000L,
+         120_000L,
+         TimeUnit.MILLISECONDS);
+   }
+
+   /** Drains {@code taskQueue}, submitting every pending task to {@code recordFileIndexThread}. */
+   private void executeTasks() {
+     Runnable task;
+     while ((task = taskQueue.poll()) != null) {
+       recordFileIndexThread.submit(task);
+     }
+   }
+
+   /**
+    * Queues a task that serializes the file time index of all given resources into one buffer
+    * and appends it to a single log file.
+    *
+    * <p>The target log file is chosen from the first resource only (its region id, time
+    * partition id, database name and data region id), so callers are expected to pass resources
+    * of the same data region and time partition. Does nothing for a null or empty argument.
+    *
+    * @param tsFileResources resources whose file time index should be recorded
+    */
+   public void logFileTimeIndex(TsFileResource... tsFileResources) {
+     if (tsFileResources == null || tsFileResources.length == 0) {
+       return;
+     }
+     TsFileResource firstResource = tsFileResources[0];
+     TsFileID tsFileID = firstResource.getTsFileID();
+     int dataRegionId = tsFileID.regionId;
+     long partitionId = tsFileID.timePartitionId;
+     File dataRegionSysDir =
+         StorageEngine.getDataRegionSystemDir(
+             firstResource.getDatabaseName(), firstResource.getDataRegionId());
+     FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId, dataRegionSysDir);
+     boolean result =
+         taskQueue.offer(
+             () -> {
+               try {
+                 ByteBuffer buffer =
+                     ByteBuffer.allocate(
+                         getFileTimeIndexSerializedSize() * tsFileResources.length);
+                 for (TsFileResource tsFileResource : tsFileResources) {
+                   tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+                 }
+                 buffer.flip();
+                 writer.write(buffer);
+               } catch (IOException e) {
+                 // pass the exception itself as well so the stack trace is not lost
+                 LOGGER.warn("Meet error when record FileTimeIndexCache: {}", e.getMessage(), e);
+               }
+             });
+     if (!result) {
+       LOGGER.warn("Meet error when record FileTimeIndexCache");
+     }
+   }
+
+   /**
+    * Queues a rewrite of one partition's log file when the file has grown beyond 100 times the
+    * size needed for the resources currently in the two lists.
+    *
+    * <p>The queued task clears the file and then writes the entries of {@code sequenceFiles}
+    * followed by those of {@code unsequenceFiles}.
+    *
+    * @param dataBaseName database name used to locate the data region system directory
+    * @param dataRegionId id of the data region
+    * @param partitionId id of the time partition
+    * @param sequenceFiles sequence resources of the partition, may be null
+    * @param unsequenceFiles unsequence resources of the partition, may be null
+    */
+   public void compactFileTimeIndexIfNeeded(
+       String dataBaseName,
+       int dataRegionId,
+       long partitionId,
+       TsFileResourceList sequenceFiles,
+       TsFileResourceList unsequenceFiles) {
+     FileTimeIndexCacheWriter writer =
+         getWriter(
+             dataRegionId,
+             partitionId,
+             StorageEngine.getDataRegionSystemDir(dataBaseName, String.valueOf(dataRegionId)));
+
+     int currentResourceCount =
+         (sequenceFiles == null ? 0 : sequenceFiles.size())
+             + (unsequenceFiles == null ? 0 : unsequenceFiles.size());
+     // start with the long factor so the whole product is computed in long arithmetic
+     long compactThreshold = 100L * currentResourceCount * getFileTimeIndexSerializedSize();
+     if (writer.getLogFile().length() <= compactThreshold) {
+       return;
+     }
+
+     // NOTE(review): the lists are read when the task runs, not when it is queued; confirm that
+     // they cannot change between size() and the iteration inside writeFileTimeIndexes.
+     boolean result =
+         taskQueue.offer(
+             () -> {
+               try {
+                 writer.clearFile();
+                 writeFileTimeIndexes(writer, sequenceFiles);
+                 writeFileTimeIndexes(writer, unsequenceFiles);
+               } catch (IOException e) {
+                 LOGGER.warn("Meet error when compact FileTimeIndexCache: {}", e.getMessage(), e);
+               }
+             });
+     if (!result) {
+       LOGGER.warn("Meet error when compact FileTimeIndexCache");
+     }
+   }
+
+   /** Serializes every resource of {@code files} into one buffer and writes it; skips null/empty. */
+   private static void writeFileTimeIndexes(
+       FileTimeIndexCacheWriter writer, TsFileResourceList files) throws IOException {
+     if (files == null || files.isEmpty()) {
+       return;
+     }
+     ByteBuffer buffer = ByteBuffer.allocate(getFileTimeIndexSerializedSize() * files.size());
+     for (TsFileResource tsFileResource : files) {
+       tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+     }
+     buffer.flip();
+     writer.write(buffer);
+   }
+
+   /**
+    * Returns the writer of the given region and partition, creating the partition directory, the
+    * log file and the writer on first use.
+    *
+    * @throws RuntimeException wrapping the IOException raised while creating the file or writer
+    */
+   private FileTimeIndexCacheWriter getWriter(
+       int dataRegionId, long partitionId, File dataRegionSysDir) {
+     return writerMap
+         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+         .computeIfAbsent(
+             partitionId,
+             k -> {
+               File partitionDir =
+                   SystemFileFactory.INSTANCE.getFile(
+                       dataRegionSysDir, String.valueOf(partitionId));
+               File logFile = SystemFileFactory.INSTANCE.getFile(partitionDir, FILE_NAME);
+               try {
+                 // logged when mkdirs() returns false for a directory that did not exist
+                 if (!partitionDir.exists() && !partitionDir.mkdirs()) {
+                   LOGGER.debug(
+                       "Partition directory has existed,filePath:{}",
+                       partitionDir.getAbsolutePath());
+                 }
+                 if (!logFile.createNewFile()) {
+                   LOGGER.debug(
+                       "Partition log file has existed,filePath:{}", logFile.getAbsolutePath());
+                 }
+                 return new FileTimeIndexCacheWriter(logFile, true);
+               } catch (IOException e) {
+                 throw new RuntimeException(e);
+               }
+             });
+   }
+
+   /**
+    * Closes every writer. All writers are attempted even if some fail.
+    *
+    * @throws IOException the first close failure, with later failures attached as suppressed
+    */
+   public void close() throws IOException {
+     IOException firstFailure = null;
+     for (Map<Long, FileTimeIndexCacheWriter> partitionWriterMap : writerMap.values()) {
+       for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
+         try {
+           writer.close();
+         } catch (IOException e) {
+           if (firstFailure == null) {
+             firstFailure = e;
+           } else {
+             firstFailure.addSuppressed(e);
+           }
+         }
+       }
+     }
+     if (firstFailure != null) {
+       throw firstFailure;
+     }
+   }
+
+   /**
+    * Closes the writers of one data region, deletes their log files and forgets the writers.
+    *
+    * @param dataRegionId id of the data region whose cache is removed
+    */
+   public void removeFileTimeIndexCache(int dataRegionId) {
+     // remove (not get): a closed writer must not be handed out again by getWriter
+     Map<Long, FileTimeIndexCacheWriter> partitionWriterMap = writerMap.remove(dataRegionId);
+     if (partitionWriterMap == null) {
+       return;
+     }
+     for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
+       try {
+         writer.close();
+         deleteDirectoryAndEmptyParent(writer.getLogFile());
+       } catch (IOException e) {
+         LOGGER.warn("Meet error when close FileTimeIndexCache: {}", e.getMessage(), e);
+       }
+     }
+   }
+
+   public static FileTimeIndexCacheRecorder getInstance() {
+     return FileTimeIndexCacheRecorder.InstanceHolder.INSTANCE;
+   }
+
+   /** Holder whose static field creates the singleton when the holder class is initialized. */
+   private static class InstanceHolder {
+     private InstanceHolder() {}
+
+     private static final FileTimeIndexCacheRecorder INSTANCE = new FileTimeIndexCacheRecorder();
+   }
+ }
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
index 7d2fcb8dd1f..b962a447b06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -53,11 +55,14 @@ public class PlainDeviceTimeIndex extends
ArrayDeviceTimeIndex implements ITimeI
}
for (int i = 0; i < deviceNum; i++) {
- String path =
- DataNodeDevicePathCache.getInstance()
- .getDeviceId(ReadWriteIOUtils.readString(inputStream));
+ String path = ReadWriteIOUtils.readString(inputStream);
int index = ReadWriteIOUtils.readInt(inputStream);
- deviceToIndex.put(IDeviceID.Factory.DEFAULT_FACTORY.create(path), index);
+ try {
+ PartialPath partialPath =
DataNodeDevicePathCache.getInstance().getPartialPath(path);
+ deviceToIndex.put(partialPath.getIDeviceID(), index);
+ } catch (IllegalPathException e) {
+ deviceToIndex.put(IDeviceID.Factory.DEFAULT_FACTORY.create(path),
index);
+ }
}
return this;
}
@@ -76,10 +81,14 @@ public class PlainDeviceTimeIndex extends
ArrayDeviceTimeIndex implements ITimeI
}
for (int i = 0; i < deviceNum; i++) {
- String path =
-
DataNodeDevicePathCache.getInstance().getDeviceId(ReadWriteIOUtils.readString(buffer));
+ String path = ReadWriteIOUtils.readString(buffer);
int index = buffer.getInt();
- deviceToIndex.put(IDeviceID.Factory.DEFAULT_FACTORY.create(path), index);
+ try {
+ PartialPath partialPath =
DataNodeDevicePathCache.getInstance().getPartialPath(path);
+ deviceToIndex.put(partialPath.getIDeviceID(), index);
+ } catch (IllegalPathException e) {
+ deviceToIndex.put(IDeviceID.Factory.DEFAULT_FACTORY.create(path),
index);
+ }
}
return this;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
new file mode 100644
index 00000000000..35682bd8020
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
+
+public class FileTimeIndexCacheReader { // Loads one partition's FileTimeIndex cache log into a map keyed by TsFileID.
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTimeIndexCacheReader.class);
+
+ private final File logFile;
+ private final long fileLength; // length of logFile as observed in the constructor, not re-read later
+ private final int dataRegionId;
+ private final long partitionId;
+
+ public FileTimeIndexCacheReader(File logFile, String dataRegionId, long
partitionId) { // dataRegionId must be a decimal integer string; Integer.parseInt throws NumberFormatException otherwise
+ this.logFile = logFile;
+ this.fileLength = logFile.length();
+ this.dataRegionId = Integer.parseInt(dataRegionId);
+ this.partitionId = partitionId;
+ }
+
+ public Map<TsFileID, FileTimeIndex> read() throws IOException { // returns every fully-read record; may truncate logFile as a side effect
+ Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+ long readLength = 0L; // bytes accounted for by fully-read records
+ try (DataInputStream logStream =
+ new DataInputStream(new
BufferedInputStream(Files.newInputStream(logFile.toPath())))) {
+ while (readLength < fileLength) {
+ long timestamp = logStream.readLong(); // each record is five consecutive longs, read in this order
+ long fileVersion = logStream.readLong();
+ long compactionVersion = logStream.readLong();
+ long minStartTime = logStream.readLong();
+ long maxEndTime = logStream.readLong();
+ TsFileID tsFileID =
+ new TsFileID(dataRegionId, partitionId, timestamp, fileVersion,
compactionVersion);
+ FileTimeIndex fileTimeIndex = new FileTimeIndex(minStartTime,
maxEndTime);
+ fileTimeIndexMap.put(tsFileID, fileTimeIndex); // HashMap.put: a later record whose key equals an earlier one overwrites it
+ readLength += getFileTimeIndexSerializedSize(); // presumably the 40 bytes of the five longs above — TODO confirm
+ }
+ } catch (IOException ignored) {
+ // Deliberately swallowed: a read failure (e.g. EOF inside a partially written trailing record) just ends the scan; records already read are kept.
+ }
+ if (readLength != fileLength) {
+ // The scan stopped short of the recorded file length (incomplete or unreadable tail), so cut the file back to the last fully-read record.
+ try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.WRITE)) { // an IOException raised here is not caught and propagates to the caller
+ channel.truncate(readLength);
+ }
+ }
+ return fileTimeIndexMap;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
new file mode 100644
index 00000000000..dce4f8b1103
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache;
+
+import org.apache.iotdb.db.utils.writelog.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+
+public class FileTimeIndexCacheWriter implements ILogWriter { // Appends raw ByteBuffer content to a single log file through a FileChannel.
+ private static final Logger logger =
LoggerFactory.getLogger(FileTimeIndexCacheWriter.class);
+
+ private final File logFile;
+ private FileOutputStream fileOutputStream; // set to null by close(); reassigned by clearFile()
+ private FileChannel channel; // channel of fileOutputStream; set to null by close()
+ private final boolean forceEachWrite; // when true, write() calls channel.force(true) after every write
+
+ public FileTimeIndexCacheWriter(File logFile, boolean forceEachWrite)
+ throws FileNotFoundException {
+ this.logFile = logFile;
+ this.forceEachWrite = forceEachWrite;
+
+ fileOutputStream = new FileOutputStream(logFile, true); // second argument true = append mode, existing content is kept
+ channel = fileOutputStream.getChannel();
+ }
+
+ @Override
+ public void write(ByteBuffer logBuffer) throws IOException { // other IOExceptions propagate; only ClosedChannelException is handled below
+
+ try {
+ channel.write(logBuffer);
+ if (this.forceEachWrite) {
+ channel.force(true);
+ }
+ } catch (ClosedChannelException ignored) { // logged and swallowed: the write is skipped, not retried
+ logger.warn("someone interrupt current thread, so no need to do write
for io safety");
+ }
+ }
+
+ @Override
+ public void force() throws IOException { // no-op when the channel is null or already closed
+ if (channel != null && channel.isOpen()) {
+ channel.force(true);
+ }
+ }
+
+ @Override
+ public void close() throws IOException { // safe to call repeatedly: does nothing once channel has been nulled
+ if (channel != null) {
+ if (channel.isOpen()) {
+ channel.force(false); // false = file metadata is not required to be forced
+ }
+ fileOutputStream.close();
+ fileOutputStream = null;
+ channel.close();
+ channel = null;
+ }
+ }
+
+ public void clearFile() throws IOException { // empties the log by closing, deleting and recreating the file, then reopening it for append
+ close();
+ Files.delete(this.logFile.toPath()); // throws (e.g. NoSuchFileException) if the file cannot be deleted
+ if (!logFile.createNewFile()) {
+ logger.warn("Partition log file has existed,filePath:{}",
logFile.getAbsolutePath());
+ }
+ fileOutputStream = new FileOutputStream(logFile, true);
+ channel = fileOutputStream.getChannel();
+ }
+
+ @Override
+ public String toString() {
+ return "LogWriter{" + "logFile=" + logFile + '}'; // note: prints the label "LogWriter", not this class's own name
+ }
+
+ public File getLogFile() {
+ return logFile;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index a09c473e16e..36ae9752624 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
@@ -287,6 +288,7 @@ public class UnsealedTsFileRecoverPerformer extends
AbstractTsFileRecoverPerform
// currently, we close this file anyway
writer.endFile();
tsFileResource.serialize();
+
FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource);
} catch (IOException | ExecutionException e) {
throw new WALRecoverException(e);
} catch (InterruptedException e) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
index c8bafe8f9b3..eb40f66a6d0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
@@ -222,7 +222,7 @@ public class LastFlushTimeMapTest {
unseqResource1.setFile(unseqResourceFile1);
unseqResource1.updateStartTime(device, 1);
unseqResource1.updateEndTime(device, 100);
- dataRegion.updateLastFlushTime(unseqResource1);
+ dataRegion.updateDeviceLastFlushTime(unseqResource1);
File unseqResourceFile2 = new File(unseqDirPath + File.separator +
"5-5-0-0.tsfile.resource");
TsFileResource unseqResource2 = new TsFileResource();
@@ -230,7 +230,7 @@ public class LastFlushTimeMapTest {
unseqResource2.setFile(unseqResourceFile2);
unseqResource2.updateStartTime(device, 1);
unseqResource2.updateEndTime(device, 10);
- dataRegion.updateLastFlushTime(unseqResource2);
+ dataRegion.updateDeviceLastFlushTime(unseqResource2);
File unseqResourceFile3 = new File(unseqDirPath + File.separator +
"6-6-0-0.tsfile.resource");
TsFileResource unseqResource3 = new TsFileResource();
@@ -238,7 +238,7 @@ public class LastFlushTimeMapTest {
unseqResource3.setFile(unseqResourceFile3);
unseqResource3.updateStartTime(device, 1);
unseqResource3.updateEndTime(device, 70);
- dataRegion.updateLastFlushTime(unseqResource3);
+ dataRegion.updateDeviceLastFlushTime(unseqResource3);
Assert.assertEquals(100,
dataRegion.getLastFlushTimeMap().getFlushedTime(0, device));
dataRegion.getLastFlushTimeMap().degradeLastFlushTime(0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 04443ba99fe..1f160e07758 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
@@ -224,6 +225,7 @@ public class EnvironmentUtils {
for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
cleanDir(path);
}
+ FileTimeIndexCacheRecorder.getInstance().close();
// delete system info
cleanDir(config.getSystemDir());
// delete query
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e731f50628c..102022417e9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -183,6 +183,8 @@ public enum ThreadName {
REGION_MIGRATE("Region-Migrate-Pool"),
STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
REPAIR_DATA("RepairData"),
+ FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"),
+
// the unknown thread name is used for metrics
UNKOWN("UNKNOWN");