This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new d38ea579261 Load: Add metrics for active load  (#13281)
d38ea579261 is described below

commit d38ea57926159f715d0fd1a9fb5b25ff0cdba5e8
Author: YC27 <[email protected]>
AuthorDate: Mon Aug 26 19:23:38 2024 +0800

    Load: Add metrics for active load  (#13281)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    
    (cherry picked from commit efd3420674ec77cf9734b10c3af41d411770f326)
---
 .../pipeconsensus/PipeConsensusReceiver.java       |   2 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   4 +-
 .../plan/analyze/LoadTsFileAnalyzer.java           |   4 +-
 .../plan/node/load/LoadTsFilePieceNode.java        |   2 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  12 +--
 .../db/service/metrics/DataNodeMetricsHelper.java  |   6 +-
 .../iotdb/db/storageengine/StorageEngine.java      |   4 +-
 .../db/storageengine/dataregion/DataRegion.java    |   2 +-
 .../load/LoadTsFileManager.java                    |  17 ++--
 .../storageengine/load/active/ActiveLoadAgent.java |  51 ++++++++++
 .../load/active/ActiveLoadDirScanner.java          |  77 +++++++-------
 .../load/active/ActiveLoadMetricsCollector.java    |  96 +++++++++++++++++
 .../load/active/ActiveLoadPendingQueue.java        |  11 +-
 .../active/ActiveLoadScheduledExecutorService.java | 107 +++++++++++++++++++
 .../load/active/ActiveLoadTsFileLoader.java        |  28 ++++-
 .../load/limiter/LoadTsFileRateLimiter.java        |   4 +-
 .../memory}/LoadTsFileAbstractMemoryBlock.java     |   2 +-
 .../LoadTsFileAnalyzeSchemaMemoryBlock.java        |   4 +-
 .../memory}/LoadTsFileDataCacheMemoryBlock.java    |   2 +-
 .../load/memory}/LoadTsFileMemoryManager.java      |   2 +-
 .../load/metrics/ActiveLoadingFilesMetricsSet.java | 113 +++++++++++++++++++++
 .../load/metrics}/LoadTsFileCostMetricsSet.java    |   2 +-
 .../load/metrics}/LoadTsFileMemMetricSet.java      |   4 +-
 .../load/splitter/AlignedChunkData.java            |   2 +-
 .../splitter/BatchedAlignedValueChunkData.java     |   2 +-
 .../load/splitter/ChunkData.java                   |   2 +-
 .../load/splitter/DeletionData.java                |   2 +-
 .../load/splitter/NonAlignedChunkData.java         |   2 +-
 .../load/splitter/TsFileData.java                  |   2 +-
 .../load/splitter/TsFileSplitter.java              |   2 +-
 .../BatchedCompactionWithTsFileSplitterTest.java   |   6 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../iotdb/commons/service/metric/enums/Metric.java |   1 +
 33 files changed, 488 insertions(+), 91 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index e514a913aaa..6b17faf6524 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -49,13 +49,13 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
 import org.apache.iotdb.db.pipe.consensus.PipeConsensusReceiverMetrics;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionPointCounter;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.rpc.RpcUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 86f17c65331..605fb4c95dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -60,7 +60,6 @@ import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
 import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -146,6 +145,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -187,7 +187,6 @@ import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
-import static 
org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet.ANALYSIS;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
@@ -202,6 +201,7 @@ import static 
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.const
 import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
 import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
 import static 
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor.getSourcePaths;
+import static 
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME_HEADER;
 
 /** This visitor is used to analyze each type of Statement and returns the 
{@link Analysis}. */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 781d5a3fe5c..021c320990f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -48,8 +48,6 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileAnalyzeSchemaMemoryBlock;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
@@ -61,6 +59,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseState
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import 
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
+import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
 import org.apache.iotdb.rpc.RpcUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index b8dd843e27c..5d4f02378fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -21,12 +21,12 @@ package 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
 
 import org.apache.tsfile.exception.NotImplementedException;
 import org.apache.tsfile.exception.write.PageException;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 68205c308fb..7faecd2708d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -45,12 +45,6 @@ import 
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileSplitter;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -63,6 +57,12 @@ import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileDataCacheMemoryBlock;
+import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.rpc.TSStatusCode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index e5e3f27f5c1..d9d86001b9a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -39,8 +39,9 @@ import 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet;
 import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics;
 import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
 import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
@@ -97,6 +98,7 @@ public class DataNodeMetricsHelper {
 
     // bind load related metrics
     
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
+    MetricService.getInstance().addMetricSet(ActiveLoadingFilesMetricsSet.getInstance());
   }
 
   private static void initSystemMetrics() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 3d2440c2a77..1e4f2953fe9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -52,8 +52,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
-import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
-import 
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
@@ -74,6 +72,8 @@ import 
org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
+import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
+import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.rpc.RpcUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2165dc429d6..e30af047e2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -48,7 +48,6 @@ import 
org.apache.iotdb.db.exception.quota.ExceedQuotaException;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.common.DeviceContext;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import 
org.apache.iotdb.db.queryengine.execution.load.limiter.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
@@ -104,6 +103,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsF
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
+import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index db9db41cee3..fd21140453c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.db.storageengine.load;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -35,11 +35,6 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import 
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadDirScanner;
-import 
org.apache.iotdb.db.queryengine.execution.load.active.ActiveLoadTsFileLoader;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.ChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.DeletionData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -47,6 +42,10 @@ import 
org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
+import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
+import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -97,14 +96,12 @@ public class LoadTsFileManager {
   private final Map<String, CleanupTask> uuid2CleanupTask = new 
ConcurrentHashMap<>();
   private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new 
PriorityBlockingQueue<>();
 
-  private final ActiveLoadTsFileLoader activeLoadTsFileLoader = new 
ActiveLoadTsFileLoader();
-  private final ActiveLoadDirScanner activeLoadDirScanner =
-      new ActiveLoadDirScanner(activeLoadTsFileLoader);
+  private final ActiveLoadAgent activeLoadAgent = new ActiveLoadAgent();
 
   public LoadTsFileManager() {
     registerCleanupTaskExecutor();
     recover();
-    activeLoadDirScanner.start();
+    activeLoadAgent.start();
   }
 
   private void registerCleanupTaskExecutor() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
new file mode 100644
index 00000000000..f060bd9a96f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+public class ActiveLoadAgent {
+
+  private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
+  private final ActiveLoadDirScanner activeLoadDirScanner;
+  private final ActiveLoadMetricsCollector activeLoadMetricsCollector;
+
+  public ActiveLoadAgent() {
+    this.activeLoadTsFileLoader = new ActiveLoadTsFileLoader();
+    this.activeLoadDirScanner = new ActiveLoadDirScanner(activeLoadTsFileLoader);
+    this.activeLoadMetricsCollector =
+        new ActiveLoadMetricsCollector(activeLoadTsFileLoader, activeLoadDirScanner);
+  }
+
+  public ActiveLoadTsFileLoader loader() {
+    return activeLoadTsFileLoader;
+  }
+
+  public ActiveLoadDirScanner scanner() {
+    return activeLoadDirScanner;
+  }
+
+  public ActiveLoadMetricsCollector metrics() {
+    return activeLoadMetricsCollector;
+  }
+
+  public synchronized void start() {
+    activeLoadDirScanner.start();
+    activeLoadMetricsCollector.start();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
similarity index 74%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index e49789a8c0b..8c2a2787529 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -17,13 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.active;
+package org.apache.iotdb.db.storageengine.load.active;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.tsfile.common.conf.TSFileConfig;
@@ -33,60 +30,34 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class ActiveLoadDirScanner {
+public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadDirScanner.class);
 
   private static final String RESOURCE = ".resource";
   private static final String MODS = ".mods";
 
-  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
-
-  private static final ScheduledExecutorService DIR_SCAN_JOB_EXECUTOR =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.ACTIVE_LOAD_DIR_SCANNER.getName());
-  private static final long MIN_SCAN_INTERVAL_SECONDS =
-      
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningCheckIntervalSeconds();
-
   private final AtomicReference<String[]> listeningDirsConfig = new AtomicReference<>();
-  private final Set<String> listeningDirs = new HashSet<>();
+  private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
 
   private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
 
-  private Future<?> dirScanJobFuture;
-
   public ActiveLoadDirScanner(final ActiveLoadTsFileLoader activeLoadTsFileLoader) {
+    super(ThreadName.ACTIVE_LOAD_DIR_SCANNER);
     this.activeLoadTsFileLoader = activeLoadTsFileLoader;
-  }
-
-  public synchronized void start() {
-    if (dirScanJobFuture == null) {
-      dirScanJobFuture =
-          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              DIR_SCAN_JOB_EXECUTOR,
-              this::scanSafely,
-              MIN_SCAN_INTERVAL_SECONDS,
-              MIN_SCAN_INTERVAL_SECONDS,
-              TimeUnit.SECONDS);
-      LOGGER.info(
-          "Active load dir scanner started. Scan interval: {}s.", 
MIN_SCAN_INTERVAL_SECONDS);
-    }
-  }
 
-  public synchronized void stop() {
-    if (dirScanJobFuture != null) {
-      dirScanJobFuture.cancel(false);
-      dirScanJobFuture = null;
-      LOGGER.info("Active load dir scanner stopped.");
-    }
+    register(this::scanSafely);
+    LOGGER.info("Active load dir scanner periodical job registered");
   }
 
   private void scanSafely() {
@@ -172,4 +143,26 @@ public class ActiveLoadDirScanner {
         : filePathWithResourceOrModsTail.substring(
             0, filePathWithResourceOrModsTail.length() - MODS.length());
   }
+
+  // Metrics
+  public long countAndReportActiveListeningDirsFileNumber() {
+    final long[] fileCount = {0};
+    try {
+      for (String dir : listeningDirs) {
+        Files.walkFileTree(
+            new File(dir).toPath(),
+            new SimpleFileVisitor<Path>() {
+              @Override
+              public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+                fileCount[0]++;
+                return FileVisitResult.CONTINUE;
+              }
+            });
+      }
+      ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]);
+    } catch (final IOException e) {
+      LOGGER.warn("Failed to count active listening dirs file number.", e);
+    }
+    return fileCount[0];
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadMetricsCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadMetricsCollector.java
new file mode 100644
index 00000000000..de40378a105
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadMetricsCollector.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.commons.concurrent.ThreadName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ActiveLoadMetricsCollector extends ActiveLoadScheduledExecutorService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadMetricsCollector.class);
+
+  private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
+  private final ActiveLoadDirScanner activeLoadDirScanner;
+
+  private long countPendingFileRemainingSkipRound = 0;
+  private long countFailedFileRemainingSkipRound = 0;
+
+  public ActiveLoadMetricsCollector(
+      final ActiveLoadTsFileLoader activeLoadTsFileLoader,
+      final ActiveLoadDirScanner activeLoadDirScanner) {
+    super(ThreadName.ACTIVE_LOAD_METRICS_COLLECTOR);
+
+    this.activeLoadTsFileLoader = activeLoadTsFileLoader;
+    this.activeLoadDirScanner = activeLoadDirScanner;
+
+    register(this::countAndReportPendingFile);
+    register(this::countAndReportFailedFile);
+    LOGGER.info("Active load metric collector periodical jobs registered");
+  }
+
+  private void countAndReportPendingFile() {
+    if (countPendingFileRemainingSkipRound > 0) {
+      --countPendingFileRemainingSkipRound;
+      return;
+    }
+
+    final long currentPendingFileNumber =
+        activeLoadDirScanner.countAndReportActiveListeningDirsFileNumber();
+
+    if (currentPendingFileNumber < 100) {
+      countPendingFileRemainingSkipRound = 6; // 30 seconds
+      return;
+    }
+    if (currentPendingFileNumber < 1000) {
+      countPendingFileRemainingSkipRound = 18; // 90 seconds
+      return;
+    }
+    if (currentPendingFileNumber < 10000) {
+      countPendingFileRemainingSkipRound = 120; // 600 seconds
+      return;
+    }
+    countPendingFileRemainingSkipRound = 180; // 900 seconds
+  }
+
+  private void countAndReportFailedFile() {
+    if (countFailedFileRemainingSkipRound > 0) {
+      --countFailedFileRemainingSkipRound;
+      return;
+    }
+
+    final long currentFailedFileNumber = activeLoadTsFileLoader.countAndReportFailedFileNumber();
+
+    if (currentFailedFileNumber < 100) {
+      countFailedFileRemainingSkipRound = 6; // 30 seconds
+      return;
+    }
+    if (currentFailedFileNumber < 1000) {
+      countFailedFileRemainingSkipRound = 18; // 90 seconds
+      return;
+    }
+    if (currentFailedFileNumber < 10000) {
+      countFailedFileRemainingSkipRound = 120; // 600 seconds
+      return;
+    }
+    countFailedFileRemainingSkipRound = currentFailedFileNumber / 50;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
similarity index 81%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 618eff470c6..667266feb5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -17,7 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.active;
+package org.apache.iotdb.db.storageengine.load.active;
+
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
 
 import org.apache.tsfile.utils.Pair;
 
@@ -36,6 +38,8 @@ public class ActiveLoadPendingQueue {
   public synchronized boolean enqueue(final String file, final boolean 
isGeneratedByPipe) {
     if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
       pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
+
+      ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(1);
       return true;
     }
     return false;
@@ -46,12 +50,17 @@ public class ActiveLoadPendingQueue {
     if (pair != null) {
       pendingFileSet.remove(pair.left);
       loadingFileSet.add(pair.left);
+
+      ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(1);
+      ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(-1);
     }
     return pair;
   }
 
   public synchronized void removeFromLoading(final String file) {
     loadingFileSet.remove(file);
+
+    ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1);
   }
 
   public int size() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
new file mode 100644
index 00000000000..c7b62b9bab7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ActiveLoadScheduledExecutorService {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ActiveLoadScheduledExecutorService.class);
+
+  protected static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private static final long MIN_EXECUTION_INTERVAL_SECONDS =
+      IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
+  private final ScheduledExecutorService scheduledExecutorService;
+  private Future<?> future;
+  private long rounds;
+
+  private final List<Pair<WrappedRunnable, Long>> jobs = new 
CopyOnWriteArrayList<>();
+
+  protected ActiveLoadScheduledExecutorService(final ThreadName threadName) {
+    scheduledExecutorService =
+        
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
+  }
+
+  public void register(Runnable runnable) {
+    jobs.add(
+        new Pair<>(
+            new WrappedRunnable() {
+              @Override
+              public void runMayThrow() {
+                try {
+                  runnable.run();
+                } catch (Exception e) {
+                  LOGGER.warn("Error occurred when executing active load 
periodical job.", e);
+                }
+              }
+            },
+            Math.max(MIN_EXECUTION_INTERVAL_SECONDS, 1)));
+  }
+
+  public synchronized void start() {
+    if (future == null) {
+      rounds = 0;
+
+      future =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              scheduledExecutorService,
+              this::execute,
+              MIN_EXECUTION_INTERVAL_SECONDS,
+              MIN_EXECUTION_INTERVAL_SECONDS,
+              TimeUnit.SECONDS);
+      LOGGER.info("Active load periodical jobs executor is started 
successfully.");
+    }
+  }
+
+  private void execute() {
+    ++rounds;
+
+    for (final Pair<WrappedRunnable, Long> periodicalJob : jobs) {
+      if (rounds % periodicalJob.right == 0) {
+        periodicalJob.left.run();
+      }
+    }
+  }
+
+  public synchronized void stop() {
+    if (future != null) {
+      future.cancel(false);
+      future = null;
+      LOGGER.info("Active load periodical jobs executor is stopped 
successfully.");
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
similarity index 92%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index fb4800deef9..a5b45146067 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.active;
+package org.apache.iotdb.db.storageengine.load.active;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.codec.digest.DigestUtils;
@@ -46,8 +47,12 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.time.ZoneId;
 import java.util.Objects;
 import java.util.Optional;
@@ -307,4 +312,25 @@ public class ActiveLoadTsFileLoader {
       FileUtils.moveFile(sourceFile, targetFile, 
StandardCopyOption.REPLACE_EXISTING);
     }
   }
+
+  // Metrics
+  /**
+   * Walks the fail directory recursively, counts the files in it and records the total in {@link
+   * ActiveLoadingFilesMetricsSet}.
+   *
+   * <p>An entry that cannot be visited (for example one that disappears while the walk is in
+   * progress) is skipped instead of aborting the whole count. If the walk itself fails with an
+   * {@link IOException}, a warning is logged, the metric is left untouched and the number counted
+   * so far is returned.
+   *
+   * @return the number of files counted in the fail directory
+   */
+  public long countAndReportFailedFileNumber() {
+    // Single-element array so the anonymous visitor can update the count.
+    final long[] fileCount = {0};
+    try {
+      initFailDirIfNecessary();
+      Files.walkFileTree(
+          new File(failDir.get()).toPath(),
+          new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) {
+              fileCount[0]++;
+              return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFileFailed(final Path file, final IOException exc) {
+              // The inherited implementation rethrows and would abort the walk on the first
+              // unreadable entry; skip that entry and keep counting the rest.
+              LOGGER.debug("Skipped {} when counting failed files.", file, exc);
+              return FileVisitResult.CONTINUE;
+            }
+          });
+      ActiveLoadingFilesMetricsSet.getInstance().recordFailedFileCounter(fileCount[0]);
+    } catch (final IOException e) {
+      LOGGER.warn("Failed to count failed files in fail directory.", e);
+    }
+    return fileCount[0];
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
index c10145b0398..eaaf376a819 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/limiter/LoadTsFileRateLimiter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.limiter;
+package org.apache.iotdb.db.storageengine.load.limiter;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
 
 import com.google.common.util.concurrent.AtomicDouble;
 import com.google.common.util.concurrent.RateLimiter;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
index 4f2ad607ccb..f0df55a9f63 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
index 17b52745f23..c7add4b446f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
 
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.slf4j.Logger;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
index 7a22951d672..e0709cece9e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
 
 import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index f034a4627b9..be6e8dcef97 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.load;
+package org.apache.iotdb.db.storageengine.load.memory;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
new file mode 100644
index 00000000000..aae166ef5d7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.metrics;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class ActiveLoadingFilesMetricsSet implements IMetricSet {
+
+  private static final ActiveLoadingFilesMetricsSet INSTANCE = new 
ActiveLoadingFilesMetricsSet();
+
+  public static final String PENDING = "pending";
+  public static final String QUEUING = "queuing";
+  public static final String LOADING = "loading";
+  public static final String FAILED = "failed";
+
+  private ActiveLoadingFilesMetricsSet() {
+    // empty construct
+  }
+
+  private Counter pendingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  private Counter queuingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  private Counter loadingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  private Counter failedFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+  public void recordPendingFileCounter(final long number) {
+    pendingFileCounter.inc(number - pendingFileCounter.getCount());
+  }
+
+  public void recordQueuingFileCounter(final long number) {
+    queuingFileCounter.inc(number);
+  }
+
+  public void recordLoadingFileCounter(final long number) {
+    loadingFileCounter.inc(number);
+  }
+
+  public void recordFailedFileCounter(final long number) {
+    failedFileCounter.inc(number - failedFileCounter.getCount());
+  }
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    pendingFileCounter =
+        metricService.getOrCreateCounter(
+            Metric.ACTIVE_LOADING_FILES.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.TYPE.toString(),
+            PENDING);
+    queuingFileCounter =
+        metricService.getOrCreateCounter(
+            Metric.ACTIVE_LOADING_FILES.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.TYPE.toString(),
+            QUEUING);
+    loadingFileCounter =
+        metricService.getOrCreateCounter(
+            Metric.ACTIVE_LOADING_FILES.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.TYPE.toString(),
+            LOADING);
+    failedFileCounter =
+        metricService.getOrCreateCounter(
+            Metric.ACTIVE_LOADING_FILES.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.TYPE.toString(),
+            FAILED);
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+    queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+    loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+    failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+    metricService.remove(
+        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), PENDING);
+    metricService.remove(
+        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), QUEUING);
+    metricService.remove(
+        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), LOADING);
+    metricService.remove(
+        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), FAILED);
+  }
+
+  public static ActiveLoadingFilesMetricsSet getInstance() {
+    return ActiveLoadingFilesMetricsSet.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index d9ce0233b55..fd0f398644d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.metric.load;
+package org.apache.iotdb.db.storageengine.load.metrics;
 
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java
index c1953815854..77758142b2c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileMemMetricSet.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.metric.load;
+package org.apache.iotdb.db.storageengine.load.metrics;
 
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index f202cf9e00b..c49a1d38cc3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/BatchedAlignedValueChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/BatchedAlignedValueChunkData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
index 3f48b5cf339..2cac3414696 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/BatchedAlignedValueChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
index 3ab26c0b49a..3b16a9d660c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/ChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java
index 940cd2e75b2..186426650fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/DeletionData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
index e4abec81e7e..2cd6860b7bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/NonAlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java
index eece426fdbe..eee4eac2b3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index 61fdeb7fbf7..a7a9084e198 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/splitter/TsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.load.splitter;
+package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
index 3e952e87b8b..a63227ad3f4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java
@@ -24,9 +24,6 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import 
org.apache.iotdb.db.queryengine.execution.load.splitter.AlignedChunkData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileData;
-import org.apache.iotdb.db.queryengine.execution.load.splitter.TsFileSplitter;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
@@ -35,6 +32,9 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionC
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.splitter.AlignedChunkData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
+import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;
 
 import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.exception.write.WriteProcessException;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3765165b2c6..e43ae6e667e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -172,6 +172,7 @@ public enum ThreadName {
   // -------------------------- Other --------------------------
   ACTIVE_LOAD_TSFILE_LOADER("Active-Load-TsFile-Loader"),
   ACTIVE_LOAD_DIR_SCANNER("Active-Load-Dir-Scanner"),
+  ACTIVE_LOAD_METRICS_COLLECTOR("Active-Load-Metrics-Collector"),
   SETTLE("Settle"),
   INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
   INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
@@ -357,6 +358,7 @@ public enum ThreadName {
           Arrays.asList(
               ACTIVE_LOAD_TSFILE_LOADER,
               ACTIVE_LOAD_DIR_SCANNER,
+              ACTIVE_LOAD_METRICS_COLLECTOR,
               SETTLE,
               INFLUXDB_RPC_SERVICE,
               INFLUXDB_RPC_PROCESSOR,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index cc096016f4b..707f46fa259 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -172,6 +172,7 @@ public enum Metric {
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
   SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"),
   // load related
+  ACTIVE_LOADING_FILES("active_loading_files"),
   LOAD_MEM("load_mem"),
   LOAD_DISK_IO("load_disk_io"),
   LOAD_TIME_COST("load_time_cost"),

Reply via email to