This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 1edaaf318c3 [To rel/1.2] [metrics] Refactor compaction read write throughput metrics  (#10301)
1edaaf318c3 is described below

commit 1edaaf318c36566c345e355957dc17d6d3e994ef
Author: Liu Xuxin <[email protected]>
AuthorDate: Sun Jun 25 21:57:56 2023 +0800

    [To rel/1.2] [metrics] Refactor compaction read write throughput metrics  (#10301)
---
 .../commons/concurrent/DataNodeThreadModule.java   |   1 +
 .../iotdb/commons/concurrent/ThreadName.java       |  11 +-
 .../impl/ReadChunkCompactionPerformer.java         |  21 ++-
 .../execute/utils/MultiTsFileDeviceIterator.java   |  19 +-
 .../fast/AlignedSeriesCompactionExecutor.java      |  21 +++
 .../readchunk/AlignedSeriesCompactionExecutor.java |  99 ++++++-----
 .../readchunk/SingleSeriesCompactionExecutor.java  |  56 ++----
 .../utils/writer/AbstractCompactionWriter.java     |  73 +++-----
 .../writer/AbstractCrossCompactionWriter.java      |  24 +--
 .../writer/AbstractInnerCompactionWriter.java      |  13 +-
 .../utils/writer/FastCrossCompactionWriter.java    |   4 +-
 .../writer/ReadPointCrossCompactionWriter.java     |   2 +-
 .../writer/ReadPointInnerCompactionWriter.java     |   2 +-
 .../compaction/io/CompactionTsFileReader.java      | 178 +++++++++++++++++++
 .../compaction/io/CompactionTsFileWriter.java      | 130 ++++++++++++++
 .../compaction/schedule/CompactionTaskManager.java |  10 --
 .../schedule/constant/CompactionIoDataType.java    |  33 ++--
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   5 +
 .../db/service/metrics/CompactionMetrics.java      | 192 ++++++++++++++-------
 .../iotdb/db/service/metrics/FileMetrics.java      |   1 +
 .../iotdb/tsfile/read/TsFileDeviceIterator.java    |   3 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  12 ++
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  28 ++-
 23 files changed, 667 insertions(+), 271 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
index f0d61466497..3023983f05a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
@@ -34,6 +34,7 @@ public enum DataNodeThreadModule {
   JVM,
   LOG_BACK,
   METRICS,
+  SYSTEM,
   OTHER,
   UNKNOWN
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e28887791b4..e9a136a7b08 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -137,6 +137,9 @@ public enum ThreadName {
   PROMETHEUS_REACTOR_HTTP_NIO("reactor-http-nio"),
   PROMETHEUS_REACTOR_HTTP_EPOLL("reactor-http-epoll"),
   PROMETHEUS_BOUNDED_ELASTIC("boundedElastic-evictor"),
+  // -------------------------- System --------------------------
+  FORK_JOIN_POOL("ForkJoinPool"),
+  TIMER("timer"),
   // -------------------------- Other --------------------------
   TTL_CHECK("TTL-CHECK"),
   SETTLE("Settle"),
@@ -149,8 +152,8 @@ public enum ThreadName {
   // the unknown thread name is used for metrics
   UNKOWN("UNKNOWN");
 
-  private final String name;
   private static final Logger log = LoggerFactory.getLogger(ThreadName.class);
+  private final String name;
   private static Set<ThreadName> queryThreadNames =
       new HashSet<>(
           Arrays.asList(
@@ -264,6 +267,9 @@ public enum ThreadName {
               PROMETHEUS_REACTOR_HTTP_NIO,
               PROMETHEUS_REACTOR_HTTP_EPOLL,
               PROMETHEUS_BOUNDED_ELASTIC));
+
+  private static Set<ThreadName> systemThreadNames =
+      new HashSet<>(Arrays.asList(FORK_JOIN_POOL, TIMER));
   private static Set<ThreadName> otherThreadNames =
       new HashSet<>(
           Arrays.asList(
@@ -297,6 +303,7 @@ public enum ThreadName {
           computeThreadNames,
           jvmThreadNames,
           metricsThreadNames,
+          systemThreadNames,
           otherThreadNames
         };
     DataNodeThreadModule[] modules =
@@ -313,6 +320,7 @@ public enum ThreadName {
           DataNodeThreadModule.COMPUTE,
           DataNodeThreadModule.JVM,
           DataNodeThreadModule.METRICS,
+          DataNodeThreadModule.SYSTEM,
           DataNodeThreadModule.OTHER
         };
 
@@ -339,6 +347,7 @@ public enum ThreadName {
         return module;
       }
     }
+    log.debug("The module for this thread is unknown: {}", givenThreadName);
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index da0c9c91dd8..27d4d48afc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.engine.compaction.execute.performer.impl;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -27,6 +26,8 @@ import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.MultiTsFileDeviceIterator;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.AlignedSeriesCompactionExecutor;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.rescon.SystemInfo;
@@ -34,18 +35,12 @@ import 
org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
 public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private TsFileResource targetResource;
   private List<TsFileResource> seqFiles;
   private CompactionTaskSummary summary;
@@ -71,8 +66,12 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
                 / 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
                 * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
     try (MultiTsFileDeviceIterator deviceIterator = new 
MultiTsFileDeviceIterator(seqFiles);
-        TsFileIOWriter writer =
-            new TsFileIOWriter(targetResource.getTsFile(), true, 
sizeForFileWriter)) {
+        CompactionTsFileWriter writer =
+            new CompactionTsFileWriter(
+                targetResource.getTsFile(),
+                true,
+                sizeForFileWriter,
+                CompactionType.INNER_SEQ_COMPACTION)) {
       while (deviceIterator.hasNextDevice()) {
         Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
         String device = deviceInfo.left;
@@ -113,7 +112,7 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
   private void compactAlignedSeries(
       String device,
       TsFileResource targetResource,
-      TsFileIOWriter writer,
+      CompactionTsFileWriter writer,
       MultiTsFileDeviceIterator deviceIterator)
       throws IOException, InterruptedException {
     checkThreadInterrupted();
@@ -153,7 +152,7 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
   private void compactNotAlignedSeries(
       String device,
       TsFileResource targetResource,
-      TsFileIOWriter writer,
+      CompactionTsFileWriter writer,
       MultiTsFileDeviceIterator deviceIterator)
       throws IOException, MetadataException, InterruptedException {
     writer.startChunkGroup(device);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 7e2484e0d71..28329e0d5cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -70,7 +72,9 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
         this.tsFileResourcesSortedByDesc, 
TsFileResource::compareFileCreationOrderByDesc);
     try {
       for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) {
-        TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFileResource.getTsFilePath());
+        CompactionTsFileReader reader =
+            new CompactionTsFileReader(
+                tsFileResource.getTsFilePath(), 
CompactionType.INNER_SEQ_COMPACTION);
         readerMap.put(tsFileResource, reader);
         deviceIteratorMap.put(tsFileResource, 
reader.getAllDevicesIteratorWithIsAligned());
       }
@@ -112,8 +116,19 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
     Collections.sort(
         this.tsFileResourcesSortedByDesc, 
TsFileResource::compareFileCreationOrderByDesc);
     this.readerMap = readerMap;
+
+    CompactionType type = null;
+    if (!seqResources.isEmpty() && !unseqResources.isEmpty()) {
+      type = CompactionType.CROSS_COMPACTION;
+    } else if (seqResources.isEmpty()) {
+      type = CompactionType.INNER_UNSEQ_COMPACTION;
+    } else {
+      type = CompactionType.INNER_SEQ_COMPACTION;
+    }
+
     for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
-      TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFileResource.getTsFilePath());
+      TsFileSequenceReader reader =
+          new CompactionTsFileReader(tsFileResource.getTsFilePath(), type);
       readerMap.put(tsFileResource, reader);
       deviceIteratorMap.put(tsFileResource, 
reader.getAllDevicesIteratorWithIsAligned());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index cd7adbb1adc..4d68016a978 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.FileElement;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.PageElement;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.writer.AbstractCompactionWriter;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -88,6 +89,8 @@ public class AlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
   @Override
   protected void compactFiles()
       throws PageException, IOException, WriteProcessException, 
IllegalPathException {
+    markStartOfAlignedSeries();
+
     while (!fileList.isEmpty()) {
       List<FileElement> overlappedFiles = findOverlapFiles(fileList.get(0));
 
@@ -96,6 +99,24 @@ public class AlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
 
       compactChunks();
     }
+
+    markEndOfAlignedSeries();
+  }
+
+  private void markStartOfAlignedSeries() {
+    for (TsFileSequenceReader reader : readerCacheMap.values()) {
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markStartOfAlignedSeries();
+      }
+    }
+  }
+
+  private void markEndOfAlignedSeries() {
+    for (TsFileSequenceReader reader : readerCacheMap.values()) {
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markEndOfAlignedSeries();
+      }
+    }
   }
 
   /** Deserialize files into chunk metadatas and put them into the chunk 
metadata queue. */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index 6690d133ac8..08eeaba45a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -20,11 +20,9 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import 
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -39,9 +37,6 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import com.google.common.util.concurrent.RateLimiter;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -56,14 +51,12 @@ public class AlignedSeriesCompactionExecutor {
   private final LinkedList<Pair<TsFileSequenceReader, 
List<AlignedChunkMetadata>>>
       readerAndChunkMetadataList;
   private final TsFileResource targetResource;
-  private final TsFileIOWriter writer;
+  private final CompactionTsFileWriter writer;
 
   private final AlignedChunkWriterImpl chunkWriter;
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final CompactionTaskSummary summary;
-  private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -74,7 +67,7 @@ public class AlignedSeriesCompactionExecutor {
       String device,
       TsFileResource targetResource,
       LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> 
readerAndChunkMetadataList,
-      TsFileIOWriter writer,
+      CompactionTsFileWriter writer,
       CompactionTaskSummary summary)
       throws IOException {
     this.device = device;
@@ -101,27 +94,13 @@ public class AlignedSeriesCompactionExecutor {
     for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair 
:
         readerAndChunkMetadataList) {
       TsFileSequenceReader reader = readerListPair.left;
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markStartOfAlignedSeries();
+      }
       List<AlignedChunkMetadata> alignedChunkMetadataList = 
readerListPair.right;
-      for (AlignedChunkMetadata alignedChunkMetadata : 
alignedChunkMetadataList) {
-        List<IChunkMetadata> valueChunkMetadataList =
-            alignedChunkMetadata.getValueChunkMetadataList();
-        for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
-          if (chunkMetadata == null) {
-            continue;
-          }
-          if (measurementSet.contains(chunkMetadata.getMeasurementUid())) {
-            continue;
-          }
-          measurementSet.add(chunkMetadata.getMeasurementUid());
-          Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
-          ChunkHeader header = chunk.getHeader();
-          schemaSet.add(
-              new MeasurementSchema(
-                  header.getMeasurementID(),
-                  header.getDataType(),
-                  header.getEncodingType(),
-                  header.getCompressionType()));
-        }
+      collectSchemaFromOneFile(alignedChunkMetadataList, reader, schemaSet, 
measurementSet);
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markEndOfAlignedSeries();
       }
     }
     List<IMeasurementSchema> schemaList = new ArrayList<>(schemaSet);
@@ -129,12 +108,46 @@ public class AlignedSeriesCompactionExecutor {
     return schemaList;
   }
 
+  private void collectSchemaFromOneFile(
+      List<AlignedChunkMetadata> alignedChunkMetadataList,
+      TsFileSequenceReader reader,
+      Set<MeasurementSchema> schemaSet,
+      Set<String> measurementSet)
+      throws IOException {
+    for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) 
{
+      List<IChunkMetadata> valueChunkMetadataList =
+          alignedChunkMetadata.getValueChunkMetadataList();
+      for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
+        if (chunkMetadata == null) {
+          continue;
+        }
+        if (measurementSet.contains(chunkMetadata.getMeasurementUid())) {
+          continue;
+        }
+        measurementSet.add(chunkMetadata.getMeasurementUid());
+        Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
+        ChunkHeader header = chunk.getHeader();
+        schemaSet.add(
+            new MeasurementSchema(
+                header.getMeasurementID(),
+                header.getDataType(),
+                header.getEncodingType(),
+                header.getCompressionType()));
+      }
+    }
+  }
+
   public void execute() throws IOException {
     while (readerAndChunkMetadataList.size() > 0) {
       Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair =
           readerAndChunkMetadataList.removeFirst();
       TsFileSequenceReader reader = readerListPair.left;
       List<AlignedChunkMetadata> alignedChunkMetadataList = 
readerListPair.right;
+
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markStartOfAlignedSeries();
+      }
+
       TsFileAlignedSeriesReaderIterator readerIterator =
           new TsFileAlignedSeriesReaderIterator(reader, 
alignedChunkMetadataList, schemaList);
       while (readerIterator.hasNext()) {
@@ -142,22 +155,16 @@ public class AlignedSeriesCompactionExecutor {
             readerIterator.nextReader();
         
summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum());
         
summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum());
-        
CompactionMetrics.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize());
         compactOneAlignedChunk(
             nextAlignedChunkInfo.getReader(), 
nextAlignedChunkInfo.getNotNullChunkNum());
       }
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markEndOfAlignedSeries();
+      }
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetrics.getInstance()
-          .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION,
-              ProcessChunkType.DESERIALIZE_CHUNK,
-              true,
-              chunkWriter.estimateMaxSeriesMemSize());
-      chunkWriter.writeToFileWriter(writer);
+      writer.writeChunk(chunkWriter);
     }
     writer.checkMetadataSizeAndMayFlush();
   }
@@ -191,15 +198,7 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * 
schemaList.size()) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetrics.getInstance()
-          .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION,
-              ProcessChunkType.DESERIALIZE_CHUNK,
-              true,
-              chunkWriter.estimateMaxSeriesMemSize());
-      chunkWriter.writeToFileWriter(writer);
+      writer.writeChunk(chunkWriter);
       remainingPointInChunkWriter = 0L;
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index 47d80ccf4ea..0c695eac833 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -22,11 +22,8 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import 
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -39,9 +36,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import com.google.common.util.concurrent.RateLimiter;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -52,15 +46,13 @@ public class SingleSeriesCompactionExecutor {
   private String device;
   private PartialPath series;
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList;
-  private TsFileIOWriter fileWriter;
+  private CompactionTsFileWriter fileWriter;
   private TsFileResource targetResource;
 
   private IMeasurementSchema schema;
   private ChunkWriterImpl chunkWriter;
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
-  private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
@@ -80,7 +72,7 @@ public class SingleSeriesCompactionExecutor {
       PartialPath series,
       IMeasurementSchema measurementSchema,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
-      TsFileIOWriter fileWriter,
+      CompactionTsFileWriter fileWriter,
       TsFileResource targetResource) {
     this.device = series.getDevice();
     this.series = series;
@@ -97,7 +89,7 @@ public class SingleSeriesCompactionExecutor {
   public SingleSeriesCompactionExecutor(
       PartialPath series,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
-      TsFileIOWriter fileWriter,
+      CompactionTsFileWriter fileWriter,
       TsFileResource targetResource,
       CompactionTaskSummary summary) {
     this.device = series.getDevice();
@@ -129,10 +121,6 @@ public class SingleSeriesCompactionExecutor {
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
         }
-        CompactionMetrics.getInstance()
-            .recordReadInfo(
-                (long) currentChunk.getHeader().getSerializedSize()
-                    + currentChunk.getHeader().getDataSize());
 
         // if this chunk is modified, deserialize it into points
         if (chunkMetadata.getDeleteIntervalList() != null) {
@@ -155,7 +143,7 @@ public class SingleSeriesCompactionExecutor {
 
     // after all the chunk of this sensor is read, flush the remaining data
     if (cachedChunk != null) {
-      flushChunkToFileWriter(cachedChunk, cachedChunkMetadata, true);
+      flushChunkToFileWriter(cachedChunk, cachedChunkMetadata);
       cachedChunk = null;
       cachedChunkMetadata = null;
     } else if (pointCountInChunkWriter != 0L) {
@@ -211,7 +199,7 @@ public class SingleSeriesCompactionExecutor {
       // there is no points remaining in ChunkWriter and no cached chunk
       // flush it to file directly
       summary.increaseDirectlyFlushChunkNum(1);
-      flushChunkToFileWriter(chunk, chunkMetadata, false);
+      flushChunkToFileWriter(chunk, chunkMetadata);
     }
   }
 
@@ -327,36 +315,20 @@ public class SingleSeriesCompactionExecutor {
     }
   }
 
-  private void flushChunkToFileWriter(
-      Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws 
IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(chunk));
+  private void flushChunkToFileWriter(Chunk chunk, ChunkMetadata 
chunkMetadata) throws IOException {
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
     if (chunkMetadata.getEndTime() > maxEndTimestamp) {
       maxEndTimestamp = chunkMetadata.getEndTime();
     }
-    CompactionMetrics.getInstance()
-        .recordWriteInfo(
-            CompactionType.INNER_SEQ_COMPACTION,
-            isCachedChunk ? ProcessChunkType.MERGE_CHUNK : 
ProcessChunkType.FLUSH_CHUNK,
-            false,
-            getChunkSize(chunk));
     fileWriter.writeChunk(chunk, chunkMetadata);
   }
 
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetrics.getInstance()
-          .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION,
-              ProcessChunkType.DESERIALIZE_CHUNK,
-              false,
-              chunkWriter.estimateMaxSeriesMemSize());
-      chunkWriter.writeToFileWriter(fileWriter);
+      fileWriter.writeChunk(chunkWriter);
       pointCountInChunkWriter = 0L;
     }
   }
@@ -364,22 +336,14 @@ public class SingleSeriesCompactionExecutor {
   private void flushCachedChunkIfLargeEnough() throws IOException {
     if (cachedChunk.getChunkStatistic().getCount() >= targetChunkPointNum
         || getChunkSize(cachedChunk) >= targetChunkSize) {
-      flushChunkToFileWriter(cachedChunk, cachedChunkMetadata, true);
+      flushChunkToFileWriter(cachedChunk, cachedChunkMetadata);
       cachedChunk = null;
       cachedChunkMetadata = null;
     }
   }
 
   private void flushChunkWriter() throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(
-        compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-    CompactionMetrics.getInstance()
-        .recordWriteInfo(
-            CompactionType.INNER_SEQ_COMPACTION,
-            ProcessChunkType.DESERIALIZE_CHUNK,
-            false,
-            chunkWriter.estimateMaxSeriesMemSize());
-    chunkWriter.writeToFileWriter(fileWriter);
+    fileWriter.writeChunk(chunkWriter);
     pointCountInChunkWriter = 0L;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 1f4b13b9a14..7b7fbc2ee1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -19,10 +19,7 @@
 package org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import 
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
@@ -39,9 +36,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import com.google.common.util.concurrent.RateLimiter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,9 +44,6 @@ import java.util.List;
 public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected int subTaskNum = 
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
-  private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
-
   // check if there is unseq error point during writing
   protected long[] lastTime = new long[subTaskNum];
 
@@ -135,43 +126,43 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
    */
   public abstract void checkAndMayFlushChunkMetadata() throws IOException;
 
-  protected void writeDataPoint(long timestamp, TsPrimitiveType value, 
IChunkWriter iChunkWriter) {
-    if (iChunkWriter instanceof ChunkWriterImpl) {
-      ChunkWriterImpl chunkWriter = (ChunkWriterImpl) iChunkWriter;
-      switch (chunkWriter.getDataType()) {
+  protected void writeDataPoint(long timestamp, TsPrimitiveType value, 
IChunkWriter chunkWriter) {
+    if (chunkWriter instanceof ChunkWriterImpl) {
+      ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
+      switch (chunkWriterImpl.getDataType()) {
         case TEXT:
-          chunkWriter.write(timestamp, value.getBinary());
+          chunkWriterImpl.write(timestamp, value.getBinary());
           break;
         case DOUBLE:
-          chunkWriter.write(timestamp, value.getDouble());
+          chunkWriterImpl.write(timestamp, value.getDouble());
           break;
         case BOOLEAN:
-          chunkWriter.write(timestamp, value.getBoolean());
+          chunkWriterImpl.write(timestamp, value.getBoolean());
           break;
         case INT64:
-          chunkWriter.write(timestamp, value.getLong());
+          chunkWriterImpl.write(timestamp, value.getLong());
           break;
         case INT32:
-          chunkWriter.write(timestamp, value.getInt());
+          chunkWriterImpl.write(timestamp, value.getInt());
           break;
         case FLOAT:
-          chunkWriter.write(timestamp, value.getFloat());
+          chunkWriterImpl.write(timestamp, value.getFloat());
           break;
         default:
-          throw new UnsupportedOperationException("Unknown data type " + 
chunkWriter.getDataType());
+          throw new UnsupportedOperationException(
+              "Unknown data type " + chunkWriterImpl.getDataType());
       }
     } else {
-      AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
iChunkWriter;
+      AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
       alignedChunkWriter.write(timestamp, value.getVector());
     }
   }
 
-  protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter 
iChunkWriter, int subTaskId)
+  protected void sealChunk(
+      CompactionTsFileWriter targetWriter, IChunkWriter chunkWriter, int 
subTaskId)
       throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(
-        compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
-      iChunkWriter.writeToFileWriter(targetWriter);
+      targetWriter.writeChunk(chunkWriter);
     }
     chunkPointNumArray[subTaskId] = 0;
   }
@@ -188,19 +179,18 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
       throws IOException;
 
   protected void flushNonAlignedChunkToFileWriter(
-      TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, 
int subTaskId)
+      CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata 
chunkMetadata, int subTaskId)
       throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(chunk));
     synchronized (targetWriter) {
       // seal last chunk to file writer
-      chunkWriters[subTaskId].writeToFileWriter(targetWriter);
+      targetWriter.writeChunk(chunkWriters[subTaskId]);
       chunkPointNumArray[subTaskId] = 0;
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
 
   protected void flushAlignedChunkToFileWriter(
-      TsFileIOWriter targetWriter,
+      CompactionTsFileWriter targetWriter,
       Chunk timeChunk,
       IChunkMetadata timeChunkMetadata,
       List<Chunk> valueChunks,
@@ -210,11 +200,12 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     synchronized (targetWriter) {
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriters[subTaskId];
       // seal last chunk to file writer
-      alignedChunkWriter.writeToFileWriter(targetWriter);
+      targetWriter.writeChunk(alignedChunkWriter);
       chunkPointNumArray[subTaskId] = 0;
 
+      targetWriter.markStartingWritingAligned();
+
       // flush time chunk
-      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(timeChunk));
       targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
 
       // flush value chunks
@@ -231,10 +222,10 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
               Statistics.getStatsByType(valueChunkWriter.getDataType()));
           continue;
         }
-        CompactionTaskManager.mergeRateLimiterAcquire(
-            compactionRateLimiter, getChunkSize(valueChunk));
         targetWriter.writeChunk(valueChunk, (ChunkMetadata) 
valueChunkMetadatas.get(i));
       }
+
+      targetWriter.markEndingWritingAligned();
     }
   }
 
@@ -292,22 +283,14 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   }
 
   protected void checkChunkSizeAndMayOpenANewChunk(
-      TsFileIOWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId, 
boolean isCrossSpace)
+      CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int 
subTaskId)
       throws IOException {
     if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
       // if chunk point num reaches the check point, then check if the chunk 
size over threshold
       lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint;
-      if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, 
targetChunkPointNum, false)) {
-        sealChunk(fileWriter, iChunkWriter, subTaskId);
+      if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, 
targetChunkPointNum, false)) {
+        sealChunk(fileWriter, chunkWriter, subTaskId);
         lastCheckIndex = 0;
-        CompactionMetrics.getInstance()
-            .recordWriteInfo(
-                isCrossSpace
-                    ? CompactionType.CROSS_COMPACTION
-                    : CompactionType.INNER_UNSEQ_COMPACTION,
-                ProcessChunkType.DESERIALIZE_CHUNK,
-                isAlign,
-                iChunkWriter.estimateMaxSeriesMemSize());
       }
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index 44409808daf..277379cb9dd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -21,6 +21,8 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -29,7 +31,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -39,7 +40,7 @@ import java.util.Map;
 public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWriter {
 
   // target fileIOWriters
-  protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>();
+  protected List<CompactionTsFileWriter> targetFileWriters = new ArrayList<>();
 
   // source tsfiles
   private List<TsFileResource> seqTsFileResources;
@@ -77,8 +78,11 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
     boolean enableMemoryControl = 
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
     for (int i = 0; i < targetResources.size(); i++) {
       this.targetFileWriters.add(
-          new TsFileIOWriter(
-              targetResources.get(i).getTsFile(), enableMemoryControl, 
memorySizeForEachWriter));
+          new CompactionTsFileWriter(
+              targetResources.get(i).getTsFile(),
+              enableMemoryControl,
+              memorySizeForEachWriter,
+              CompactionType.CROSS_COMPACTION));
       isEmptyFile[i] = true;
     }
     this.seqTsFileResources = seqFileResources;
@@ -99,7 +103,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
   @Override
   public void endChunkGroup() throws IOException {
     for (int i = 0; i < seqTsFileResources.size(); i++) {
-      TsFileIOWriter targetFileWriter = targetFileWriters.get(i);
+      CompactionTsFileWriter targetFileWriter = targetFileWriters.get(i);
       if (isDeviceExistedInTargetFiles[i]) {
         // update resource
         CompactionUtils.updateResource(targetResources.get(i), 
targetFileWriter, deviceId);
@@ -129,7 +133,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
     writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
     chunkPointNumArray[subTaskId]++;
     checkChunkSizeAndMayOpenANewChunk(
-        targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, 
true);
+        targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
     isDeviceExistedInTargetFiles[fileIndex] = true;
     isEmptyFile[fileIndex] = false;
     lastTime[subTaskId] = timestamp;
@@ -153,7 +157,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
 
   @Override
   public void close() throws IOException {
-    for (TsFileIOWriter targetWriter : targetFileWriters) {
+    for (CompactionTsFileWriter targetWriter : targetFileWriters) {
       if (targetWriter != null && targetWriter.canWrite()) {
         targetWriter.close();
       }
@@ -165,8 +169,8 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
   @Override
   public void checkAndMayFlushChunkMetadata() throws IOException {
     for (int i = 0; i < targetFileWriters.size(); i++) {
-      TsFileIOWriter fileIOWriter = targetFileWriters.get(i);
-      fileIOWriter.checkMetadataSizeAndMayFlush();
+      CompactionTsFileWriter fileIoWriter = targetFileWriters.get(i);
+      fileIoWriter.checkMetadataSizeAndMayFlush();
     }
   }
 
@@ -234,7 +238,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
   @Override
   public long getWriterSize() throws IOException {
     long totalSize = 0;
-    for (TsFileIOWriter writer : targetFileWriters) {
+    for (CompactionTsFileWriter writer : targetFileWriters) {
       totalSize += writer.getPos();
     }
     return totalSize;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index 6404730d0e4..3795cf1d71b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -20,18 +20,19 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 
 public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWriter {
-  protected TsFileIOWriter fileWriter;
+  protected CompactionTsFileWriter fileWriter;
 
   protected boolean isEmptyFile;
 
@@ -50,7 +51,11 @@ public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWr
                 * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
     boolean enableMemoryControl = 
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
     this.fileWriter =
-        new TsFileIOWriter(targetFileResource.getTsFile(), 
enableMemoryControl, sizeForFileWriter);
+        new CompactionTsFileWriter(
+            targetFileResource.getTsFile(),
+            enableMemoryControl,
+            sizeForFileWriter,
+            CompactionType.INNER_UNSEQ_COMPACTION);
     this.targetResource = targetFileResource;
     isEmptyFile = true;
   }
@@ -77,7 +82,7 @@ public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWr
   public void write(TimeValuePair timeValuePair, int subTaskId) throws 
IOException {
     writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), 
chunkWriters[subTaskId]);
     chunkPointNumArray[subTaskId]++;
-    checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], 
subTaskId, false);
+    checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], 
subTaskId);
     isEmptyFile = false;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
index 109d66c03b3..b3464080e76 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java
@@ -150,7 +150,7 @@ public class FastCrossCompactionWriter extends 
AbstractCrossCompactionWriter {
 
     // check chunk size and may open a new chunk
     checkChunkSizeAndMayOpenANewChunk(
-        targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, 
true);
+        targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
 
     isDeviceExistedInTargetFiles[fileIndex] = true;
     isEmptyFile[fileIndex] = false;
@@ -178,7 +178,7 @@ public class FastCrossCompactionWriter extends 
AbstractCrossCompactionWriter {
 
     // check chunk size and may open a new chunk
     checkChunkSizeAndMayOpenANewChunk(
-        targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, 
true);
+        targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
 
     isDeviceExistedInTargetFiles[fileIndex] = true;
     isEmptyFile[fileIndex] = false;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
index 772a5b18bd9..497afa3d32e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
@@ -57,7 +57,7 @@ public class ReadPointCrossCompactionWriter extends 
AbstractCrossCompactionWrite
     }
     chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
     checkChunkSizeAndMayOpenANewChunk(
-        targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, 
subTaskId, true);
+        targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, 
subTaskId);
     isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
     isEmptyFile[seqFileIndexArray[subTaskId]] = false;
     lastTime[subTaskId] = timestamps.getEndTime();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
index 0fdb712084d..f0dd4eef1f9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
@@ -42,7 +42,7 @@ public class ReadPointInnerCompactionWriter extends 
AbstractInnerCompactionWrite
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) 
this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
     chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
-    checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId, 
false);
+    checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId);
     isEmptyFile = false;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
new file mode 100644
index 00000000000..6e8b7b3d890
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.io;
+
+import 
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class extends the TsFileSequenceReader class to read and manage TsFile 
with a focus on
+ * compaction-related operations. This includes functions for tracking and 
recording the amount of
+ * data read and distinguishing between aligned and not aligned series during 
compaction.
+ */
+public class CompactionTsFileReader extends TsFileSequenceReader {
+  /** Tracks the total amount of data (in bytes) that has been read. */
+  private AtomicLong readDataSize = new AtomicLong(0L);
+
+  /** The type of compaction running. */
+  CompactionType compactionType;
+
+  /** A flag that indicates if an aligned series is being read. */
+  private volatile boolean readingAlignedSeries = false;
+
+  /**
+   * Constructs a new instance of CompactionTsFileReader.
+   *
+   * @param file The file to be read.
+   * @param compactionType The type of compaction running.
+   * @throws IOException If an error occurs during file operations.
+   */
+  public CompactionTsFileReader(String file, CompactionType compactionType) 
throws IOException {
+    super(file);
+    this.compactionType = compactionType;
+  }
+
+  @Override
+  protected ByteBuffer readData(long position, int totalSize) throws 
IOException {
+    ByteBuffer buffer = super.readData(position, totalSize);
+    readDataSize.addAndGet(totalSize);
+    return buffer;
+  }
+
+  /** Marks the start of reading an aligned series. */
+  public void markStartOfAlignedSeries() {
+    readingAlignedSeries = true;
+  }
+
+  /** Marks the end of reading an aligned series. */
+  public void markEndOfAlignedSeries() {
+    readingAlignedSeries = false;
+  }
+
+  @Override
+  public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
+    synchronized (this) {
+      // using synchronized to avoid concurrent read that makes readDataSize 
not correct
+      long before = readDataSize.get();
+      Chunk chunk = super.readMemChunk(metaData);
+      long dataSize = readDataSize.get() - before;
+      CompactionMetrics.getInstance()
+          .recordReadInfo(
+              compactionType,
+              readingAlignedSeries
+                  ? CompactionIoDataType.ALIGNED
+                  : CompactionIoDataType.NOT_ALIGNED,
+              dataSize);
+      return chunk;
+    }
+  }
+
+  @Override
+  public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws 
IOException {
+    long before = readDataSize.get();
+    TsFileDeviceIterator iterator = super.getAllDevicesIteratorWithIsAligned();
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
+    return iterator;
+  }
+
+  @Override
+  public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset(
+      long startOffset, long endOffset) throws IOException {
+    long before = readDataSize.get();
+    List<IChunkMetadata> chunkMetadataList =
+        super.getChunkMetadataListByTimeseriesMetadataOffset(startOffset, 
endOffset);
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
+    return chunkMetadataList;
+  }
+
+  @Override
+  public void getDevicesAndEntriesOfOneLeafNode(
+      Long startOffset, Long endOffset, Queue<Pair<String, long[]>> 
measurementNodeOffsetQueue)
+      throws IOException {
+    long before = readDataSize.get();
+    super.getDevicesAndEntriesOfOneLeafNode(startOffset, endOffset, 
measurementNodeOffsetQueue);
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
+  }
+
+  @Override
+  public MetadataIndexNode readMetadataIndexNode(long start, long end) throws 
IOException {
+    long before = readDataSize.get();
+    MetadataIndexNode metadataIndexNode = super.readMetadataIndexNode(start, 
end);
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
+    return metadataIndexNode;
+  }
+
+  @Override
+  public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
+      getTimeseriesMetadataOffsetByDevice(
+          MetadataIndexNode measurementNode,
+          Set<String> excludedMeasurementIds,
+          boolean needChunkMetadata)
+          throws IOException {
+    long before = readDataSize.get();
+    Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> result =
+        super.getTimeseriesMetadataOffsetByDevice(
+            measurementNode, excludedMeasurementIds, needChunkMetadata);
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
+    return result;
+  }
+
+  @Override
+  public void getDeviceTimeseriesMetadata(
+      List<TimeseriesMetadata> timeseriesMetadataList,
+      MetadataIndexNode measurementNode,
+      Set<String> excludedMeasurementIds,
+      boolean needChunkMetadata)
+      throws IOException {
+    long before = readDataSize.get();
+    super.getDeviceTimeseriesMetadata(
+        timeseriesMetadataList, measurementNode, excludedMeasurementIds, 
needChunkMetadata);
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
dataSize);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
new file mode 100644
index 00000000000..4dac7843aae
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.io;
+
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import 
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+public class CompactionTsFileWriter extends TsFileIOWriter {
+  CompactionType type;
+
+  private volatile boolean isWritingAligned = false;
+
+  public CompactionTsFileWriter(
+      File file, boolean enableMemoryControl, long maxMetadataSize, 
CompactionType type)
+      throws IOException {
+    super(file, enableMemoryControl, maxMetadataSize);
+    this.type = type;
+  }
+
+  public void markStartingWritingAligned() {
+    isWritingAligned = true;
+  }
+
+  public void markEndingWritingAligned() {
+    isWritingAligned = false;
+  }
+
+  public void writeChunk(IChunkWriter chunkWriter) throws IOException {
+    boolean isAligned = chunkWriter instanceof AlignedChunkWriterImpl;
+    long beforeOffset = this.getPos();
+    chunkWriter.writeToFileWriter(this);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    if (writtenDataSize > 0) {
+      
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) 
writtenDataSize);
+    }
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(
+            type,
+            isAligned ? CompactionIoDataType.ALIGNED : 
CompactionIoDataType.NOT_ALIGNED,
+            writtenDataSize);
+  }
+
+  @Override
+  public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws 
IOException {
+    long beforeOffset = this.getPos();
+    super.writeChunk(chunk, chunkMetadata);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    if (writtenDataSize > 0) {
+      
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) 
writtenDataSize);
+    }
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(
+            type,
+            isWritingAligned ? CompactionIoDataType.ALIGNED : 
CompactionIoDataType.NOT_ALIGNED,
+            writtenDataSize);
+  }
+
+  @Override
+  public void writeEmptyValueChunk(
+      String measurementId,
+      CompressionType compressionType,
+      TSDataType tsDataType,
+      TSEncoding encodingType,
+      Statistics<? extends Serializable> statistics)
+      throws IOException {
+    long beforeOffset = this.getPos();
+    super.writeEmptyValueChunk(
+        measurementId, compressionType, tsDataType, encodingType, statistics);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize);
+    if (writtenDataSize > 0) {
+      
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) 
writtenDataSize);
+    }
+  }
+
+  @Override
+  public int checkMetadataSizeAndMayFlush() throws IOException {
+    int size = super.checkMetadataSizeAndMayFlush();
+    if (size > 0) {
+      
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(size);
+    }
+    CompactionMetrics.getInstance().recordWriteInfo(type, 
CompactionIoDataType.METADATA, size);
+    return size;
+  }
+
+  @Override
+  public void endFile() throws IOException {
+    long beforeSize = this.getPos();
+    super.endFile();
+    long writtenDataSize = this.getPos() - beforeSize;
+    if (writtenDataSize > 0) {
+      
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) 
writtenDataSize);
+    }
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 6dd13065941..baad46f2a0d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -252,16 +252,6 @@ public class CompactionTaskManager implements IService {
       mergeWriteRateLimiter.setRate(throughout);
     }
   }
-  /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
-  public static void mergeRateLimiterAcquire(RateLimiter limiter, long 
bytesLength) {
-    while (bytesLength >= Integer.MAX_VALUE) {
-      limiter.acquire(Integer.MAX_VALUE);
-      bytesLength -= Integer.MAX_VALUE;
-    }
-    if (bytesLength > 0) {
-      limiter.acquire((int) bytesLength);
-    }
-  }
 
   public synchronized void removeRunningTaskFuture(AbstractCompactionTask 
task) {
     String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), 
task.getDataRegionId());
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java
similarity index 74%
copy from 
node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
copy to 
server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java
index f0d61466497..be5f76eb290 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/DataNodeThreadModule.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java
@@ -17,23 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.concurrent;
+package org.apache.iotdb.db.engine.compaction.schedule.constant;
 
-public enum DataNodeThreadModule {
-  QUERY,
-  MPP,
-  COMPACTION,
-  WAL,
-  FLUSH,
-  SCHEMA_ENGINE,
-  CLIENT_SERVICE,
-  IOT_CONSENSUS,
-  RATIS_CONSENSUS,
-  COMPUTE,
-  SYNC,
-  JVM,
-  LOG_BACK,
-  METRICS,
-  OTHER,
-  UNKNOWN
+/**
+ * Category of data moved by compaction I/O. CompactionMetrics uses {@link #getValue()} as
+ * the index into its per-compaction-type counter arrays, so the numeric values must stay
+ * dense and in the order not-aligned, aligned, metadata.
+ */
+public enum CompactionIoDataType {
+  NOT_ALIGNED(0),
+  ALIGNED(1),
+  METADATA(2);
+
+  // private and final: the value is used as an array index and must never change
+  private final int value;
+
+  CompactionIoDataType(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Returns the numeric value assigned to this constant.
+   *
+   * @return 0 for NOT_ALIGNED, 1 for ALIGNED, 2 for METADATA
+   */
+  public int getValue() {
+    return value;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index de9a822facf..468619f8a69 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1550,6 +1550,11 @@ public class DataRegion implements IDataRegionForQuery {
       tsFileResourceList.addAll(tsFileManager.getTsFileList(false));
       tsFileResourceList.forEach(
           x -> {
+            FileMetrics.getInstance()
+                .deleteFile(
+                    new long[] {x.getTsFileSize()},
+                    x.isSeq(),
+                    Collections.singletonList(x.getTsFile().getName()));
             if (x.getModFile().exists()) {
               FileMetrics.getInstance().decreaseModFileNum(1);
               
FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
 
b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
index d482be8c9ec..f80ff96e664 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -25,8 +25,8 @@ import 
org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType;
 import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import 
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import 
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
@@ -44,7 +44,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class CompactionMetrics implements IMetricSet {
-  private static final List<String> TYPES = Arrays.asList("aligned", 
"not_aligned");
+  private static final String NOT_ALIGNED = "not_aligned";
+  private static final String ALIGNED = "aligned";
+  private static final String METADATA = "metadata";
+  private static final List<String> TYPES = Arrays.asList(ALIGNED, 
NOT_ALIGNED);
   private static final CompactionMetrics INSTANCE = new CompactionMetrics();
   private long lastUpdateTime = 0L;
   private static final long UPDATE_INTERVAL = 10_000L;
@@ -57,47 +60,58 @@ public class CompactionMetrics implements IMetricSet {
   private final AtomicInteger finishSeqInnerCompactionTaskNum = new 
AtomicInteger(0);
   private final AtomicInteger finishUnseqInnerCompactionTaskNum = new 
AtomicInteger(0);
   private final AtomicInteger finishCrossCompactionTaskNum = new 
AtomicInteger(0);
+  // compaction type -> Counter[ Not-Aligned, Aligned, Metadata]
+  private final Map<String, Counter[]> writeCounters = new 
ConcurrentHashMap<>();
+  private final Map<String, Counter[]> readCounters = new 
ConcurrentHashMap<>();
 
   private CompactionMetrics() {
-    for (String type : TYPES) {
-      Map<CompactionType, Map<ProcessChunkType, Counter>> 
compactionTypeProcessChunkTypeMap =
-          writeInfoCounterMap.computeIfAbsent(type, k -> new 
ConcurrentHashMap<>());
-      for (CompactionType compactionType : CompactionType.values()) {
-        Map<ProcessChunkType, Counter> counterMap =
-            compactionTypeProcessChunkTypeMap.computeIfAbsent(
-                compactionType, k -> new ConcurrentHashMap<>());
-        for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
-          counterMap.put(processChunkType, 
DoNothingMetricManager.DO_NOTHING_COUNTER);
-        }
-      }
+    // recordReadInfo/recordWriteInfo and the bind methods key both maps by
+    // compactionType.toString(), so the no-op placeholders have to be registered under
+    // those same keys; keying them by TYPES left them unreachable.
+    for (CompactionType compactionType : CompactionType.values()) {
+      readCounters.put(
+          compactionType.toString(),
+          new Counter[] {
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER
+          });
+      writeCounters.put(
+          compactionType.toString(),
+          new Counter[] {
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER
+          });
     }
   }
 
-  // region compaction write info
-  private Map<String, Map<CompactionType, Map<ProcessChunkType, Counter>>> 
writeInfoCounterMap =
-      new ConcurrentHashMap<>();
   private Counter totalCompactionWriteInfoCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
 
   private void bindWriteInfo(AbstractMetricService metricService) {
-    for (String type : TYPES) {
-      Map<CompactionType, Map<ProcessChunkType, Counter>> 
compactionTypeProcessChunkTypeMap =
-          writeInfoCounterMap.computeIfAbsent(type, k -> new 
ConcurrentHashMap<>());
-      for (CompactionType compactionType : CompactionType.values()) {
-        Map<ProcessChunkType, Counter> counterMap =
-            compactionTypeProcessChunkTypeMap.computeIfAbsent(
-                compactionType, k -> new ConcurrentHashMap<>());
-        for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
-          Counter counter =
-              metricService.getOrCreateCounter(
-                  Metric.DATA_WRITTEN.toString(),
-                  MetricLevel.IMPORTANT,
-                  Tag.NAME.toString(),
-                  "compaction_" + compactionType.toString(),
-                  Tag.STATUS.toString(),
-                  type + "_" + processChunkType.toString());
-          counterMap.put(processChunkType, counter);
-        }
-      }
+    for (CompactionType compactionType : CompactionType.values()) {
+      writeCounters.put(
+          compactionType.toString(),
+          new Counter[] {
+            metricService.getOrCreateCounter(
+                Metric.DATA_WRITTEN.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                NOT_ALIGNED),
+            metricService.getOrCreateCounter(
+                Metric.DATA_WRITTEN.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                ALIGNED),
+            metricService.getOrCreateCounter(
+                Metric.DATA_WRITTEN.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                METADATA)
+          });
     }
     totalCompactionWriteInfoCounter =
         metricService.getOrCreateCounter(
@@ -110,22 +124,28 @@ public class CompactionMetrics implements IMetricSet {
   }
 
   private void unbindWriteInfo(AbstractMetricService metricService) {
-    for (String type : TYPES) {
-      for (CompactionType compactionType : CompactionType.values()) {
-        for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
-          metricService.remove(
-              MetricType.COUNTER,
-              Metric.DATA_WRITTEN.toString(),
-              Tag.NAME.toString(),
-              "compaction_" + compactionType.toString(),
-              Tag.STATUS.toString(),
-              type + "_" + processChunkType.toString());
-          writeInfoCounterMap
-              .get(type)
-              .get(compactionType)
-              .put(processChunkType, 
DoNothingMetricManager.DO_NOTHING_COUNTER);
-        }
-      }
+    for (CompactionType compactionType : CompactionType.values()) {
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_WRITTEN.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          NOT_ALIGNED);
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_WRITTEN.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          ALIGNED);
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_WRITTEN.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          METADATA);
     }
     metricService.remove(
         MetricType.COUNTER,
@@ -137,13 +157,12 @@ public class CompactionMetrics implements IMetricSet {
   }
 
   public void recordWriteInfo(
-      CompactionType compactionType,
-      ProcessChunkType processChunkType,
-      boolean aligned,
-      long byteNum) {
-    String type = aligned ? "aligned" : "not_aligned";
-    
writeInfoCounterMap.get(type).get(compactionType).get(processChunkType).inc(byteNum
 / 1024L);
-    totalCompactionWriteInfoCounter.inc(byteNum / 1024L);
+      CompactionType compactionType, CompactionIoDataType dataType, long byteNum) {
+    // the per-type counter is skipped when nothing is registered for this compaction type;
+    // the total counter is always incremented, by the unscaled byteNum
+    final Counter[] perDataType = writeCounters.get(compactionType.toString());
+    if (perDataType != null) {
+      final Counter target = perDataType[dataType.getValue()];
+      target.inc(byteNum);
+    }
+    totalCompactionWriteInfoCounter.inc(byteNum);
   }
 
   // endregion
@@ -152,18 +171,73 @@ public class CompactionMetrics implements IMetricSet {
   private Counter totalCompactionReadInfoCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
 
   private void bindReadInfo(AbstractMetricService metricService) {
+    // slot order follows CompactionIoDataType values: 0 not-aligned, 1 aligned, 2 metadata
+    final String[] dataTypeNames = {NOT_ALIGNED, ALIGNED, METADATA};
+    for (CompactionType compactionType : CompactionType.values()) {
+      final String compactionTypeName = compactionType.toString();
+      final Counter[] perDataType = new Counter[dataTypeNames.length];
+      for (int slot = 0; slot < dataTypeNames.length; slot++) {
+        perDataType[slot] =
+            metricService.getOrCreateCounter(
+                Metric.DATA_READ.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionTypeName,
+                Tag.NAME.toString(),
+                dataTypeNames[slot]);
+      }
+      readCounters.put(compactionTypeName, perDataType);
+    }
     totalCompactionReadInfoCounter =
         metricService.getOrCreateCounter(
             Metric.DATA_READ.toString(), MetricLevel.IMPORTANT, 
Tag.NAME.toString(), "compaction");
   }
 
   private void unbindReadInfo(AbstractMetricService metricService) {
+    for (CompactionType compactionType : CompactionType.values()) {
+      for (String dataTypeName : new String[] {NOT_ALIGNED, ALIGNED, METADATA}) {
+        metricService.remove(
+            MetricType.COUNTER,
+            Metric.DATA_READ.toString(),
+            Tag.TYPE.toString(),
+            compactionType.toString(),
+            Tag.NAME.toString(),
+            dataTypeName);
+      }
+      // Swap the removed counters for no-op ones so later recordReadInfo calls do not keep
+      // incrementing counters that are no longer registered with the metric service.
+      readCounters.put(
+          compactionType.toString(),
+          new Counter[] {
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER
+          });
+    }
     metricService.remove(
         MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(), 
"compaction");
   }
 
-  public void recordReadInfo(long byteNum) {
-    totalCompactionReadInfoCounter.inc(byteNum / 1024L);
+  /**
+   * Adds byteNum to the read counter registered for the given compaction type and data type,
+   * if one is registered, and always adds it to the total compaction read counter.
+   */
+  public void recordReadInfo(
+      CompactionType compactionType, CompactionIoDataType dataType, long byteNum) {
+    final Counter[] perDataType = readCounters.get(compactionType.toString());
+    if (perDataType != null) {
+      final Counter target = perDataType[dataType.getValue()];
+      target.inc(byteNum);
+    }
+    totalCompactionReadInfoCounter.inc(byteNum);
   }
   // endregion
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java 
b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
index 1b82f2a512d..153d0388c2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
@@ -156,6 +156,7 @@ public class FileMetrics implements IMetricSet {
         FileMetrics::getModFileNum,
         Tag.NAME.toString(),
         "mods");
+    checkIfThereRemainingData();
   }
 
   private void bindWalFileMetrics(AbstractMetricService metricService) {
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 76609747c37..075c8da0eae 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -82,8 +82,7 @@ public class TsFileDeviceIterator implements 
Iterator<Pair<String, Boolean>> {
     try {
       // get the first measurement node of this device, to know if the device 
is aligned
       this.measurementNode =
-          MetadataIndexNode.deserializeFrom(
-              reader.readData(startEndPair.right[0], startEndPair.right[1]));
+          reader.readMetadataIndexNode(startEndPair.right[0], 
startEndPair.right[1]);
       boolean isAligned = reader.isAlignedDevice(measurementNode);
       currentDevice = new Pair<>(startEndPair.left, isAligned);
       return currentDevice;
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8e760edaef4..8a8516f7f0b 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -2250,6 +2250,18 @@ public class TsFileSequenceReader implements 
AutoCloseable {
     }
   }
 
+  /**
+   * Reads the data between the given offsets via {@code readData(start, end)} and
+   * deserializes it into a MetadataIndexNode. Being a public, non-static method, it gives
+   * callers and subclasses a single entry point for this read.
+   *
+   * @param start the start offset of the MetadataIndexNode
+   * @param end the end offset of the MetadataIndexNode
+   * @return the MetadataIndexNode deserialized from that range
+   * @throws IOException propagated from the underlying read or deserialization
+   */
+  public MetadataIndexNode readMetadataIndexNode(long start, long end) throws 
IOException {
+    return MetadataIndexNode.deserializeFrom(readData(start, end));
+  }
+
   @Override
   public int hashCode() {
     return file.hashCode();
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index e62aeb40c7e..ab4ea6198f9 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -631,7 +631,7 @@ public class TsFileIOWriter implements AutoCloseable {
    *
    * @throws IOException
    */
-  public void checkMetadataSizeAndMayFlush() throws IOException {
+  public int checkMetadataSizeAndMayFlush() throws IOException {
     // This function should be called after all data of an aligned device has 
been written
     if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
       try {
@@ -642,11 +642,13 @@ public class TsFileIOWriter implements AutoCloseable {
               chunkMetadataCount,
               currentChunkMetadataSize / chunkMetadataCount);
         }
-        sortAndFlushChunkMetadata();
+        return sortAndFlushChunkMetadata();
       } catch (IOException e) {
         logger.error("Meets exception when flushing metadata to temp file for 
{}", file, e);
         throw e;
       }
+    } else {
+      return 0;
     }
   }
 
@@ -656,7 +658,8 @@ public class TsFileIOWriter implements AutoCloseable {
    *
    * @throws IOException
    */
-  protected void sortAndFlushChunkMetadata() throws IOException {
+  protected int sortAndFlushChunkMetadata() throws IOException {
+    int writtenSize = 0;
     // group by series
     List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
         TSMIterator.sortChunkMetadata(
@@ -673,7 +676,7 @@ public class TsFileIOWriter implements AutoCloseable {
         pathCount++;
       }
       List<IChunkMetadata> iChunkMetadataList = pair.right;
-      writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+      writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, 
seriesPath, isNewPath);
       lastSerializePath = seriesPath;
       logger.debug("Flushing {}", seriesPath);
     }
@@ -684,11 +687,13 @@ public class TsFileIOWriter implements AutoCloseable {
     }
     chunkMetadataCount = 0;
     currentChunkMetadataSize = 0;
+    return writtenSize;
   }
 
-  private void writeChunkMetadataToTempFile(
+  private int writeChunkMetadataToTempFile(
       List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean 
isNewPath)
       throws IOException {
+    int writtenSize = 0;
     // [DeviceId] measurementId datatype size chunkMetadataBuffer
     if (lastSerializePath == null
         || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
@@ -696,20 +701,25 @@ public class TsFileIOWriter implements AutoCloseable {
       endPosInCMTForDevice.add(tempOutput.getPosition());
       // serialize the device
       // for each device, we only serialize it once, in order to save io
-      ReadWriteIOUtils.write(seriesPath.getDevice(), 
tempOutput.wrapAsStream());
+      writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), 
tempOutput.wrapAsStream());
     }
     if (isNewPath && iChunkMetadataList.size() > 0) {
       // serialize the public info of this measurement
-      ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), 
tempOutput.wrapAsStream());
-      ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), 
tempOutput.wrapAsStream());
+      writtenSize +=
+          ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), 
tempOutput.wrapAsStream());
+      writtenSize +=
+          ReadWriteIOUtils.write(
+              iChunkMetadataList.get(0).getDataType(), 
tempOutput.wrapAsStream());
     }
     PublicBAOS buffer = new PublicBAOS();
     int totalSize = 0;
     for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
       totalSize += chunkMetadata.serializeTo(buffer, true);
     }
-    ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+    writtenSize += ReadWriteIOUtils.write(totalSize, 
tempOutput.wrapAsStream());
     buffer.writeTo(tempOutput);
+    writtenSize += buffer.size();
+    return writtenSize;
   }
 
   public String getCurrentChunkGroupDeviceId() {

Reply via email to