This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5e0ac213b59c5272a9fec402eb74c667ba951086
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jun 6 15:45:20 2023 +0800

    refactor the write process of compaction
---
 .../impl/ReadChunkCompactionPerformer.java         | 15 +++--
 .../readchunk/AlignedSeriesCompactionExecutor.java | 20 ++----
 .../readchunk/SingleSeriesCompactionExecutor.java  | 24 ++-----
 .../utils/writer/AbstractCompactionWriter.java     |  3 +-
 .../compaction/io/CompactionTsFileReader.java      | 30 +++++++++
 .../compaction/io/CompactionTsFileWriter.java      | 77 ++++++++++++++++++++++
 .../schedule/constant/WrittenDataType.java         | 36 ++++++++++
 .../compaction/utils/CompactionTsFileWriter.java   | 46 -------------
 .../db/service/metrics/CompactionMetrics.java      | 48 ++++++++++++--
 9 files changed, 207 insertions(+), 92 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index da0c9c91dd8..fff1aec7e6e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -27,6 +27,8 @@ import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.MultiTsFileDeviceIterator;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.AlignedSeriesCompactionExecutor;
 import 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.rescon.SystemInfo;
@@ -34,7 +36,6 @@ import 
org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,8 +72,12 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
                 / 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
                 * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
     try (MultiTsFileDeviceIterator deviceIterator = new 
MultiTsFileDeviceIterator(seqFiles);
-        TsFileIOWriter writer =
-            new TsFileIOWriter(targetResource.getTsFile(), true, 
sizeForFileWriter)) {
+        CompactionTsFileWriter writer =
+            new CompactionTsFileWriter(
+                targetResource.getTsFile(),
+                true,
+                sizeForFileWriter,
+                CompactionType.INNER_SEQ_COMPACTION)) {
       while (deviceIterator.hasNextDevice()) {
         Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
         String device = deviceInfo.left;
@@ -113,7 +118,7 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
   private void compactAlignedSeries(
       String device,
       TsFileResource targetResource,
-      TsFileIOWriter writer,
+      CompactionTsFileWriter writer,
       MultiTsFileDeviceIterator deviceIterator)
       throws IOException, InterruptedException {
     checkThreadInterrupted();
@@ -153,7 +158,7 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
   private void compactNotAlignedSeries(
       String device,
       TsFileResource targetResource,
-      TsFileIOWriter writer,
+      CompactionTsFileWriter writer,
       MultiTsFileDeviceIterator deviceIterator)
       throws IOException, MetadataException, InterruptedException {
     writer.startChunkGroup(device);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index c529ca05208..a4df7677b29 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -20,6 +20,7 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -38,7 +39,6 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -55,7 +55,7 @@ public class AlignedSeriesCompactionExecutor {
   private final LinkedList<Pair<TsFileSequenceReader, 
List<AlignedChunkMetadata>>>
       readerAndChunkMetadataList;
   private final TsFileResource targetResource;
-  private final TsFileIOWriter writer;
+  private final CompactionTsFileWriter writer;
 
   private final AlignedChunkWriterImpl chunkWriter;
   private final List<IMeasurementSchema> schemaList;
@@ -73,7 +73,7 @@ public class AlignedSeriesCompactionExecutor {
       String device,
       TsFileResource targetResource,
       LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> 
readerAndChunkMetadataList,
-      TsFileIOWriter writer,
+      CompactionTsFileWriter writer,
       CompactionTaskSummary summary)
       throws IOException {
     this.device = device;
@@ -150,12 +150,7 @@ public class AlignedSeriesCompactionExecutor {
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetrics.getInstance()
-          .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION, true, 
chunkWriter.estimateMaxSeriesMemSize());
-      chunkWriter.writeToFileWriter(writer);
+      writer.writeChunk(chunkWriter);
     }
     writer.checkMetadataSizeAndMayFlush();
   }
@@ -189,12 +184,7 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * 
schemaList.size()) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetrics.getInstance()
-          .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION, true, 
chunkWriter.estimateMaxSeriesMemSize());
-      chunkWriter.writeToFileWriter(writer);
+      writer.writeChunk(chunkWriter);
       remainingPointInChunkWriter = 0L;
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index 11a68cdecf0..454564f3643 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -38,7 +39,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -51,7 +51,7 @@ public class SingleSeriesCompactionExecutor {
   private String device;
   private PartialPath series;
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList;
-  private TsFileIOWriter fileWriter;
+  private CompactionTsFileWriter fileWriter;
   private TsFileResource targetResource;
 
   private IMeasurementSchema schema;
@@ -79,7 +79,7 @@ public class SingleSeriesCompactionExecutor {
       PartialPath series,
       IMeasurementSchema measurementSchema,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
-      TsFileIOWriter fileWriter,
+      CompactionTsFileWriter fileWriter,
       TsFileResource targetResource) {
     this.device = series.getDevice();
     this.series = series;
@@ -96,7 +96,7 @@ public class SingleSeriesCompactionExecutor {
   public SingleSeriesCompactionExecutor(
       PartialPath series,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
-      TsFileIOWriter fileWriter,
+      CompactionTsFileWriter fileWriter,
       TsFileResource targetResource,
       CompactionTaskSummary summary) {
     this.device = series.getDevice();
@@ -337,20 +337,13 @@ public class SingleSeriesCompactionExecutor {
     if (chunkMetadata.getEndTime() > maxEndTimestamp) {
       maxEndTimestamp = chunkMetadata.getEndTime();
     }
-    CompactionMetrics.getInstance()
-        .recordWriteInfo(CompactionType.INNER_SEQ_COMPACTION, false, 
getChunkSize(chunk));
     fileWriter.writeChunk(chunk, chunkMetadata);
   }
 
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-      CompactionMetrics.getInstance()
-          .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION, false, 
chunkWriter.estimateMaxSeriesMemSize());
-      chunkWriter.writeToFileWriter(fileWriter);
+      fileWriter.writeChunk(chunkWriter);
       pointCountInChunkWriter = 0L;
     }
   }
@@ -365,12 +358,7 @@ public class SingleSeriesCompactionExecutor {
   }
 
   private void flushChunkWriter() throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(
-        compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
-    CompactionMetrics.getInstance()
-        .recordWriteInfo(
-            CompactionType.INNER_SEQ_COMPACTION, false, 
chunkWriter.estimateMaxSeriesMemSize());
-    chunkWriter.writeToFileWriter(fileWriter);
+    fileWriter.writeChunk(chunkWriter);
     pointCountInChunkWriter = 0L;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 37b4c731226..9a26c4de327 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.PageException;
@@ -304,7 +305,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
                 isCrossSpace
                     ? CompactionType.CROSS_COMPACTION
                     : CompactionType.INNER_UNSEQ_COMPACTION,
-                isAlign,
+                isAlign ? WrittenDataType.ALIGNED : 
WrittenDataType.NOT_ALIGNED,
                 iChunkWriter.estimateMaxSeriesMemSize());
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
new file mode 100644
index 00000000000..7c9e06789d3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.io;
+
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+
+import java.io.IOException;
+
+public class CompactionTsFileReader extends TsFileSequenceReader {
+  public CompactionTsFileReader(String file) throws IOException {
+    super(file);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
new file mode 100644
index 00000000000..62f3b65a0a2
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.io;
+
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.File;
+import java.io.IOException;
+
+public class CompactionTsFileWriter extends TsFileIOWriter {
+  CompactionType type;
+
+  public CompactionTsFileWriter(
+      File file, boolean enableMemoryControl, long maxMetadataSize, 
CompactionType type)
+      throws IOException {
+    super(file, enableMemoryControl, maxMetadataSize);
+    this.type = type;
+  }
+
+  @Override
+  public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws 
IOException {
+    long beforeOffset = this.getPos();
+    CompactionTaskManager.getInstance()
+        .getMergeWriteRateLimiter()
+        .acquire(chunk.getHeader().getDataSize() + 
chunk.getHeader().getSerializedSize());
+    super.writeChunk(chunk, chunkMetadata);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(type, WrittenDataType.NOT_ALIGNED, writtenDataSize);
+  }
+
+  public void writeChunk(IChunkWriter chunkWriter) throws IOException {
+    boolean isAligned = chunkWriter instanceof AlignedChunkWriterImpl;
+    long beforeOffset = this.getPos();
+    chunkWriter.writeToFileWriter(this);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(
+            type,
+            isAligned ? WrittenDataType.ALIGNED : WrittenDataType.NOT_ALIGNED,
+            writtenDataSize);
+  }
+
+  @Override
+  public void endFile() throws IOException {
+    long beforeSize = this.getPos();
+    super.endFile();
+    long writtenDataSize = this.getPos() - beforeSize;
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(type, WrittenDataType.METADATA, writtenDataSize);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java
new file mode 100644
index 00000000000..6c522026bc2
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.schedule.constant;
+
+public enum WrittenDataType {
+  NOT_ALIGNED(0),
+  ALIGNED(1),
+  METADATA(2);
+
+  int value;
+
+  WrittenDataType(int value) {
+    this.value = value;
+  }
+
+  public int getValue() {
+    return value;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java
deleted file mode 100644
index eaa3dd3d0d0..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.compaction.utils;
-
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import java.io.File;
-import java.io.IOException;
-
-public class CompactionTsFileWriter extends TsFileIOWriter {
-  public CompactionTsFileWriter(File file, boolean enableMemoryControl, long 
maxMetadataSize)
-      throws IOException {
-    super(file, enableMemoryControl, maxMetadataSize);
-  }
-
-  public void writeChunk(
-      CompactionType type, Chunk chunk, ChunkMetadata chunkMetadata, boolean 
aligned)
-      throws IOException {
-    int dataSize = chunk.getHeader().getSerializedSize() + 
chunk.getHeader().getDataSize();
-    
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(dataSize);
-    super.writeChunk(chunk, chunkMetadata);
-    CompactionMetrics.getInstance().recordWriteInfo(type, aligned, dataSize);
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
 
b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
index 082ea55b956..9123306137f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import 
org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
@@ -57,7 +58,7 @@ public class CompactionMetrics implements IMetricSet {
   private final AtomicInteger finishSeqInnerCompactionTaskNum = new 
AtomicInteger(0);
   private final AtomicInteger finishUnseqInnerCompactionTaskNum = new 
AtomicInteger(0);
   private final AtomicInteger finishCrossCompactionTaskNum = new 
AtomicInteger(0);
-  // compaction type -> Counter[ Not-Aligned, Aligned]
+  // compaction type -> Counter[ Not-Aligned, Aligned, Metadata]
   private final Map<String, Counter[]> writeCounters = new 
ConcurrentHashMap<>();
   private final Map<String, Counter[]> readCounters = new 
ConcurrentHashMap<>();
 
@@ -66,12 +67,16 @@ public class CompactionMetrics implements IMetricSet {
       readCounters.put(
           type,
           new Counter[] {
-            DoNothingMetricManager.DO_NOTHING_COUNTER, 
DoNothingMetricManager.DO_NOTHING_COUNTER
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER
           });
       writeCounters.put(
           type,
           new Counter[] {
-            DoNothingMetricManager.DO_NOTHING_COUNTER, 
DoNothingMetricManager.DO_NOTHING_COUNTER
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER,
+            DoNothingMetricManager.DO_NOTHING_COUNTER
           });
     }
   }
@@ -99,7 +104,14 @@ public class CompactionMetrics implements IMetricSet {
                 Tag.TYPE.toString(),
                 compactionType.toString(),
                 Tag.NAME.toString(),
-                "aligned")
+                "aligned"),
+            metricService.getOrCreateCounter(
+                Metric.DATA_WRITTEN.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                "metadata")
           });
     }
     totalCompactionWriteInfoCounter =
@@ -128,6 +140,13 @@ public class CompactionMetrics implements IMetricSet {
           compactionType.toString(),
           Tag.NAME.toString(),
           "aligned");
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_WRITTEN.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          "metadata");
     }
     metricService.remove(
         MetricType.COUNTER,
@@ -138,9 +157,10 @@ public class CompactionMetrics implements IMetricSet {
         "total");
   }
 
-  public void recordWriteInfo(CompactionType compactionType, boolean aligned, 
long byteNum) {
+  public void recordWriteInfo(
+      CompactionType compactionType, WrittenDataType dataType, long byteNum) {
     Counter[] counters = writeCounters.get(compactionType.toString());
-    counters[aligned ? 1 : 0].inc(byteNum);
+    counters[dataType.getValue()].inc(byteNum);
     totalCompactionWriteInfoCounter.inc(byteNum);
   }
 
@@ -167,7 +187,14 @@ public class CompactionMetrics implements IMetricSet {
                 Tag.TYPE.toString(),
                 compactionType.toString(),
                 Tag.NAME.toString(),
-                "aligned")
+                "aligned"),
+            metricService.getOrCreateCounter(
+                Metric.DATA_READ.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                "metadata")
           });
     }
     totalCompactionReadInfoCounter =
@@ -191,6 +218,13 @@ public class CompactionMetrics implements IMetricSet {
           compactionType.toString(),
           Tag.NAME.toString(),
           "aligned");
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_READ.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          "metadata");
     }
     metricService.remove(
         MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(), 
"compaction");

Reply via email to