This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55bcfeb439f09f99b69d66d9686c8de7d57ac873
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jun 6 22:49:53 2023 +0800

    refactor the read process
---
 .../impl/ReadChunkCompactionPerformer.java         |  6 ----
 .../execute/utils/MultiTsFileDeviceIterator.java   | 13 ++++++--
 .../utils/writer/AbstractCompactionWriter.java     |  9 ++++--
 .../writer/AbstractCrossCompactionWriter.java      |  1 -
 .../writer/AbstractInnerCompactionWriter.java      |  1 -
 .../compaction/io/CompactionTsFileReader.java      | 36 +++++++++++++++++++++-
 .../compaction/io/CompactionTsFileWriter.java      |  3 +-
 7 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index fff1aec7e6e..27d4d48afc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.engine.compaction.execute.performer.impl;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -37,16 +36,11 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
 public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private TsFileResource targetResource;
   private List<TsFileResource> seqFiles;
   private CompactionTaskSummary summary;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 7e2484e0d71..0cb49de129e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -70,7 +72,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
         this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
     try {
       for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) {
-        TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath());
+        CompactionTsFileReader reader =
+            new CompactionTsFileReader(
+                tsFileResource.getTsFilePath(), CompactionType.INNER_SEQ_COMPACTION);
         readerMap.put(tsFileResource, reader);
         deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
       }
@@ -112,8 +116,13 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
     Collections.sort(
         this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
     this.readerMap = readerMap;
+    CompactionType type =
+        seqResources.size() > 0 && unseqResources.size() > 0
+            ? CompactionType.CROSS_COMPACTION
+            : CompactionType.INNER_UNSEQ_COMPACTION;
     for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
-      TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath());
+      TsFileSequenceReader reader =
+          new CompactionTsFileReader(tsFileResource.getTsFilePath(), type);
       readerMap.put(tsFileResource, reader);
       deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 6297b3b8ef5..95e1f3a7e4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -167,7 +166,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
     }
   }
 
-  protected void sealChunk(CompactionTsFileWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId)
+  protected void sealChunk(
+      CompactionTsFileWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId)
       throws IOException {
     CompactionTaskManager.mergeRateLimiterAcquire(
         compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
@@ -293,7 +293,10 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
   }
 
   protected void checkChunkSizeAndMayOpenANewChunk(
-      CompactionTsFileWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId, boolean isCrossSpace)
+      CompactionTsFileWriter fileWriter,
+      IChunkWriter iChunkWriter,
+      int subTaskId,
+      boolean isCrossSpace)
       throws IOException {
     if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
       // if chunk point num reaches the check point, then check if the chunk size over threshold
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index 975377264e8..334d994eb95 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 import java.util.ArrayList;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index b22d8d8689e..cd764f82452 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
index 7c9e06789d3..fcc752108da 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
@@ -19,12 +19,46 @@
 
 package org.apache.iotdb.db.engine.compaction.io;
 
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class CompactionTsFileReader extends TsFileSequenceReader {
-  public CompactionTsFileReader(String file) throws IOException {
+  long readDataSize = 0L;
+  CompactionType compactionType;
+
+  public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException {
     super(file);
+    this.compactionType = compactionType;
+  }
+
+  @Override
+  protected ByteBuffer readData(long position, int totalSize) throws IOException {
+    ByteBuffer buffer = super.readData(position, totalSize);
+    readDataSize += totalSize;
+    return buffer;
+  }
+
+  @Override
+  public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
+    long before = readDataSize;
+    Chunk chunk = super.readMemChunk(metaData);
+    long dataSize = readDataSize - before;
+    // TODO: record metrics size
+    return chunk;
+  }
+
+  @Override
+  public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException {
+    long before = readDataSize;
+    TsFileDeviceIterator iterator = super.getAllDevicesIteratorWithIsAligned();
+    long dataSize = readDataSize - before;
+    // TODO: record metrics size
+    return iterator;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
index f23adc332d9..a74405ee839 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -71,8 +71,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
     super.writeEmptyValueChunk(
         measurementId, compressionType, tsDataType, encodingType, statistics);
     long writtenDataSize = this.getPos() - beforeOffset;
-    CompactionMetrics.getInstance()
-        .recordWriteInfo(type, WrittenDataType.ALIGNED, writtenDataSize);
+    CompactionMetrics.getInstance().recordWriteInfo(type, WrittenDataType.ALIGNED, writtenDataSize);
   }
 
   public void writeChunk(IChunkWriter chunkWriter) throws IOException {

Reply via email to