This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b07a9b7e47e77297d54c8549b4ec662d7f817e84
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jun 6 16:43:17 2023 +0800

    refactor the write process of fast compaction
---
 .../utils/writer/AbstractCompactionWriter.java      | 15 ++++++++-------
 .../utils/writer/AbstractCrossCompactionWriter.java | 19 ++++++++++++-------
 .../utils/writer/AbstractInnerCompactionWriter.java | 10 ++++++++--
 .../compaction/io/CompactionTsFileWriter.java       | 21 +++++++++++++++++++++
 4 files changed, 49 insertions(+), 16 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 9a26c4de327..6297b3b8ef5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType;
@@ -166,12 +167,12 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     }
   }
 
-  protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter 
iChunkWriter, int subTaskId)
+  protected void sealChunk(CompactionTsFileWriter targetWriter, IChunkWriter 
iChunkWriter, int subTaskId)
       throws IOException {
     CompactionTaskManager.mergeRateLimiterAcquire(
         compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
-      iChunkWriter.writeToFileWriter(targetWriter);
+      targetWriter.writeChunk(iChunkWriter);
     }
     chunkPointNumArray[subTaskId] = 0;
   }
@@ -188,19 +189,19 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
       throws IOException;
 
   protected void flushNonAlignedChunkToFileWriter(
-      TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, 
int subTaskId)
+      CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata 
chunkMetadata, int subTaskId)
       throws IOException {
     CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(chunk));
     synchronized (targetWriter) {
       // seal last chunk to file writer
-      chunkWriters[subTaskId].writeToFileWriter(targetWriter);
+      targetWriter.writeChunk(chunkWriters[subTaskId]);
       chunkPointNumArray[subTaskId] = 0;
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
 
   protected void flushAlignedChunkToFileWriter(
-      TsFileIOWriter targetWriter,
+      CompactionTsFileWriter targetWriter,
       Chunk timeChunk,
       IChunkMetadata timeChunkMetadata,
       List<Chunk> valueChunks,
@@ -210,7 +211,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     synchronized (targetWriter) {
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriters[subTaskId];
       // seal last chunk to file writer
-      alignedChunkWriter.writeToFileWriter(targetWriter);
+      targetWriter.writeChunk(alignedChunkWriter);
       chunkPointNumArray[subTaskId] = 0;
 
       // flush time chunk
@@ -292,7 +293,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   }
 
   protected void checkChunkSizeAndMayOpenANewChunk(
-      TsFileIOWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId, 
boolean isCrossSpace)
+      CompactionTsFileWriter fileWriter, IChunkWriter iChunkWriter, int 
subTaskId, boolean isCrossSpace)
       throws IOException {
     if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
       // if chunk point num reaches the check point, then check if the chunk 
size over threshold
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index 44409808daf..975377264e8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -21,6 +21,8 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -39,7 +41,7 @@ import java.util.Map;
 public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWriter {
 
   // target fileIOWriters
-  protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>();
+  protected List<CompactionTsFileWriter> targetFileWriters = new ArrayList<>();
 
   // source tsfiles
   private List<TsFileResource> seqTsFileResources;
@@ -77,8 +79,11 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
     boolean enableMemoryControl = 
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
     for (int i = 0; i < targetResources.size(); i++) {
       this.targetFileWriters.add(
-          new TsFileIOWriter(
-              targetResources.get(i).getTsFile(), enableMemoryControl, 
memorySizeForEachWriter));
+          new CompactionTsFileWriter(
+              targetResources.get(i).getTsFile(),
+              enableMemoryControl,
+              memorySizeForEachWriter,
+              CompactionType.CROSS_COMPACTION));
       isEmptyFile[i] = true;
     }
     this.seqTsFileResources = seqFileResources;
@@ -99,7 +104,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
   @Override
   public void endChunkGroup() throws IOException {
     for (int i = 0; i < seqTsFileResources.size(); i++) {
-      TsFileIOWriter targetFileWriter = targetFileWriters.get(i);
+      CompactionTsFileWriter targetFileWriter = targetFileWriters.get(i);
       if (isDeviceExistedInTargetFiles[i]) {
         // update resource
         CompactionUtils.updateResource(targetResources.get(i), 
targetFileWriter, deviceId);
@@ -153,7 +158,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
 
   @Override
   public void close() throws IOException {
-    for (TsFileIOWriter targetWriter : targetFileWriters) {
+    for (CompactionTsFileWriter targetWriter : targetFileWriters) {
       if (targetWriter != null && targetWriter.canWrite()) {
         targetWriter.close();
       }
@@ -165,7 +170,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
   @Override
   public void checkAndMayFlushChunkMetadata() throws IOException {
     for (int i = 0; i < targetFileWriters.size(); i++) {
-      TsFileIOWriter fileIOWriter = targetFileWriters.get(i);
+      CompactionTsFileWriter fileIOWriter = targetFileWriters.get(i);
       fileIOWriter.checkMetadataSizeAndMayFlush();
     }
   }
@@ -234,7 +239,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
   @Override
   public long getWriterSize() throws IOException {
     long totalSize = 0;
-    for (TsFileIOWriter writer : targetFileWriters) {
+    for (CompactionTsFileWriter writer : targetFileWriters) {
       totalSize += writer.getPos();
     }
     return totalSize;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index 6404730d0e4..b22d8d8689e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -20,6 +20,8 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
+import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -31,7 +33,7 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import java.io.IOException;
 
 public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWriter {
-  protected TsFileIOWriter fileWriter;
+  protected CompactionTsFileWriter fileWriter;
 
   protected boolean isEmptyFile;
 
@@ -50,7 +52,11 @@ public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWr
                 * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
     boolean enableMemoryControl = 
IoTDBDescriptor.getInstance().getConfig().isEnableMemControl();
     this.fileWriter =
-        new TsFileIOWriter(targetFileResource.getTsFile(), 
enableMemoryControl, sizeForFileWriter);
+        new CompactionTsFileWriter(
+            targetFileResource.getTsFile(),
+            enableMemoryControl,
+            sizeForFileWriter,
+            CompactionType.INNER_UNSEQ_COMPACTION);
     this.targetResource = targetFileResource;
     isEmptyFile = true;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
index 62f3b65a0a2..f23adc332d9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -24,6 +24,10 @@ import 
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -31,6 +35,7 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 
 public class CompactionTsFileWriter extends TsFileIOWriter {
   CompactionType type;
@@ -54,6 +59,22 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
         .recordWriteInfo(type, WrittenDataType.NOT_ALIGNED, writtenDataSize);
   }
 
+  @Override
+  public void writeEmptyValueChunk(
+      String measurementId,
+      CompressionType compressionType,
+      TSDataType tsDataType,
+      TSEncoding encodingType,
+      Statistics<? extends Serializable> statistics)
+      throws IOException {
+    long beforeOffset = this.getPos();
+    super.writeEmptyValueChunk(
+        measurementId, compressionType, tsDataType, encodingType, statistics);
+    long writtenDataSize = this.getPos() - beforeOffset;
+    CompactionMetrics.getInstance()
+        .recordWriteInfo(type, WrittenDataType.ALIGNED, writtenDataSize);
+  }
+
   public void writeChunk(IChunkWriter chunkWriter) throws IOException {
     boolean isAligned = chunkWriter instanceof AlignedChunkWriterImpl;
     long beforeOffset = this.getPos();

Reply via email to