This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4ca4219d2c83ab2fed785b0dfcc17a21611dd64f
Author: Liu Xuxin <[email protected]>
AuthorDate: Mon Jun 5 21:04:52 2023 +0800

    refactor compaction metrics
---
 .../readchunk/AlignedSeriesCompactionExecutor.java |  15 +--
 .../readchunk/SingleSeriesCompactionExecutor.java  |  19 +--
 .../utils/writer/AbstractCompactionWriter.java     |   2 -
 .../compaction/utils/CompactionTsFileWriter.java   |  46 +++++++
 .../db/service/metrics/CompactionMetrics.java      | 146 +++++++++++++--------
 5 files changed, 147 insertions(+), 81 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index 6690d133ac8..c529ca05208 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -142,7 +141,9 @@ public class AlignedSeriesCompactionExecutor {
             readerIterator.nextReader();
         summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum());
         summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum());
-        CompactionMetrics.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize());
+        CompactionMetrics.getInstance()
+            .recordReadInfo(
+                CompactionType.INNER_SEQ_COMPACTION, true, nextAlignedChunkInfo.getTotalSize());
         compactOneAlignedChunk(
             nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum());
       }
@@ -153,10 +154,7 @@ public class AlignedSeriesCompactionExecutor {
           rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetrics.getInstance()
           .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION,
-              ProcessChunkType.DESERIALIZE_CHUNK,
-              true,
-              chunkWriter.estimateMaxSeriesMemSize());
+              CompactionType.INNER_SEQ_COMPACTION, true, chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(writer);
     }
     writer.checkMetadataSizeAndMayFlush();
@@ -195,10 +193,7 @@ public class AlignedSeriesCompactionExecutor {
           rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetrics.getInstance()
           .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION,
-              ProcessChunkType.DESERIALIZE_CHUNK,
-              true,
-              chunkWriter.estimateMaxSeriesMemSize());
+              CompactionType.INNER_SEQ_COMPACTION, true, chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(writer);
       remainingPointInChunkWriter = 0L;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index 47d80ccf4ea..11a68cdecf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -131,6 +130,8 @@ public class SingleSeriesCompactionExecutor {
         }
         CompactionMetrics.getInstance()
             .recordReadInfo(
+                CompactionType.INNER_SEQ_COMPACTION,
+                false,
                 (long) currentChunk.getHeader().getSerializedSize()
                     + currentChunk.getHeader().getDataSize());
 
@@ -337,11 +338,7 @@ public class SingleSeriesCompactionExecutor {
       maxEndTimestamp = chunkMetadata.getEndTime();
     }
     CompactionMetrics.getInstance()
-        .recordWriteInfo(
-            CompactionType.INNER_SEQ_COMPACTION,
-            isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK,
-            false,
-            getChunkSize(chunk));
+        .recordWriteInfo(CompactionType.INNER_SEQ_COMPACTION, false, getChunkSize(chunk));
     fileWriter.writeChunk(chunk, chunkMetadata);
   }
 
@@ -352,10 +349,7 @@ public class SingleSeriesCompactionExecutor {
           compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
       CompactionMetrics.getInstance()
           .recordWriteInfo(
-              CompactionType.INNER_SEQ_COMPACTION,
-              ProcessChunkType.DESERIALIZE_CHUNK,
-              false,
-              chunkWriter.estimateMaxSeriesMemSize());
+              CompactionType.INNER_SEQ_COMPACTION, false, chunkWriter.estimateMaxSeriesMemSize());
       chunkWriter.writeToFileWriter(fileWriter);
       pointCountInChunkWriter = 0L;
     }
@@ -375,10 +369,7 @@ public class SingleSeriesCompactionExecutor {
         compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
     CompactionMetrics.getInstance()
         .recordWriteInfo(
-            CompactionType.INNER_SEQ_COMPACTION,
-            ProcessChunkType.DESERIALIZE_CHUNK,
-            false,
-            chunkWriter.estimateMaxSeriesMemSize());
+            CompactionType.INNER_SEQ_COMPACTION, false, chunkWriter.estimateMaxSeriesMemSize());
     chunkWriter.writeToFileWriter(fileWriter);
     pointCountInChunkWriter = 0L;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 1f4b13b9a14..37b4c731226 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.PageException;
@@ -305,7 +304,6 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
                 isCrossSpace
                     ? CompactionType.CROSS_COMPACTION
                     : CompactionType.INNER_UNSEQ_COMPACTION,
-                ProcessChunkType.DESERIALIZE_CHUNK,
                 isAlign,
                 iChunkWriter.estimateMaxSeriesMemSize());
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java
new file mode 100644
index 00000000000..eaa3dd3d0d0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.utils;
+
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.File;
+import java.io.IOException;
+
+public class CompactionTsFileWriter extends TsFileIOWriter {
+  public CompactionTsFileWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
+      throws IOException {
+    super(file, enableMemoryControl, maxMetadataSize);
+  }
+
+  public void writeChunk(
+      CompactionType type, Chunk chunk, ChunkMetadata chunkMetadata, boolean aligned)
+      throws IOException {
+    int dataSize = chunk.getHeader().getSerializedSize() + chunk.getHeader().getDataSize();
+    CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(dataSize);
+    super.writeChunk(chunk, chunkMetadata);
+    CompactionMetrics.getInstance().recordWriteInfo(type, aligned, dataSize);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
index d482be8c9ec..082ea55b956 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -57,19 +57,22 @@ public class CompactionMetrics implements IMetricSet {
   private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0);
   private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0);
   private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0);
+  // compaction type -> Counter[ Not-Aligned, Aligned]
+  private final Map<String, Counter[]> writeCounters = new ConcurrentHashMap<>();
+  private final Map<String, Counter[]> readCounters = new ConcurrentHashMap<>();
 
   private CompactionMetrics() {
     for (String type : TYPES) {
-      Map<CompactionType, Map<ProcessChunkType, Counter>> compactionTypeProcessChunkTypeMap =
-          writeInfoCounterMap.computeIfAbsent(type, k -> new ConcurrentHashMap<>());
-      for (CompactionType compactionType : CompactionType.values()) {
-        Map<ProcessChunkType, Counter> counterMap =
-            compactionTypeProcessChunkTypeMap.computeIfAbsent(
-                compactionType, k -> new ConcurrentHashMap<>());
-        for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
-          counterMap.put(processChunkType, DoNothingMetricManager.DO_NOTHING_COUNTER);
-        }
-      }
+      readCounters.put(
+          type,
+          new Counter[] {
+            DoNothingMetricManager.DO_NOTHING_COUNTER, DoNothingMetricManager.DO_NOTHING_COUNTER
+          });
+      writeCounters.put(
+          type,
+          new Counter[] {
+            DoNothingMetricManager.DO_NOTHING_COUNTER, DoNothingMetricManager.DO_NOTHING_COUNTER
+          });
     }
   }
 
@@ -79,25 +82,25 @@ public class CompactionMetrics implements IMetricSet {
   private Counter totalCompactionWriteInfoCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
 
   private void bindWriteInfo(AbstractMetricService metricService) {
-    for (String type : TYPES) {
-      Map<CompactionType, Map<ProcessChunkType, Counter>> compactionTypeProcessChunkTypeMap =
-          writeInfoCounterMap.computeIfAbsent(type, k -> new ConcurrentHashMap<>());
-      for (CompactionType compactionType : CompactionType.values()) {
-        Map<ProcessChunkType, Counter> counterMap =
-            compactionTypeProcessChunkTypeMap.computeIfAbsent(
-                compactionType, k -> new ConcurrentHashMap<>());
-        for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
-          Counter counter =
-              metricService.getOrCreateCounter(
-                  Metric.DATA_WRITTEN.toString(),
-                  MetricLevel.IMPORTANT,
-                  Tag.NAME.toString(),
-                  "compaction_" + compactionType.toString(),
-                  Tag.STATUS.toString(),
-                  type + "_" + processChunkType.toString());
-          counterMap.put(processChunkType, counter);
-        }
-      }
+    for (CompactionType compactionType : CompactionType.values()) {
+      writeCounters.put(
+          compactionType.toString(),
+          new Counter[] {
+            metricService.getOrCreateCounter(
+                Metric.DATA_WRITTEN.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                "not_aligned"),
+            metricService.getOrCreateCounter(
+                Metric.DATA_WRITTEN.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                "aligned")
+          });
     }
     totalCompactionWriteInfoCounter =
         metricService.getOrCreateCounter(
@@ -110,22 +113,21 @@ public class CompactionMetrics implements IMetricSet {
   }
 
   private void unbindWriteInfo(AbstractMetricService metricService) {
-    for (String type : TYPES) {
-      for (CompactionType compactionType : CompactionType.values()) {
-        for (ProcessChunkType processChunkType : ProcessChunkType.values()) {
-          metricService.remove(
-              MetricType.COUNTER,
-              Metric.DATA_WRITTEN.toString(),
-              Tag.NAME.toString(),
-              "compaction_" + compactionType.toString(),
-              Tag.STATUS.toString(),
-              type + "_" + processChunkType.toString());
-          writeInfoCounterMap
-              .get(type)
-              .get(compactionType)
-              .put(processChunkType, DoNothingMetricManager.DO_NOTHING_COUNTER);
-        }
-      }
+    for (CompactionType compactionType : CompactionType.values()) {
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_WRITTEN.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          "not_aligned");
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_WRITTEN.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          "aligned");
     }
     metricService.remove(
         MetricType.COUNTER,
@@ -136,14 +138,10 @@ public class CompactionMetrics implements IMetricSet {
         "total");
   }
 
-  public void recordWriteInfo(
-      CompactionType compactionType,
-      ProcessChunkType processChunkType,
-      boolean aligned,
-      long byteNum) {
-    String type = aligned ? "aligned" : "not_aligned";
-    writeInfoCounterMap.get(type).get(compactionType).get(processChunkType).inc(byteNum / 1024L);
-    totalCompactionWriteInfoCounter.inc(byteNum / 1024L);
+  public void recordWriteInfo(CompactionType compactionType, boolean aligned, long byteNum) {
+    Counter[] counters = writeCounters.get(compactionType.toString());
+    counters[aligned ? 1 : 0].inc(byteNum);
+    totalCompactionWriteInfoCounter.inc(byteNum);
   }
 
   // endregion
@@ -152,18 +150,56 @@ public class CompactionMetrics implements IMetricSet {
   private Counter totalCompactionReadInfoCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
 
   private void bindReadInfo(AbstractMetricService metricService) {
+    for (CompactionType compactionType : CompactionType.values()) {
+      readCounters.put(
+          compactionType.toString(),
+          new Counter[] {
+            metricService.getOrCreateCounter(
+                Metric.DATA_READ.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                "not_aligned"),
+            metricService.getOrCreateCounter(
+                Metric.DATA_READ.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                compactionType.toString(),
+                Tag.NAME.toString(),
+                "aligned")
+          });
+    }
     totalCompactionReadInfoCounter =
         metricService.getOrCreateCounter(
             Metric.DATA_READ.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), "compaction");
   }
 
   private void unbindReadInfo(AbstractMetricService metricService) {
+    for (CompactionType compactionType : CompactionType.values()) {
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_READ.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          "not_aligned");
+      metricService.remove(
+          MetricType.COUNTER,
+          Metric.DATA_READ.toString(),
+          Tag.TYPE.toString(),
+          compactionType.toString(),
+          Tag.NAME.toString(),
+          "aligned");
+    }
     metricService.remove(
         MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(), "compaction");
   }
 
-  public void recordReadInfo(long byteNum) {
-    totalCompactionReadInfoCounter.inc(byteNum / 1024L);
+  public void recordReadInfo(CompactionType compactionType, boolean aligned, long byteNum) {
+    Counter[] counters = readCounters.get(compactionType.toString());
+    counters[aligned ? 1 : 0].inc(byteNum);
+    totalCompactionReadInfoCounter.inc(byteNum);
   }
   // endregion
 

Reply via email to