This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d90088336d3 Read of .meta file is not included in the compaction io metrics (#14772)
d90088336d3 is described below

commit d90088336d33f0e4cff4bb964eef9974c1333b10
Author: shuwenwei <[email protected]>
AuthorDate: Sat Feb 8 10:35:09 2025 +0800

    Read of .meta file is not included in the compaction io metrics (#14772)
    
    * Read of .meta file is not included in the compaction io metrics
    
    * spotless
---
 .../compaction/io/CompactionDiskTSMIterator.java   |  43 ++++++++
 .../compaction/io/CompactionTsFileInput.java       | 122 ++++++++++++++++++++-
 .../compaction/io/CompactionTsFileReader.java      |  60 +---------
 .../compaction/io/CompactionTsFileWriter.java      |   9 ++
 4 files changed, 178 insertions(+), 56 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java
new file mode 100644
index 00000000000..b90769dba37
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionDiskTSMIterator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
+
+import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+
+import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.tsfile.write.writer.tsmiterator.DiskTSMIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+public class CompactionDiskTSMIterator extends DiskTSMIterator {
+
+  public CompactionDiskTSMIterator(
+      CompactionType compactionType,
+      File cmtFile,
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
+      LinkedList<Long> endPosForEachDevice)
+      throws IOException {
+    super(cmtFile, chunkGroupMetadataList, endPosForEachDevice);
+    this.input = new CompactionTsFileInput(compactionType, this.input);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
index b13475e4fe6..e51f26da14c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
 
+import org.apache.iotdb.db.service.metrics.CompactionMetrics;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+
 import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
 import org.apache.tsfile.read.reader.TsFileInput;
 
@@ -29,10 +34,33 @@ import java.nio.ByteBuffer;
 public class CompactionTsFileInput implements TsFileInput {
   private final TsFileInput tsFileInput;
 
-  public CompactionTsFileInput(TsFileInput tsFileInput) {
+  private long metadataOffset = -1;
+
+  /** The type of compaction running. */
+  private final CompactionType compactionType;
+
+  /** A flag that indicates if an aligned series is being read. */
+  private volatile boolean readingAlignedSeries = false;
+
+  public CompactionTsFileInput(CompactionType compactionType, TsFileInput tsFileInput) {
+    this.compactionType = compactionType;
     this.tsFileInput = tsFileInput;
   }
 
+  public void setMetadataOffset(long metadataOffset) {
+    this.metadataOffset = metadataOffset;
+  }
+
+  /** Marks the start of reading an aligned series. */
+  public void markStartOfAlignedSeries() {
+    readingAlignedSeries = true;
+  }
+
+  /** Marks the end of reading an aligned series. */
+  public void markEndOfAlignedSeries() {
+    readingAlignedSeries = false;
+  }
+
   @Override
   public long size() throws IOException {
     try {
@@ -71,7 +99,9 @@ public class CompactionTsFileInput implements TsFileInput {
 
   @Override
   public int read(ByteBuffer dst) throws IOException {
+    acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining());
     int readSize = tsFileInput.read(dst);
+    updateMetrics(position(), readSize);
     if (Thread.currentThread().isInterrupted()) {
       throw new StopReadTsFileByInterruptException();
     }
@@ -80,7 +110,9 @@ public class CompactionTsFileInput implements TsFileInput {
 
   @Override
   public int read(ByteBuffer dst, long position) throws IOException {
+    acquireReadDataSizeWithCompactionReadRateLimiter(dst.remaining());
     int readSize = tsFileInput.read(dst, position);
+    updateMetrics(position, readSize);
     if (Thread.currentThread().isInterrupted()) {
       throw new StopReadTsFileByInterruptException();
     }
@@ -89,7 +121,7 @@ public class CompactionTsFileInput implements TsFileInput {
 
   @Override
   public InputStream wrapAsInputStream() throws IOException {
-    return tsFileInput.wrapAsInputStream();
+    return new CompactionTsFileInputStreamWrapper(tsFileInput.wrapAsInputStream());
   }
 
   @Override
@@ -101,4 +133,90 @@ public class CompactionTsFileInput implements TsFileInput {
   public String getFilePath() {
     return tsFileInput.getFilePath();
   }
+
+  private void acquireReadDataSizeWithCompactionReadRateLimiter(int readDataSize) {
+    CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
+    CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
+  }
+
+  private void updateMetrics(long position, long totalSize) {
+    if (position >= metadataOffset) {
+      CompactionMetrics.getInstance()
+          .recordReadInfo(compactionType, CompactionIoDataType.METADATA, totalSize);
+    } else {
+      CompactionMetrics.getInstance()
+          .recordReadInfo(
+              compactionType,
+              readingAlignedSeries
+                  ? CompactionIoDataType.ALIGNED
+                  : CompactionIoDataType.NOT_ALIGNED,
+              totalSize);
+    }
+  }
+
+  private class CompactionTsFileInputStreamWrapper extends InputStream {
+
+    private final InputStream inputStream;
+
+    public CompactionTsFileInputStreamWrapper(InputStream inputStream) {
+      this.inputStream = inputStream;
+    }
+
+    @Override
+    public int read() throws IOException {
+      acquireReadDataSizeWithCompactionReadRateLimiter(1);
+      long position = position();
+      int readSize = inputStream.read();
+      updateMetrics(position, readSize);
+      return readSize;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      acquireReadDataSizeWithCompactionReadRateLimiter(b.length);
+      long position = position();
+      int readSize = inputStream.read(b);
+      updateMetrics(position, readSize);
+      return readSize;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      acquireReadDataSizeWithCompactionReadRateLimiter(len);
+      long position = position();
+      int readSize = inputStream.read(b, off, len);
+      updateMetrics(position, readSize);
+      return readSize;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return inputStream.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+      return inputStream.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+      inputStream.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      inputStream.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      inputStream.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return inputStream.markSupported();
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index bf98611db2c..7aabb085643 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -19,29 +19,22 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.compaction.io;
 
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
-import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
-import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 
 import org.apache.tsfile.file.IMetadataIndexEntry;
 import org.apache.tsfile.file.header.ChunkHeader;
-import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.utils.Pair;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.LongConsumer;
 
 /**
  * This class extends the TsFileSequenceReader class to read and manage TsFile with a focus on
@@ -50,14 +43,6 @@ import java.util.function.LongConsumer;
  */
 public class CompactionTsFileReader extends TsFileSequenceReader {
 
-  private long metadataOffset = 0;
-
-  /** The type of compaction running. */
-  CompactionType compactionType;
-
-  /** A flag that indicates if an aligned series is being read. */
-  private volatile boolean readingAlignedSeries = false;
-
   /**
    * Constructs a new instance of CompactionTsFileReader.
    *
@@ -67,44 +52,20 @@ public class CompactionTsFileReader extends TsFileSequenceReader {
    */
   public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException {
     super(file);
-    this.tsFileInput = new CompactionTsFileInput(tsFileInput);
-    this.compactionType = compactionType;
-    this.metadataOffset = readFileMetadata().getMetaOffset();
-  }
-
-  @Override
-  protected ByteBuffer readData(long position, int totalSize, LongConsumer ioSizeRecorder)
-      throws IOException {
-    acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
-    ByteBuffer buffer = super.readData(position, totalSize, ioSizeRecorder);
-    if (position >= metadataOffset) {
-      CompactionMetrics.getInstance()
-          .recordReadInfo(compactionType, CompactionIoDataType.METADATA, totalSize);
-    } else {
-      CompactionMetrics.getInstance()
-          .recordReadInfo(
-              compactionType,
-              readingAlignedSeries
-                  ? CompactionIoDataType.ALIGNED
-                  : CompactionIoDataType.NOT_ALIGNED,
-              totalSize);
-    }
-    return buffer;
+    CompactionTsFileInput compactionTsFileInput =
+        new CompactionTsFileInput(compactionType, tsFileInput);
+    this.tsFileInput = compactionTsFileInput;
+    compactionTsFileInput.setMetadataOffset(readFileMetadata().getMetaOffset());
   }
 
   /** Marks the start of reading an aligned series. */
   public void markStartOfAlignedSeries() {
-    readingAlignedSeries = true;
+    ((CompactionTsFileInput) tsFileInput).markStartOfAlignedSeries();
   }
 
   /** Marks the end of reading an aligned series. */
   public void markEndOfAlignedSeries() {
-    readingAlignedSeries = false;
-  }
-
-  @Override
-  public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
-    return super.readMemChunk(metaData);
+    ((CompactionTsFileInput) tsFileInput).markEndOfAlignedSeries();
   }
 
   @SuppressWarnings("java:S2177")
@@ -112,10 +73,6 @@ public class CompactionTsFileReader extends TsFileSequenceReader {
     return ChunkHeader.deserializeFrom(tsFileInput, position);
   }
 
-  public InputStream wrapAsInputStream() throws IOException {
-    return this.tsFileInput.wrapAsInputStream();
-  }
-
   public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
       getTimeseriesMetadataAndOffsetByDevice(
           MetadataIndexNode measurementNode,
@@ -186,11 +143,6 @@ public class CompactionTsFileReader extends TsFileSequenceReader {
     return timeseriesMetadataOffsetMap;
   }
 
-  private void acquireReadDataSizeWithCompactionReadRateLimiter(int readDataSize) {
-    CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
-    CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
-  }
-
   @Override
   public boolean equals(Object o) {
     return super.equals(o);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index 7d89747d2f6..fcea93d6c2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -36,6 +36,7 @@ import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.chunk.IChunkWriter;
 import org.apache.tsfile.write.record.Tablet.ColumnCategory;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.apache.tsfile.write.writer.tsmiterator.TSMIterator;
 
 import java.io.File;
 import java.io.IOException;
@@ -150,6 +151,14 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
         .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
   }
 
+  @Override
+  protected TSMIterator getTSMIterator() throws IOException {
+    return hasChunkMetadataInDisk
+        ? new CompactionDiskTSMIterator(
+            type, chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice)
+        : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
+  }
+
   public boolean isEmptyTargetFile() {
     return isEmptyTargetFile;
   }

Reply via email to