This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d1a2122bd732d440680a0626afed1cc3c9845c7b Author: shuwenwei <[email protected]> AuthorDate: Wed Aug 7 18:43:56 2024 +0800 Add wal compression metric items (#13105) * add wal compression metrics * fix code refered in review comments (cherry picked from commit 8b318c3fa745ff4f0a8b6d21477af3e27b049b13) --- .../iotdb/db/service/metrics/WritingMetrics.java | 88 +++++++++++++++++++++- .../iotdb/db/storageengine/StorageEngine.java | 4 + .../storageengine/dataregion/wal/io/LogWriter.java | 7 ++ .../dataregion/wal/io/WALInputStream.java | 36 +++++++-- 4 files changed, 125 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index 183d9a9f8c2..e7194126c1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -174,10 +174,24 @@ public class WritingMetrics implements IMetricSet { // region wal overview metrics public static final String WAL_NODES_NUM = "wal_nodes_num"; public static final String USED_RATIO = "used_ratio"; + public static final String SERIALIZED_WAL_BUFFER_SIZE_BYTE = "serialized_wal_buffer_size"; + public static final String WROTE_WAL_BUFFER_SIZE_BYTE = "wrote_wal_buffer_size"; + public static final String WAL_COMPRESS_COST_NS = "wal_compress_cost"; + public static final String WAL_UNCOMPRESS_COST_NS = "wal_uncompress_cost"; + public static final String READ_WAL_BUFFER_SIZE_BYTE = "read_wal_buffer_size"; + public static final String READ_WAL_BUFFER_COST_NS = "read_wal_buffer_cost"; + public static final String WRITE_WAL_BUFFER_COST_NS = "write_wal_buffer_cost"; public static final String ENTRIES_COUNT = "entries_count"; private Histogram usedRatioHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram entriesCountHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram serializedWALBufferSizeHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram wroteWALBufferSizeHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram walCompressCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram walUncompressCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram readWALBufferSizeHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram readWALBufferCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram writeWALBufferCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private void bindWALMetrics(AbstractMetricService metricService) { metricService.createAutoGauge( @@ -196,6 +210,49 @@ public class WritingMetrics implements IMetricSet { MetricLevel.IMPORTANT, Tag.NAME.toString(), ENTRIES_COUNT); + + serializedWALBufferSizeHistogram = + metricService.getOrCreateHistogram( + Metric.WAL_BUFFER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + SERIALIZED_WAL_BUFFER_SIZE_BYTE); + wroteWALBufferSizeHistogram = + metricService.getOrCreateHistogram( + Metric.WAL_BUFFER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + WROTE_WAL_BUFFER_SIZE_BYTE); + walCompressCostHistogram = + metricService.getOrCreateHistogram( + Metric.WAL_BUFFER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + WAL_COMPRESS_COST_NS); + walUncompressCostHistogram = + metricService.getOrCreateHistogram( + Metric.WAL_BUFFER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + WAL_UNCOMPRESS_COST_NS); + readWALBufferSizeHistogram = + metricService.getOrCreateHistogram( + Metric.WAL_BUFFER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + READ_WAL_BUFFER_SIZE_BYTE); + readWALBufferCostHistogram = + metricService.getOrCreateHistogram( + Metric.WAL_BUFFER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + READ_WAL_BUFFER_COST_NS); + writeWALBufferCostHistogram = + metricService.getOrCreateHistogram( + Metric.WAL_BUFFER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + WRITE_WAL_BUFFER_COST_NS); } private void unbindWALMetrics(AbstractMetricService metricService) { @@ -203,7 +260,16 @@ public class WritingMetrics implements IMetricSet { MetricType.AUTO_GAUGE, Metric.WAL_NODE_NUM.toString(), Tag.NAME.toString(), WAL_NODES_NUM); usedRatioHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; entriesCountHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; - Arrays.asList(USED_RATIO, ENTRIES_COUNT) + Arrays.asList( + USED_RATIO, + ENTRIES_COUNT, + SERIALIZED_WAL_BUFFER_SIZE_BYTE, + WROTE_WAL_BUFFER_SIZE_BYTE, + WAL_COMPRESS_COST_NS, + WAL_UNCOMPRESS_COST_NS, + READ_WAL_BUFFER_SIZE_BYTE, + READ_WAL_BUFFER_COST_NS, + WRITE_WAL_BUFFER_COST_NS) .forEach( name -> metricService.remove( @@ -215,7 +281,6 @@ public class WritingMetrics implements IMetricSet { // region wal cost metrics public static final String MAKE_CHECKPOINT = "make_checkpoint"; public static final String SERIALIZE_WAL_ENTRY = "serialize_wal_entry"; - public static final String SERIALIZE_ONE_WAL_INFO_ENTRY = "serialize_one_wal_info_entry"; public static final String SERIALIZE_WAL_ENTRY_TOTAL = "serialize_wal_entry_total"; public static final String SYNC_WAL_BUFFER = "sync_wal_buffer"; public static final String SYNC = "sync"; @@ -807,6 +872,25 @@ public class WritingMetrics implements IMetricSet { serializeWalEntryTotalTimer.updateNanos(costTimeInNanos); } + public void recordCompressWALBufferCost(long costTimeInNanos) { + walCompressCostHistogram.update(costTimeInNanos); + } + + public void recordWroteWALBuffer(int serializedSize, int wroteSize, long wroteTimeCostInNanos) { + serializedWALBufferSizeHistogram.update(serializedSize); + wroteWALBufferSizeHistogram.update(wroteSize); + writeWALBufferCostHistogram.update(wroteTimeCostInNanos); + } + + public void recordWALUncompressCost(long costTimeInNanos) { + walUncompressCostHistogram.update(costTimeInNanos); + } + + public void recordWALRead(long size, long costTimeInNanos) { + readWALBufferSizeHistogram.update(size); + readWALBufferCostHistogram.update(costTimeInNanos); + } + public void recordSyncWALBufferCost(long costTimeInNanos, boolean forceFlag) { if (forceFlag) { // fsync mode diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 23f046d7a15..89aa5e88096 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -187,6 +187,7 @@ public class StorageEngine implements IService { } public void asyncRecover() throws StartupException { + long startRecoverTime = System.currentTimeMillis(); setAllSgReady(false); cachedThreadPool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName()); @@ -210,6 +211,9 @@ public class StorageEngine implements IService { checkResults(futures, "StorageEngine failed to recover."); recoverRepairData(); setAllSgReady(true); + LOGGER.info( + "Storage Engine recover cost: {}s.", + (System.currentTimeMillis() - startRecoverTime) / 1000); }, ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName()); recoverEndTrigger.start(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java index 278fe93cae0..cdec209e507 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint; @@ -75,6 +76,7 @@ public abstract class LogWriter implements ILogWriter { @Override public double write(ByteBuffer buffer) throws IOException { + long startTime = System.nanoTime(); // To support hot loading, we can't define it as a variable, // because we need to dynamically check whether wal compression is enabled // each time the buffer is serialized @@ -112,7 +114,9 @@ public abstract class LogWriter implements ILogWriter { headerBuffer.putInt(bufferSize); if (compressed) { headerBuffer.putInt(uncompressedSize); + WritingMetrics.getInstance().recordCompressWALBufferCost(System.nanoTime() - startTime); } + startTime = System.nanoTime(); try { headerBuffer.flip(); logChannel.write(headerBuffer); @@ -120,6 +124,9 @@ public abstract class LogWriter implements ILogWriter { } catch (ClosedChannelException e) { logger.warn("Cannot write to {}", logFile, e); } + WritingMetrics.getInstance() + .recordWroteWALBuffer(uncompressedSize, bufferSize, System.nanoTime() - startTime); + return ((double) bufferSize / uncompressedSize); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index eff873510fd..19e1564f946 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.utils.MmapUtil; import org.apache.tsfile.compress.IUnCompressor; @@ -188,6 +189,8 @@ public class WALInputStream extends InputStream implements AutoCloseable { if (channel.position() >= endOffset) { throw new IOException("Reach the end offset of wal file"); } + long startTime = System.nanoTime(); + long startPosition = channel.position(); if (version == WALFileVersion.V2) { loadNextSegmentV2(); } else if (version == WALFileVersion.V1) { @@ -195,6 +198,8 @@ public class WALInputStream extends InputStream implements AutoCloseable { } else { tryLoadSegment(); } + WritingMetrics.getInstance() + .recordWALRead(channel.position() - startPosition, System.nanoTime() - startTime); } private void loadNextSegmentV1() throws IOException { @@ -207,7 +212,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { dataBuffer = ByteBuffer.allocate(128 * 1024); } dataBuffer.clear(); - channel.read(dataBuffer); + readWALBufferFromChannel(dataBuffer); dataBuffer.flip(); } @@ -232,12 +237,12 @@ public class WALInputStream extends InputStream implements AutoCloseable { compressedBuffer.clear(); // limit the buffer to prevent it from reading too much byte than expected compressedBuffer.limit(segmentInfo.dataInDiskSize); - if (channel.read(compressedBuffer) != segmentInfo.dataInDiskSize) { + if (readWALBufferFromChannel(compressedBuffer) != segmentInfo.dataInDiskSize) { throw new IOException("Unexpected end of file"); } compressedBuffer.flip(); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); - unCompressor.uncompress(compressedBuffer, dataBuffer); + uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor); } else { // An uncompressed segment if (Objects.isNull(dataBuffer) @@ -250,7 +255,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { // limit the buffer to prevent it from reading too much byte than expected dataBuffer.limit(segmentInfo.dataInDiskSize); - if (channel.read(dataBuffer) != segmentInfo.dataInDiskSize) { + if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) { throw new IOException("Unexpected end of file"); } } @@ -296,15 +301,15 @@ public class WALInputStream extends InputStream implements AutoCloseable { if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) { compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - channel.read(compressedBuffer); + readWALBufferFromChannel(compressedBuffer); compressedBuffer.flip(); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); - unCompressor.uncompress(compressedBuffer, dataBuffer); + uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor); MmapUtil.clean(compressedBuffer); } else { dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - channel.read(dataBuffer); + readWALBufferFromChannel(dataBuffer); dataBuffer.flip(); } @@ -351,7 +356,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt(); if (info.compressionType != CompressionType.UNCOMPRESSED) { compressedSizeBuffer.clear(); - channel.read(compressedSizeBuffer); + readWALBufferFromChannel(compressedSizeBuffer); compressedSizeBuffer.flip(); info.uncompressedSize = compressedSizeBuffer.getInt(); } else { @@ -360,6 +365,21 @@ public class WALInputStream extends InputStream implements AutoCloseable { return info; } + private int readWALBufferFromChannel(ByteBuffer buffer) throws IOException { + long startTime = System.nanoTime(); + int size = channel.read(buffer); + WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() - startTime); + return size; + } + + private void uncompressWALBuffer( + ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor unCompressor) + throws IOException { + long startTime = System.nanoTime(); + unCompressor.uncompress(compressed, uncompressed); + WritingMetrics.getInstance().recordWALUncompressCost(System.nanoTime() - startTime); + } + private static class SegmentInfo { public CompressionType compressionType; public int dataInDiskSize;
