This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch refactor-compaction-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b07a9b7e47e77297d54c8549b4ec662d7f817e84 Author: Liu Xuxin <[email protected]> AuthorDate: Tue Jun 6 16:43:17 2023 +0800 refactor the write process of fast compaction --- .../utils/writer/AbstractCompactionWriter.java | 15 ++++++++------- .../utils/writer/AbstractCrossCompactionWriter.java | 19 ++++++++++++------- .../utils/writer/AbstractInnerCompactionWriter.java | 10 ++++++++-- .../compaction/io/CompactionTsFileWriter.java | 21 +++++++++++++++++++++ 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 9a26c4de327..6297b3b8ef5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; @@ -166,12 +167,12 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } } - protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) + protected void sealChunk(CompactionTsFileWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) throws IOException { CompactionTaskManager.mergeRateLimiterAcquire( compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize()); synchronized (targetWriter) { - iChunkWriter.writeToFileWriter(targetWriter); + targetWriter.writeChunk(iChunkWriter); } chunkPointNumArray[subTaskId] = 0; } @@ -188,19 +189,19 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { throws IOException; protected void flushNonAlignedChunkToFileWriter( - TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) + CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) throws IOException { CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); synchronized (targetWriter) { // seal last chunk to file writer - chunkWriters[subTaskId].writeToFileWriter(targetWriter); + targetWriter.writeChunk(chunkWriters[subTaskId]); chunkPointNumArray[subTaskId] = 0; targetWriter.writeChunk(chunk, chunkMetadata); } } protected void flushAlignedChunkToFileWriter( - TsFileIOWriter targetWriter, + CompactionTsFileWriter targetWriter, Chunk timeChunk, IChunkMetadata timeChunkMetadata, List<Chunk> valueChunks, @@ -210,7 +211,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { synchronized (targetWriter) { AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId]; // seal last chunk to file writer - alignedChunkWriter.writeToFileWriter(targetWriter); + targetWriter.writeChunk(alignedChunkWriter); chunkPointNumArray[subTaskId] = 0; // flush time chunk @@ -292,7 +293,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } protected void checkChunkSizeAndMayOpenANewChunk( - TsFileIOWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId, boolean isCrossSpace) + CompactionTsFileWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId, boolean isCrossSpace) throws IOException { if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) { // if chunk point num reaches the check point, then check if the chunk size over threshold diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index 44409808daf..975377264e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -21,6 +21,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; @@ -39,7 +41,7 @@ import java.util.Map; public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWriter { // target fileIOWriters - protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>(); + protected List<CompactionTsFileWriter> targetFileWriters = new ArrayList<>(); // source tsfiles private List<TsFileResource> seqTsFileResources; @@ -77,8 +79,11 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl(); for (int i = 0; i < targetResources.size(); i++) { this.targetFileWriters.add( - new TsFileIOWriter( - targetResources.get(i).getTsFile(), enableMemoryControl, memorySizeForEachWriter)); + new CompactionTsFileWriter( + targetResources.get(i).getTsFile(), + enableMemoryControl, + memorySizeForEachWriter, + CompactionType.CROSS_COMPACTION)); isEmptyFile[i] = true; } this.seqTsFileResources = seqFileResources; @@ -99,7 +104,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public void endChunkGroup() throws IOException { for (int i = 0; i < seqTsFileResources.size(); i++) { - TsFileIOWriter targetFileWriter = targetFileWriters.get(i); + CompactionTsFileWriter targetFileWriter = targetFileWriters.get(i); if (isDeviceExistedInTargetFiles[i]) { // update resource CompactionUtils.updateResource(targetResources.get(i), targetFileWriter, deviceId); @@ -153,7 +158,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public void close() throws IOException { - for (TsFileIOWriter targetWriter : targetFileWriters) { + for (CompactionTsFileWriter targetWriter : targetFileWriters) { if (targetWriter != null && targetWriter.canWrite()) { targetWriter.close(); } @@ -165,7 +170,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public void checkAndMayFlushChunkMetadata() throws IOException { for (int i = 0; i < targetFileWriters.size(); i++) { - TsFileIOWriter fileIOWriter = targetFileWriters.get(i); + CompactionTsFileWriter fileIOWriter = targetFileWriters.get(i); fileIOWriter.checkMetadataSizeAndMayFlush(); } } @@ -234,7 +239,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public long getWriterSize() throws IOException { long totalSize = 0; - for (TsFileIOWriter writer : targetFileWriters) { + for (CompactionTsFileWriter writer : targetFileWriters) { totalSize += writer.getPos(); } return totalSize; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index 6404730d0e4..b22d8d8689e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -31,7 +33,7 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWriter { - protected TsFileIOWriter fileWriter; + protected CompactionTsFileWriter fileWriter; protected boolean isEmptyFile; @@ -50,7 +52,11 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl(); this.fileWriter = - new TsFileIOWriter(targetFileResource.getTsFile(), enableMemoryControl, sizeForFileWriter); + new CompactionTsFileWriter( + targetFileResource.getTsFile(), + enableMemoryControl, + sizeForFileWriter, + CompactionType.INNER_UNSEQ_COMPACTION); this.targetResource = targetFileResource; isEmptyFile = true; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java index 62f3b65a0a2..f23adc332d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java @@ -24,6 +24,10 @@ import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; @@ -31,6 +35,7 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.File; import java.io.IOException; +import java.io.Serializable; public class CompactionTsFileWriter extends TsFileIOWriter { CompactionType type; @@ -54,6 +59,22 @@ public class CompactionTsFileWriter extends TsFileIOWriter { .recordWriteInfo(type, WrittenDataType.NOT_ALIGNED, writtenDataSize); } + @Override + public void writeEmptyValueChunk( + String measurementId, + CompressionType compressionType, + TSDataType tsDataType, + TSEncoding encodingType, + Statistics<? extends Serializable> statistics) + throws IOException { + long beforeOffset = this.getPos(); + super.writeEmptyValueChunk( + measurementId, compressionType, tsDataType, encodingType, statistics); + long writtenDataSize = this.getPos() - beforeOffset; + CompactionMetrics.getInstance() + .recordWriteInfo(type, WrittenDataType.ALIGNED, writtenDataSize); + } + public void writeChunk(IChunkWriter chunkWriter) throws IOException { boolean isAligned = chunkWriter instanceof AlignedChunkWriterImpl; long beforeOffset = this.getPos();
