This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new d38ea579261 Load: Add metrics for active load (#13281)
d38ea579261 is described below
commit d38ea57926159f715d0fd1a9fb5b25ff0cdba5e8
Author: YC27 <[email protected]>
AuthorDate: Mon Aug 26 19:23:38 2024 +0800
Load: Add metrics for active load (#13281)
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit efd3420674ec77cf9734b10c3af41d411770f326)
---
.../pipeconsensus/PipeConsensusReceiver.java | 2 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 4 +-
.../plan/analyze/LoadTsFileAnalyzer.java | 4 +-
.../plan/node/load/LoadTsFilePieceNode.java | 2 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 12 +--
.../db/service/metrics/DataNodeMetricsHelper.java | 6 +-
.../iotdb/db/storageengine/StorageEngine.java | 4 +-
.../db/storageengine/dataregion/DataRegion.java | 2 +-
.../load/LoadTsFileManager.java | 17 ++--
.../storageengine/load/active/ActiveLoadAgent.java | 51 ++++++++++
.../load/active/ActiveLoadDirScanner.java | 77 +++++++-------
.../load/active/ActiveLoadMetricsCollector.java | 96 +++++++++++++++++
.../load/active/ActiveLoadPendingQueue.java | 11 +-
.../active/ActiveLoadScheduledExecutorService.java | 107 +++++++++++++++++++
.../load/active/ActiveLoadTsFileLoader.java | 28 ++++-
.../load/limiter/LoadTsFileRateLimiter.java | 4 +-
.../memory}/LoadTsFileAbstractMemoryBlock.java | 2 +-
.../LoadTsFileAnalyzeSchemaMemoryBlock.java | 4 +-
.../memory}/LoadTsFileDataCacheMemoryBlock.java | 2 +-
.../load/memory}/LoadTsFileMemoryManager.java | 2 +-
.../load/metrics/ActiveLoadingFilesMetricsSet.java | 113 +++++++++++++++++++++
.../load/metrics}/LoadTsFileCostMetricsSet.java | 2 +-
.../load/metrics}/LoadTsFileMemMetricSet.java | 4 +-
.../load/splitter/AlignedChunkData.java | 2 +-
.../splitter/BatchedAlignedValueChunkData.java | 2 +-
.../load/splitter/ChunkData.java | 2 +-
.../load/splitter/DeletionData.java | 2 +-
.../load/splitter/NonAlignedChunkData.java | 2 +-
.../load/splitter/TsFileData.java | 2 +-
.../load/splitter/TsFileSplitter.java | 2 +-
.../BatchedCompactionWithTsFileSplitterTest.java | 6 +-
.../iotdb/commons/concurrent/ThreadName.java | 2 +
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
33 files changed, 488 insertions(+), 91 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index e514a913aaa..6b17faf6524 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -49,13 +49,13 @@ import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.PipeConsensusReceiverMetrics;
import
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionPointCounter;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.rpc.RpcUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 86f17c65331..605fb4c95dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -60,7 +60,6 @@ import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -146,6 +145,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -187,7 +187,6 @@ import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
-import static
org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet.ANALYSIS;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
@@ -202,6 +201,7 @@ import static
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.const
import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
import static
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor.getSourcePaths;
+import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME_HEADER;
/** This visitor is used to analyze each type of Statement and returns the
{@link Analysis}. */
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 781d5a3fe5c..021c320990f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -48,8 +48,6 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileAnalyzeSchemaMemoryBlock;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
@@ -61,6 +59,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseState
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
+import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.RpcUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index b8dd843e27c..5d4f02378fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -21,12 +21,12 @@ package
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.exception.write.PageException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 68205c308fb..7faecd2708d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -45,12 +45,6 @@ import
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileSplitter;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -63,6 +57,12 @@ import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileDataCacheMemoryBlock;
+import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index e5e3f27f5c1..d9d86001b9a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -39,8 +39,9 @@ import
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics;
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
@@ -97,6 +98,7 @@ public class DataNodeMetricsHelper {
// bind load related metrics
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
+
MetricService.getInstance().addMetricSet(ActiveLoadingFilesMetricsSet.getInstance());
}
private static void initSystemMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 3d2440c2a77..1e4f2953fe9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -52,8 +52,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
-import
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
@@ -74,6 +72,8 @@ import
org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
+import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
+import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.rpc.RpcUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2165dc429d6..e30af047e2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -48,7 +48,6 @@ import
org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
@@ -104,6 +103,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsF
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
+import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index db9db41cee3..fd21140453c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.storageengine.load;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -35,11 +35,6 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadDirScanner;
-import
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadTsFileLoader;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.DeletionData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -47,6 +42,10 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
+import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
+import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -97,14 +96,12 @@ public class LoadTsFileManager {
private final Map<String, CleanupTask> uuid2CleanupTask = new
ConcurrentHashMap<>();
private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new
PriorityBlockingQueue<>();
- private final ActiveLoadTsFileLoader activeLoadTsFileLoader = new
ActiveLoadTsFileLoader();
- private final ActiveLoadDirScanner activeLoadDirScanner =
- new ActiveLoadDirScanner(activeLoadTsFileLoader);
+ private final ActiveLoadAgent activeLoadAgent = new ActiveLoadAgent();
public LoadTsFileManager() {
registerCleanupTaskExecutor();
recover();
- activeLoadDirScanner.start();
+ activeLoadAgent.start();
}
private void registerCleanupTaskExecutor() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
new file mode 100644
index 00000000000..f060bd9a96f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+public class ActiveLoadAgent {
+
+ private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
+ private final ActiveLoadDirScanner activeLoadDirScanner;
+ private final ActiveLoadMetricsCollector activeLoadMetricsCollector;
+
+ public ActiveLoadAgent() {
+ this.activeLoadTsFileLoader = new ActiveLoadTsFileLoader();
+ this.activeLoadDirScanner = new
ActiveLoadDirScanner(activeLoadTsFileLoader);
+ this.activeLoadMetricsCollector =
+ new ActiveLoadMetricsCollector(activeLoadTsFileLoader,
activeLoadDirScanner);
+ }
+
+ public ActiveLoadTsFileLoader loader() {
+ return activeLoadTsFileLoader;
+ }
+
+ public ActiveLoadDirScanner scanner() {
+ return activeLoadDirScanner;
+ }
+
+ public ActiveLoadMetricsCollector metrics() {
+ return activeLoadMetricsCollector;
+ }
+
+ public synchronized void start() {
+ activeLoadDirScanner.start();
+ activeLoadMetricsCollector.start();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
similarity index 74%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index e49789a8c0b..8c2a2787529 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -17,13 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.active;
+package org.apache.iotdb.db.storageengine.load.active;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.commons.io.FileUtils;
import org.apache.tsfile.common.conf.TSFileConfig;
@@ -33,60 +30,34 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
-public class ActiveLoadDirScanner {
+public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadDirScanner.class);
private static final String RESOURCE = ".resource";
private static final String MODS = ".mods";
- private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
-
- private static final ScheduledExecutorService DIR_SCAN_JOB_EXECUTOR =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.ACTIVE_LOAD_DIR_SCANNER.getName());
- private static final long MIN_SCAN_INTERVAL_SECONDS =
-
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds();
-
private final AtomicReference<String[]> listeningDirsConfig = new
AtomicReference<>();
- private final Set<String> listeningDirs = new HashSet<>();
+ private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
- private Future<?> dirScanJobFuture;
-
public ActiveLoadDirScanner(final ActiveLoadTsFileLoader
activeLoadTsFileLoader) {
+ super(ThreadName.ACTIVE_LOAD_DIR_SCANNER);
this.activeLoadTsFileLoader = activeLoadTsFileLoader;
- }
-
- public synchronized void start() {
- if (dirScanJobFuture == null) {
- dirScanJobFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- DIR_SCAN_JOB_EXECUTOR,
- this::scanSafely,
- MIN_SCAN_INTERVAL_SECONDS,
- MIN_SCAN_INTERVAL_SECONDS,
- TimeUnit.SECONDS);
- LOGGER.info(
- "Active load dir scanner started. Scan interval: {}s.",
MIN_SCAN_INTERVAL_SECONDS);
- }
- }
- public synchronized void stop() {
- if (dirScanJobFuture != null) {
- dirScanJobFuture.cancel(false);
- dirScanJobFuture = null;
- LOGGER.info("Active load dir scanner stopped.");
- }
+ register(this::scanSafely);
+ LOGGER.info("Active load dir scanner periodical job registered");
}
private void scanSafely() {
@@ -172,4 +143,26 @@ public class ActiveLoadDirScanner {
: filePathWithResourceOrModsTail.substring(
0, filePathWithResourceOrModsTail.length() - MODS.length());
}
+
+ // Metrics
+ public long countAndReportActiveListeningDirsFileNumber() {
+ final long[] fileCount = {0};
+ try {
+ for (String dir : listeningDirs) {
+ Files.walkFileTree(
+ new File(dir).toPath(),
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs) {
+ fileCount[0]++;
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]);
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to count active listening dirs file number.", e);
+ }
+ return fileCount[0];
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadMetricsCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadMetricsCollector.java
new file mode 100644
index 00000000000..de40378a105
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadMetricsCollector.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.commons.concurrent.ThreadName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ActiveLoadMetricsCollector extends
ActiveLoadScheduledExecutorService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadMetricsCollector.class);
+
+ private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
+ private final ActiveLoadDirScanner activeLoadDirScanner;
+
+ private long countPendingFileRemainingSkipRound = 0;
+ private long countFailedFileRemainingSkipRound = 0;
+
+ public ActiveLoadMetricsCollector(
+ final ActiveLoadTsFileLoader activeLoadTsFileLoader,
+ final ActiveLoadDirScanner activeLoadDirScanner) {
+ super(ThreadName.ACTIVE_LOAD_METRICS_COLLECTOR);
+
+ this.activeLoadTsFileLoader = activeLoadTsFileLoader;
+ this.activeLoadDirScanner = activeLoadDirScanner;
+
+ register(this::countAndReportPendingFile);
+ register(this::countAndReportFailedFile);
+ LOGGER.info("Active load metric collector periodical jobs registered");
+ }
+
+ private void countAndReportPendingFile() {
+ if (countPendingFileRemainingSkipRound > 0) {
+ --countPendingFileRemainingSkipRound;
+ return;
+ }
+
+ final long currentPendingFileNumber =
+ activeLoadDirScanner.countAndReportActiveListeningDirsFileNumber();
+
+ if (currentPendingFileNumber < 100) {
+ countPendingFileRemainingSkipRound = 6; // 30 seconds
+ return;
+ }
+ if (currentPendingFileNumber < 1000) {
+ countPendingFileRemainingSkipRound = 18; // 90 seconds
+ return;
+ }
+ if (currentPendingFileNumber < 10000) {
+ countPendingFileRemainingSkipRound = 120; // 600 seconds
+ return;
+ }
+ countPendingFileRemainingSkipRound = 180; // 900 seconds
+ }
+
+ private void countAndReportFailedFile() {
+ if (countFailedFileRemainingSkipRound > 0) {
+ --countFailedFileRemainingSkipRound;
+ return;
+ }
+
+ final long currentFailedFileNumber =
activeLoadTsFileLoader.countAndReportFailedFileNumber();
+
+ if (currentFailedFileNumber < 100) {
+ countFailedFileRemainingSkipRound = 6; // 30 seconds
+ return;
+ }
+ if (currentFailedFileNumber < 1000) {
+ countFailedFileRemainingSkipRound = 18; // 90 seconds
+ return;
+ }
+ if (currentFailedFileNumber < 10000) {
+ countFailedFileRemainingSkipRound = 120; // 600 seconds
+ return;
+ }
+ countFailedFileRemainingSkipRound = currentFailedFileNumber / 50;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
similarity index 81%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 618eff470c6..667266feb5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -17,7 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.active;
+package org.apache.iotdb.db.storageengine.load.active;
+
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.tsfile.utils.Pair;
@@ -36,6 +38,8 @@ public class ActiveLoadPendingQueue {
public synchronized boolean enqueue(final String file, final boolean
isGeneratedByPipe) {
if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
+
+ ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(1);
return true;
}
return false;
@@ -46,12 +50,17 @@ public class ActiveLoadPendingQueue {
if (pair != null) {
pendingFileSet.remove(pair.left);
loadingFileSet.add(pair.left);
+
+ ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(1);
+ ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(-1);
}
return pair;
}
public synchronized void removeFromLoading(final String file) {
loadingFileSet.remove(file);
+
+ ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1);
}
public int size() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
new file mode 100644
index 00000000000..c7b62b9bab7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ActiveLoadScheduledExecutorService {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ActiveLoadScheduledExecutorService.class);
+
+ protected static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
+ private static final long MIN_EXECUTION_INTERVAL_SECONDS =
+ IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
+ private final ScheduledExecutorService scheduledExecutorService;
+ private Future<?> future;
+ private long rounds;
+
+ private final List<Pair<WrappedRunnable, Long>> jobs = new
CopyOnWriteArrayList<>();
+
+ protected ActiveLoadScheduledExecutorService(final ThreadName threadName) {
+ scheduledExecutorService =
+
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
+ }
+
+ public void register(Runnable runnable) {
+ jobs.add(
+ new Pair<>(
+ new WrappedRunnable() {
+ @Override
+ public void runMayThrow() {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ LOGGER.warn("Error occurred when executing active load
periodical job.", e);
+ }
+ }
+ },
+ Math.max(MIN_EXECUTION_INTERVAL_SECONDS, 1)));
+ }
+
+ public synchronized void start() {
+ if (future == null) {
+ rounds = 0;
+
+ future =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ scheduledExecutorService,
+ this::execute,
+ MIN_EXECUTION_INTERVAL_SECONDS,
+ MIN_EXECUTION_INTERVAL_SECONDS,
+ TimeUnit.SECONDS);
+ LOGGER.info("Active load periodical jobs executor is started
successfully.");
+ }
+ }
+
+ private void execute() {
+ ++rounds;
+
+ for (final Pair<WrappedRunnable, Long> periodicalJob : jobs) {
+ if (rounds % periodicalJob.right == 0) {
+ periodicalJob.left.run();
+ }
+ }
+ }
+
+ public synchronized void stop() {
+ if (future != null) {
+ future.cancel(false);
+ future = null;
+ LOGGER.info("Active load periodical jobs executor is stopped
successfully.");
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
similarity index 92%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index fb4800deef9..a5b45146067 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.active;
+package org.apache.iotdb.db.storageengine.load.active;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.codec.digest.DigestUtils;
@@ -46,8 +47,12 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.FileVisitResult;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
import java.time.ZoneId;
import java.util.Objects;
import java.util.Optional;
@@ -307,4 +312,25 @@ public class ActiveLoadTsFileLoader {
FileUtils.moveFile(sourceFile, targetFile,
StandardCopyOption.REPLACE_EXISTING);
}
}
+
+ // Metrics
+ public long countAndReportFailedFileNumber() {
+ final long[] fileCount = {0};
+ try {
+ initFailDirIfNecessary();
+ Files.walkFileTree(
+ new File(failDir.get()).toPath(),
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs) {
+ fileCount[0]++;
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
ActiveLoadingFilesMetricsSet.getInstance().recordFailedFileCounter(fileCount[0]);
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to count failed files in fail directory.", e);
+ }
+ return fileCount[0];
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
index c10145b0398..eaaf376a819 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
@@ -17,12 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.limiter;
+package org.apache.iotdb.db.storageengine.load.limiter;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.RateLimiter;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
index 4f2ad607ccb..f0df55a9f63 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
index 17b52745f23..c7add4b446f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -17,12 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.slf4j.Logger;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
similarity index 98%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
index 7a22951d672..e0709cece9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index f034a4627b9..be6e8dcef97 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
new file mode 100644
index 00000000000..aae166ef5d7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.metrics;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class ActiveLoadingFilesMetricsSet implements IMetricSet {
+
+ private static final ActiveLoadingFilesMetricsSet INSTANCE = new
ActiveLoadingFilesMetricsSet();
+
+ public static final String PENDING = "pending";
+ public static final String QUEUING = "queuing";
+ public static final String LOADING = "loading";
+ public static final String FAILED = "failed";
+
+ private ActiveLoadingFilesMetricsSet() {
+ // empty construct
+ }
+
+ private Counter pendingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ private Counter queuingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ private Counter loadingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ private Counter failedFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+ public void recordPendingFileCounter(final long number) {
+ pendingFileCounter.inc(number - pendingFileCounter.getCount());
+ }
+
+ public void recordQueuingFileCounter(final long number) {
+ queuingFileCounter.inc(number);
+ }
+
+ public void recordLoadingFileCounter(final long number) {
+ loadingFileCounter.inc(number);
+ }
+
+ public void recordFailedFileCounter(final long number) {
+ failedFileCounter.inc(number - failedFileCounter.getCount());
+ }
+
+ @Override
+ public void bindTo(final AbstractMetricService metricService) {
+ pendingFileCounter =
+ metricService.getOrCreateCounter(
+ Metric.ACTIVE_LOADING_FILES.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ PENDING);
+ queuingFileCounter =
+ metricService.getOrCreateCounter(
+ Metric.ACTIVE_LOADING_FILES.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ QUEUING);
+ loadingFileCounter =
+ metricService.getOrCreateCounter(
+ Metric.ACTIVE_LOADING_FILES.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ LOADING);
+ failedFileCounter =
+ metricService.getOrCreateCounter(
+ Metric.ACTIVE_LOADING_FILES.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ FAILED);
+ }
+
+ @Override
+ public void unbindFrom(final AbstractMetricService metricService) {
+ pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+ metricService.remove(
+ MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), PENDING);
+ metricService.remove(
+ MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), QUEUING);
+ metricService.remove(
+ MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), LOADING);
+ metricService.remove(
+ MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), FAILED);
+ }
+
+ public static ActiveLoadingFilesMetricsSet getInstance() {
+ return ActiveLoadingFilesMetricsSet.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
similarity index 98%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index d9ce0233b55..fd0f398644d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.metric.load;
+package org.apache.iotdb.db.storageengine.load.metrics;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java
index c1953815854..77758142b2c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.metric.load;
+package org.apache.iotdb.db.storageengine.load.metrics;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index f202cf9e00b..c49a1d38cc3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/BatchedAlignedValueChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/BatchedAlignedValueChunkData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
index 3f48b5cf339..2cac3414696 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/BatchedAlignedValueChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
index 3ab26c0b49a..3b16a9d660c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java
index 940cd2e75b2..186426650fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
index e4abec81e7e..2cd6860b7bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java
similarity index 95%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java
index eece426fdbe..eee4eac2b3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.commons.exception.IllegalPathException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
similarity index 99%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index 61fdeb7fbf7..a7a9084e198 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
index 3e952e87b8b..a63227ad3f4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
@@ -24,9 +24,6 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
-import
org.apache.iotdb.db.queryengine.execution.load.splitter.AlignedChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileSplitter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
@@ -35,6 +32,9 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionC
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.splitter.AlignedChunkData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.exception.write.WriteProcessException;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3765165b2c6..e43ae6e667e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -172,6 +172,7 @@ public enum ThreadName {
// -------------------------- Other --------------------------
ACTIVE_LOAD_TSFILE_LOADER("Active-Load-TsFile-Loader"),
ACTIVE_LOAD_DIR_SCANNER("Active-Load-Dir-Scanner"),
+ ACTIVE_LOAD_METRICS_COLLECTOR("Active-Load-Metrics-Collector"),
SETTLE("Settle"),
INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
@@ -357,6 +358,7 @@ public enum ThreadName {
Arrays.asList(
ACTIVE_LOAD_TSFILE_LOADER,
ACTIVE_LOAD_DIR_SCANNER,
+ ACTIVE_LOAD_METRICS_COLLECTOR,
SETTLE,
INFLUXDB_RPC_SERVICE,
INFLUXDB_RPC_PROCESSOR,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index cc096016f4b..707f46fa259 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -172,6 +172,7 @@ public enum Metric {
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"),
// load related
+ ACTIVE_LOADING_FILES("active_loading_files"),
LOAD_MEM("load_mem"),
LOAD_DISK_IO("load_disk_io"),
LOAD_TIME_COST("load_time_cost"),