This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b318c3fa74 Add wal compression metric items (#13105)
8b318c3fa74 is described below

commit 8b318c3fa745ff4f0a8b6d21477af3e27b049b13
Author: shuwenwei <[email protected]>
AuthorDate: Wed Aug 7 18:43:56 2024 +0800

    Add wal compression metric items (#13105)
    
    * add wal compression metrics
    
    * fix code referred to in review comments
---
 .../iotdb/db/service/metrics/WritingMetrics.java   | 88 +++++++++++++++++++++-
 .../iotdb/db/storageengine/StorageEngine.java      |  4 +
 .../storageengine/dataregion/wal/io/LogWriter.java |  7 ++
 .../dataregion/wal/io/WALInputStream.java          | 36 +++++++--
 4 files changed, 125 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 183d9a9f8c2..e7194126c1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -174,10 +174,24 @@ public class WritingMetrics implements IMetricSet {
   // region wal overview metrics
   public static final String WAL_NODES_NUM = "wal_nodes_num";
   public static final String USED_RATIO = "used_ratio";
+  public static final String SERIALIZED_WAL_BUFFER_SIZE_BYTE = 
"serialized_wal_buffer_size";
+  public static final String WROTE_WAL_BUFFER_SIZE_BYTE = 
"wrote_wal_buffer_size";
+  public static final String WAL_COMPRESS_COST_NS = "wal_compress_cost";
+  public static final String WAL_UNCOMPRESS_COST_NS = "wal_uncompress_cost";
+  public static final String READ_WAL_BUFFER_SIZE_BYTE = 
"read_wal_buffer_size";
+  public static final String READ_WAL_BUFFER_COST_NS = "read_wal_buffer_cost";
+  public static final String WRITE_WAL_BUFFER_COST_NS = 
"write_wal_buffer_cost";
   public static final String ENTRIES_COUNT = "entries_count";
 
   private Histogram usedRatioHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
   private Histogram entriesCountHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram serializedWALBufferSizeHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram wroteWALBufferSizeHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram walCompressCostHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram walUncompressCostHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram readWALBufferSizeHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram readWALBufferCostHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram writeWALBufferCostHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
 
   private void bindWALMetrics(AbstractMetricService metricService) {
     metricService.createAutoGauge(
@@ -196,6 +210,49 @@ public class WritingMetrics implements IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
             ENTRIES_COUNT);
+
+    serializedWALBufferSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.WAL_BUFFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            SERIALIZED_WAL_BUFFER_SIZE_BYTE);
+    wroteWALBufferSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.WAL_BUFFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            WROTE_WAL_BUFFER_SIZE_BYTE);
+    walCompressCostHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.WAL_BUFFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            WAL_COMPRESS_COST_NS);
+    walUncompressCostHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.WAL_BUFFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            WAL_UNCOMPRESS_COST_NS);
+    readWALBufferSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.WAL_BUFFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            READ_WAL_BUFFER_SIZE_BYTE);
+    readWALBufferCostHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.WAL_BUFFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            READ_WAL_BUFFER_COST_NS);
+    writeWALBufferCostHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.WAL_BUFFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            WRITE_WAL_BUFFER_COST_NS);
   }
 
   private void unbindWALMetrics(AbstractMetricService metricService) {
@@ -203,7 +260,16 @@ public class WritingMetrics implements IMetricSet {
         MetricType.AUTO_GAUGE, Metric.WAL_NODE_NUM.toString(), 
Tag.NAME.toString(), WAL_NODES_NUM);
     usedRatioHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
     entriesCountHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
-    Arrays.asList(USED_RATIO, ENTRIES_COUNT)
+    Arrays.asList(
+            USED_RATIO,
+            ENTRIES_COUNT,
+            SERIALIZED_WAL_BUFFER_SIZE_BYTE,
+            WROTE_WAL_BUFFER_SIZE_BYTE,
+            WAL_COMPRESS_COST_NS,
+            WAL_UNCOMPRESS_COST_NS,
+            READ_WAL_BUFFER_SIZE_BYTE,
+            READ_WAL_BUFFER_COST_NS,
+            WRITE_WAL_BUFFER_COST_NS)
         .forEach(
             name ->
                 metricService.remove(
@@ -215,7 +281,6 @@ public class WritingMetrics implements IMetricSet {
   // region wal cost metrics
   public static final String MAKE_CHECKPOINT = "make_checkpoint";
   public static final String SERIALIZE_WAL_ENTRY = "serialize_wal_entry";
-  public static final String SERIALIZE_ONE_WAL_INFO_ENTRY = 
"serialize_one_wal_info_entry";
   public static final String SERIALIZE_WAL_ENTRY_TOTAL = 
"serialize_wal_entry_total";
   public static final String SYNC_WAL_BUFFER = "sync_wal_buffer";
   public static final String SYNC = "sync";
@@ -807,6 +872,25 @@ public class WritingMetrics implements IMetricSet {
     serializeWalEntryTotalTimer.updateNanos(costTimeInNanos);
   }
 
+  public void recordCompressWALBufferCost(long costTimeInNanos) {
+    walCompressCostHistogram.update(costTimeInNanos);
+  }
+
+  public void recordWroteWALBuffer(int serializedSize, int wroteSize, long 
wroteTimeCostInNanos) {
+    serializedWALBufferSizeHistogram.update(serializedSize);
+    wroteWALBufferSizeHistogram.update(wroteSize);
+    writeWALBufferCostHistogram.update(wroteTimeCostInNanos);
+  }
+
+  public void recordWALUncompressCost(long costTimeInNanos) {
+    walUncompressCostHistogram.update(costTimeInNanos);
+  }
+
+  public void recordWALRead(long size, long costTimeInNanos) {
+    readWALBufferSizeHistogram.update(size);
+    readWALBufferCostHistogram.update(costTimeInNanos);
+  }
+
   public void recordSyncWALBufferCost(long costTimeInNanos, boolean forceFlag) 
{
     if (forceFlag) {
       // fsync mode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index c742914f989..334f5f4d357 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -187,6 +187,7 @@ public class StorageEngine implements IService {
   }
 
   public void asyncRecover() throws StartupException {
+    long startRecoverTime = System.currentTimeMillis();
     setAllSgReady(false);
     cachedThreadPool =
         
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
@@ -210,6 +211,9 @@ public class StorageEngine implements IService {
               checkResults(futures, "StorageEngine failed to recover.");
               recoverRepairData();
               setAllSgReady(true);
+              LOGGER.info(
+                  "Storage Engine recover cost: {}s.",
+                  (System.currentTimeMillis() - startRecoverTime) / 1000);
             },
             ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
     recoverEndTrigger.start();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 278fe93cae0..cdec209e507 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
 
@@ -75,6 +76,7 @@ public abstract class LogWriter implements ILogWriter {
 
   @Override
   public double write(ByteBuffer buffer) throws IOException {
+    long startTime = System.nanoTime();
     // To support hot loading, we can't define it as a variable,
     // because we need to dynamically check whether wal compression is enabled
     // each time the buffer is serialized
@@ -112,7 +114,9 @@ public abstract class LogWriter implements ILogWriter {
     headerBuffer.putInt(bufferSize);
     if (compressed) {
       headerBuffer.putInt(uncompressedSize);
+      
WritingMetrics.getInstance().recordCompressWALBufferCost(System.nanoTime() - 
startTime);
     }
+    startTime = System.nanoTime();
     try {
       headerBuffer.flip();
       logChannel.write(headerBuffer);
@@ -120,6 +124,9 @@ public abstract class LogWriter implements ILogWriter {
     } catch (ClosedChannelException e) {
       logger.warn("Cannot write to {}", logFile, e);
     }
+    WritingMetrics.getInstance()
+        .recordWroteWALBuffer(uncompressedSize, bufferSize, System.nanoTime() 
- startTime);
+
     return ((double) bufferSize / uncompressedSize);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index eff873510fd..19e1564f946 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.utils.MmapUtil;
 
 import org.apache.tsfile.compress.IUnCompressor;
@@ -188,6 +189,8 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
     if (channel.position() >= endOffset) {
       throw new IOException("Reach the end offset of wal file");
     }
+    long startTime = System.nanoTime();
+    long startPosition = channel.position();
     if (version == WALFileVersion.V2) {
       loadNextSegmentV2();
     } else if (version == WALFileVersion.V1) {
@@ -195,6 +198,8 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
     } else {
       tryLoadSegment();
     }
+    WritingMetrics.getInstance()
+        .recordWALRead(channel.position() - startPosition, System.nanoTime() - 
startTime);
   }
 
   private void loadNextSegmentV1() throws IOException {
@@ -207,7 +212,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       dataBuffer = ByteBuffer.allocate(128 * 1024);
     }
     dataBuffer.clear();
-    channel.read(dataBuffer);
+    readWALBufferFromChannel(dataBuffer);
     dataBuffer.flip();
   }
 
@@ -232,12 +237,12 @@ public class WALInputStream extends InputStream 
implements AutoCloseable {
       compressedBuffer.clear();
       // limit the buffer to prevent it from reading too much byte than 
expected
       compressedBuffer.limit(segmentInfo.dataInDiskSize);
-      if (channel.read(compressedBuffer) != segmentInfo.dataInDiskSize) {
+      if (readWALBufferFromChannel(compressedBuffer) != 
segmentInfo.dataInDiskSize) {
         throw new IOException("Unexpected end of file");
       }
       compressedBuffer.flip();
       IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
-      unCompressor.uncompress(compressedBuffer, dataBuffer);
+      uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
     } else {
       // An uncompressed segment
       if (Objects.isNull(dataBuffer)
@@ -250,7 +255,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       // limit the buffer to prevent it from reading too much byte than 
expected
       dataBuffer.limit(segmentInfo.dataInDiskSize);
 
-      if (channel.read(dataBuffer) != segmentInfo.dataInDiskSize) {
+      if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) {
         throw new IOException("Unexpected end of file");
       }
     }
@@ -296,15 +301,15 @@ public class WALInputStream extends InputStream 
implements AutoCloseable {
 
       if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
         compressedBuffer = 
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
-        channel.read(compressedBuffer);
+        readWALBufferFromChannel(compressedBuffer);
         compressedBuffer.flip();
         IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
-        unCompressor.uncompress(compressedBuffer, dataBuffer);
+        uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
         MmapUtil.clean(compressedBuffer);
       } else {
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
-        channel.read(dataBuffer);
+        readWALBufferFromChannel(dataBuffer);
         dataBuffer.flip();
       }
 
@@ -351,7 +356,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
     info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt();
     if (info.compressionType != CompressionType.UNCOMPRESSED) {
       compressedSizeBuffer.clear();
-      channel.read(compressedSizeBuffer);
+      readWALBufferFromChannel(compressedSizeBuffer);
       compressedSizeBuffer.flip();
       info.uncompressedSize = compressedSizeBuffer.getInt();
     } else {
@@ -360,6 +365,21 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
     return info;
   }
 
+  private int readWALBufferFromChannel(ByteBuffer buffer) throws IOException {
+    long startTime = System.nanoTime();
+    int size = channel.read(buffer);
+    WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() - 
startTime);
+    return size;
+  }
+
+  private void uncompressWALBuffer(
+      ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor 
unCompressor)
+      throws IOException {
+    long startTime = System.nanoTime();
+    unCompressor.uncompress(compressed, uncompressed);
+    WritingMetrics.getInstance().recordWALUncompressCost(System.nanoTime() - 
startTime);
+  }
+
   private static class SegmentInfo {
     public CompressionType compressionType;
     public int dataInDiskSize;

Reply via email to