This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch refactor-compaction-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7fc3df42f526b807011985462156e9a56a94f429 Author: Liu Xuxin <[email protected]> AuthorDate: Fri Jun 9 19:07:27 2023 +0800 complete --- .../readchunk/AlignedSeriesCompactionExecutor.java | 11 ++++---- .../readchunk/SingleSeriesCompactionExecutor.java | 6 ----- .../utils/writer/AbstractCompactionWriter.java | 26 +++---------------- .../compaction/io/CompactionTsFileReader.java | 27 +++++++++++-------- .../compaction/io/CompactionTsFileWriter.java | 30 +++++++++++++++++++--- .../compaction/schedule/CompactionTaskManager.java | 10 -------- .../iotdb/db/engine/storagegroup/DataRegion.java | 5 ++++ 7 files changed, 57 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index 00f12a2c8d9..616dbf1013e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader; import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; -import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; @@ -39,8 +38,6 @@ import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import com.google.common.util.concurrent.RateLimiter; - import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; @@ -60,8 +57,6 @@ public class AlignedSeriesCompactionExecutor { private final List<IMeasurementSchema> schemaList; private long remainingPointInChunkWriter = 0L; private final CompactionTaskSummary summary; - private final RateLimiter rateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); private final long chunkSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -99,6 +94,9 @@ public class AlignedSeriesCompactionExecutor { for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair : readerAndChunkMetadataList) { TsFileSequenceReader reader = readerListPair.left; + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markStartOfAlignedSeries(); + } List<AlignedChunkMetadata> alignedChunkMetadataList = readerListPair.right; for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { List<IChunkMetadata> valueChunkMetadataList = @@ -121,6 +119,9 @@ public class AlignedSeriesCompactionExecutor { header.getCompressionType())); } } + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markEndOfAlignedSeries(); + } } List<IMeasurementSchema> schemaList = new ArrayList<>(schemaSet); schemaList.sort(Comparator.comparing(IMeasurementSchema::getMeasurementId)); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index 689f13aac20..89cff34c54e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; -import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -38,8 +37,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import com.google.common.util.concurrent.RateLimiter; - import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -56,8 +53,6 @@ public class SingleSeriesCompactionExecutor { private ChunkWriterImpl chunkWriter; private Chunk cachedChunk; private ChunkMetadata cachedChunkMetadata; - private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); // record the min time and max time to update the target resource private long minStartTimestamp = Long.MAX_VALUE; private long maxEndTimestamp = Long.MIN_VALUE; @@ -322,7 +317,6 @@ public class SingleSeriesCompactionExecutor { private void flushChunkToFileWriter( Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); if (chunkMetadata.getStartTime() < minStartTimestamp) { minStartTimestamp = chunkMetadata.getStartTime(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 75e3c76b9af..afcb162311e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -20,10 +20,6 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; -import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.PageException; import org.apache.iotdb.tsfile.file.header.PageHeader; @@ -41,8 +37,6 @@ import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import com.google.common.util.concurrent.RateLimiter; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -50,9 +44,6 @@ import java.util.List; public abstract class AbstractCompactionWriter implements AutoCloseable { protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); - private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); - // check if there is unseq error point during writing protected long[] lastTime = new long[subTaskNum]; @@ -169,8 +160,6 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected void sealChunk( CompactionTsFileWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize()); synchronized (targetWriter) { targetWriter.writeChunk(iChunkWriter); } @@ -191,7 +180,6 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected void flushNonAlignedChunkToFileWriter( CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); synchronized (targetWriter) { // seal last chunk to file writer targetWriter.writeChunk(chunkWriters[subTaskId]); @@ -214,8 +202,9 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { targetWriter.writeChunk(alignedChunkWriter); chunkPointNumArray[subTaskId] = 0; + targetWriter.markStartingWritingAligned(); + // flush time chunk - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk)); targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata); // flush value chunks @@ -232,10 +221,10 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { Statistics.getStatsByType(valueChunkWriter.getDataType())); continue; } - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, getChunkSize(valueChunk)); targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i)); } + + targetWriter.markEndingWritingAligned(); } } @@ -304,13 +293,6 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { sealChunk(fileWriter, iChunkWriter, subTaskId); lastCheckIndex = 0; - CompactionMetrics.getInstance() - .recordWriteInfo( - isCrossSpace - ? CompactionType.CROSS_COMPACTION - : CompactionType.INNER_UNSEQ_COMPACTION, - isAlign ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, - iChunkWriter.estimateMaxSeriesMemSize()); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java index b9b10791de7..efc0e3635c8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java @@ -39,13 +39,13 @@ import java.util.List; */ public class CompactionTsFileReader extends TsFileSequenceReader { /** Tracks the total amount of data (in bytes) that has been read. */ - long readDataSize = 0L; + private volatile long readDataSize = 0L; /** The type of compaction running. */ CompactionType compactionType; /** A flag that indicates if an aligned series is being read. */ - boolean readingAlignedSeries = false; + private volatile boolean readingAlignedSeries = false; /** * Constructs a new instance of CompactionTsFileReader. @@ -78,15 +78,20 @@ public class CompactionTsFileReader extends TsFileSequenceReader { @Override public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { - long before = readDataSize; - Chunk chunk = super.readMemChunk(metaData); - long dataSize = readDataSize - before; - CompactionMetrics.getInstance() - .recordReadInfo( - compactionType, - readingAlignedSeries ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, - dataSize); - return chunk; + synchronized (this) { + // using synchronized to avoid concurrent read that makes readDataSize not correct + long before = readDataSize; + Chunk chunk = super.readMemChunk(metaData); + long dataSize = readDataSize - before; + CompactionMetrics.getInstance() + .recordReadInfo( + compactionType, + readingAlignedSeries + ? CompactionIoDataType.ALIGNED + : CompactionIoDataType.NOT_ALIGNED, + dataSize); + return chunk; + } } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java index f86f7a4391f..db65178fae8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java @@ -40,6 +40,8 @@ import java.io.Serializable; public class CompactionTsFileWriter extends TsFileIOWriter { CompactionType type; + private volatile boolean isWritingAligned = false; + public CompactionTsFileWriter( File file, boolean enableMemoryControl, long maxMetadataSize, CompactionType type) throws IOException { @@ -50,13 +52,16 @@ public class CompactionTsFileWriter extends TsFileIOWriter { @Override public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException { long beforeOffset = this.getPos(); - CompactionTaskManager.getInstance() - .getMergeWriteRateLimiter() - .acquire(chunk.getHeader().getDataSize() + chunk.getHeader().getSerializedSize()); super.writeChunk(chunk, chunkMetadata); long writtenDataSize = this.getPos() - beforeOffset; + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } CompactionMetrics.getInstance() - .recordWriteInfo(type, CompactionIoDataType.NOT_ALIGNED, writtenDataSize); + .recordWriteInfo( + type, + isWritingAligned ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, + writtenDataSize); } @Override @@ -73,6 +78,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter { long writtenDataSize = this.getPos() - beforeOffset; CompactionMetrics.getInstance() .recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize); + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } } public void writeChunk(IChunkWriter chunkWriter) throws IOException { @@ -80,6 +88,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter { long beforeOffset = this.getPos(); chunkWriter.writeToFileWriter(this); long writtenDataSize = this.getPos() - beforeOffset; + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } CompactionMetrics.getInstance() .recordWriteInfo( type, @@ -92,7 +103,18 @@ public class CompactionTsFileWriter extends TsFileIOWriter { long beforeSize = this.getPos(); super.endFile(); long writtenDataSize = this.getPos() - beforeSize; + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } CompactionMetrics.getInstance() .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize); } + + public void markStartingWritingAligned() { + isWritingAligned = true; + } + + public void markEndingWritingAligned() { + isWritingAligned = false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java index 6dd13065941..baad46f2a0d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java @@ -252,16 +252,6 @@ public class CompactionTaskManager implements IService { mergeWriteRateLimiter.setRate(throughout); } } - /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */ - public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) { - while (bytesLength >= Integer.MAX_VALUE) { - limiter.acquire(Integer.MAX_VALUE); - bytesLength -= Integer.MAX_VALUE; - } - if (bytesLength > 0) { - limiter.acquire((int) bytesLength); - } - } public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) { String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index f4de1add68a..b7a8aa7a958 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -1555,6 +1555,11 @@ public class DataRegion implements IDataRegionForQuery { tsFileResourceList.addAll(tsFileManager.getTsFileList(false)); tsFileResourceList.forEach( x -> { + FileMetrics.getInstance() + .deleteFile( + new long[] {x.getTsFileSize()}, + x.isSeq(), + Collections.singletonList(x.getTsFile().getName())); if (x.getModFile().exists()) { FileMetrics.getInstance().decreaseModFileNum(1); FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize());
