This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch 1.0.1_to_1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1af3ca0621c63c6069cdc4dc2ce07fa2142ded90 Author: Zhang.Jinrui <[email protected]> AuthorDate: Sat Jan 7 15:00:19 2023 +0800 Revert "[IOTDB-5209] Limit the read rate of compaction execution (#8461)" (#8776) --- docs/UserGuide/Reference/Common-Config-Manual.md | 16 ++++++++-------- docs/zh/UserGuide/Reference/Common-Config-Manual.md | 16 ++++++++-------- .../src/assembly/resources/conf/iotdb-common.properties | 4 ++-- .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++------ .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 ++++++----- .../db/engine/compaction/CompactionTaskManager.java | 15 +++++++++++++-- .../cross/utils/AlignedSeriesCompactionExecutor.java | 8 -------- .../cross/utils/NonAlignedSeriesCompactionExecutor.java | 2 -- .../inner/utils/AlignedSeriesCompactionExecutor.java | 9 +++++---- .../inner/utils/SingleSeriesCompactionExecutor.java | 11 ++++++----- .../compaction/writer/AbstractCompactionWriter.java | 12 +++++++----- .../compaction/utils/CompactionConfigRestorer.java | 2 +- 12 files changed, 62 insertions(+), 56 deletions(-) diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md index 4d1d24281c..e2c79e093e 100644 --- a/docs/UserGuide/Reference/Common-Config-Manual.md +++ b/docs/UserGuide/Reference/Common-Config-Manual.md @@ -990,14 +990,14 @@ Different configuration parameters take effect in the following three ways: | Default | 60000 | | Effective | After restart system | -* compaction\_io\_rate\_per\_sec - -|Name| compaction\_io\_rate\_per\_sec | -|:---:|:-----------------------------------------------| -|Description| The io rate of all compaction tasks per second | -|Type| int32 | -|Default| 50 | -|Effective| After restart system | +* compaction\_write\_throughput\_mb\_per\_sec + +|Name| compaction\_write\_throughput\_mb\_per\_sec | +|:---:|:---| +|Description| The write rate of all compaction tasks in MB/s | +|Type| int32 | +|Default| 16 | +|Effective|After restart system| * sub\_compaction\_thread\_count diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md index fa91601017..0c6c28fcfb 100644 --- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md +++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md @@ -1045,14 +1045,14 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。 | 默认值 | 60000 | | 改后生效方式 | 重启服务生效 | -* compaction\_io\_rate\_per\_sec - -|名字| compaction\_io\_rate\_per\_sec | -|:---:|:-------------------------------| -|描述| 每秒合并随机IO的次数。 | -|类型| int32 | -|默认值| 50 | -|改后生效方式| 重启服务生效 | +* compaction\_write\_throughput\_mb\_per\_sec + +|名字| compaction\_write\_throughput\_mb\_per\_sec | +|:---:|:---| +|描述| 每秒可达到的写入吞吐量合并限制。| +|类型| int32 | +|默认值| 16 | +|改后生效方式| 重启服务生效| * sub\_compaction\_thread\_count diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index ee21625cfd..16e30a896a 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -627,9 +627,9 @@ cluster_name=defaultCluster # Datatype: long, Unit: ms # compaction_submission_interval_in_ms=60000 -# The limit of io rate can reach per second +# The limit of write throughput merge can reach per second # Datatype: int -# compaction_io_rate_per_sec=50 +# compaction_write_throughput_mb_per_sec=16 # The number of sub compaction threads to be set up to perform compaction. # Currently only works for nonAligned data in cross space compaction and unseq inner space compaction. diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 8991c91116..8664cf8ddc 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -667,8 +667,8 @@ public class IoTDBConfig { */ private long mergeIntervalSec = 0L; - /** The limit of io rate can reach per second */ - private int compactionIORatePerSec = 50; + /** The limit of compaction merge can reach per second */ + private int compactionWriteThroughputMbPerSec = 16; /** * How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or @@ -1937,12 +1937,12 @@ public class IoTDBConfig { this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount; } - public int getCompactionIORatePerSec() { - return compactionIORatePerSec; + public int getCompactionWriteThroughputMbPerSec() { + return compactionWriteThroughputMbPerSec; } - public void setCompactionIORatePerSec(int compactionIORatePerSec) { - this.compactionIORatePerSec = compactionIORatePerSec; + public void setCompactionWriteThroughputMbPerSec(int compactionWriteThroughputMbPerSec) { + this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec; } public boolean isEnableMemControl() { diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 2c897229ab..c0f4135f9d 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -658,10 +658,11 @@ public class IoTDBDescriptor { "max_cross_compaction_candidate_file_size", Long.toString(conf.getMaxCrossCompactionCandidateFileSize())))); - conf.setCompactionIORatePerSec( + conf.setCompactionWriteThroughputMbPerSec( Integer.parseInt( properties.getProperty( - "compaction_io_rate_per_sec", Integer.toString(conf.getCompactionIORatePerSec())))); + "compaction_write_throughput_mb_per_sec", + Integer.toString(conf.getCompactionWriteThroughputMbPerSec())))); conf.setEnableCompactionValidation( Boolean.parseBoolean( @@ -1438,11 +1439,11 @@ public class IoTDBDescriptor { properties.getProperty( "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold())))); // update merge_write_throughput_mb_per_sec - conf.setCompactionIORatePerSec( + conf.setCompactionWriteThroughputMbPerSec( Integer.parseInt( properties.getProperty( - "compaction_io_rate_per_sec", - Integer.toString(conf.getCompactionIORatePerSec())))); + "merge_write_throughput_mb_per_sec", + Integer.toString(conf.getCompactionWriteThroughputMbPerSec())))); // update insert-tablet-plan's row limit for select-into conf.setSelectIntoInsertTabletPlanRowLimit( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java index 1e5424bc51..ea7e70805f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java @@ -240,8 +240,9 @@ public class CompactionTaskManager implements IService { .containsKey(task); } - public RateLimiter getCompactionIORateLimiter() { - setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIORatePerSec()); + public RateLimiter getMergeWriteRateLimiter() { + setWriteMergeRate( + IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec()); return mergeWriteRateLimiter; } @@ -255,6 +256,16 @@ public class CompactionTaskManager implements IService { mergeWriteRateLimiter.setRate(throughout); } } + /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */ + public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) { + while (bytesLength >= Integer.MAX_VALUE) { + limiter.acquire(Integer.MAX_VALUE); + bytesLength -= Integer.MAX_VALUE; + } + if (bytesLength > 0) { + limiter.acquire((int) bytesLength); + } + } public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) { String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java index ac9e3b17c0..72b364adb8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/AlignedSeriesCompactionExecutor.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.cross.utils; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter; import org.apache.iotdb.db.engine.modification.Modification; @@ -41,8 +40,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import com.google.common.util.concurrent.RateLimiter; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -57,9 +54,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { private final List<IMeasurementSchema> measurementSchemas; - private final RateLimiter rateLimiter = - CompactionTaskManager.getInstance().getCompactionIORateLimiter(); - public AlignedSeriesCompactionExecutor( AbstractCompactionWriter compactionWriter, Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap, @@ -287,7 +281,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException { AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetadataElement.chunkMetadata; - rateLimiter.acquire(1); chunkMetadataElement.chunk = readerCacheMap .get(chunkMetadataElement.fileElement.resource) @@ -299,7 +292,6 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { valueChunks.add(null); continue; } - rateLimiter.acquire(1); valueChunks.add( readerCacheMap .get(chunkMetadataElement.fileElement.resource) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java index 1c0d3a1e75..da26d515c7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/NonAlignedSeriesCompactionExecutor.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.cross.utils; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter; import org.apache.iotdb.db.engine.modification.Modification; @@ -172,7 +171,6 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor @Override void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException { - CompactionTaskManager.getInstance().getCompactionIORateLimiter().acquire(1); chunkMetadataElement.chunk = readerCacheMap .get(chunkMetadataElement.fileElement.resource) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java index 3ecc19d11e..3f4d475679 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java @@ -63,7 +63,7 @@ public class AlignedSeriesCompactionExecutor { private final List<IMeasurementSchema> schemaList; private long remainingPointInChunkWriter = 0L; private final RateLimiter rateLimiter = - CompactionTaskManager.getInstance().getCompactionIORateLimiter(); + CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); private final long chunkSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -139,13 +139,13 @@ public class AlignedSeriesCompactionExecutor { while (readerIterator.hasNext()) { Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader(); CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right); - rateLimiter.acquire(schemaList.size() + 1); compactOneAlignedChunk(chunkReaderAndChunkSize.left); } } if (remainingPointInChunkWriter != 0L) { - rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1); + CompactionTaskManager.mergeRateLimiterAcquire( + rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, @@ -187,7 +187,8 @@ public class AlignedSeriesCompactionExecutor { private void flushChunkWriterIfLargeEnough() throws IOException { if (remainingPointInChunkWriter >= chunkPointNumThreshold || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) { - rateLimiter.acquire(chunkWriter.getValueChunkWriterList().size() + 1); + CompactionTaskManager.mergeRateLimiterAcquire( + rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java index e8f7a1186a..9575757906 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java @@ -58,7 +58,7 @@ public class SingleSeriesCompactionExecutor { private Chunk cachedChunk; private ChunkMetadata cachedChunkMetadata; private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getCompactionIORateLimiter(); + CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); // record the min time and max time to update the target resource private long minStartTimestamp = Long.MAX_VALUE; private long maxEndTimestamp = Long.MIN_VALUE; @@ -118,7 +118,6 @@ public class SingleSeriesCompactionExecutor { TsFileSequenceReader reader = readerListPair.left; List<ChunkMetadata> chunkMetadataList = readerListPair.right; for (ChunkMetadata chunkMetadata : chunkMetadataList) { - compactionRateLimiter.acquire(1); Chunk currentChunk = reader.readMemChunk(chunkMetadata); if (this.chunkWriter == null) { constructChunkWriterFromReadChunk(currentChunk); @@ -300,7 +299,7 @@ public class SingleSeriesCompactionExecutor { private void flushChunkToFileWriter( Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException { - compactionRateLimiter.acquire(1); + CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); if (chunkMetadata.getStartTime() < minStartTimestamp) { minStartTimestamp = chunkMetadata.getStartTime(); } @@ -318,7 +317,8 @@ public class SingleSeriesCompactionExecutor { private void flushChunkWriterIfLargeEnough() throws IOException { if (pointCountInChunkWriter >= targetChunkPointNum || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) { - compactionRateLimiter.acquire(1); + CompactionTaskManager.mergeRateLimiterAcquire( + compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, @@ -339,7 +339,8 @@ public class SingleSeriesCompactionExecutor { } private void flushChunkWriter() throws IOException { - compactionRateLimiter.acquire(1); + CompactionTaskManager.mergeRateLimiterAcquire( + compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetricsRecorder.recordWriteInfo( CompactionType.INNER_SEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java index e95a386b72..1a0973602d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java @@ -49,7 +49,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getCompactionIORateLimiter(); + CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); // check if there is unseq error point during writing protected long[] lastTime = new long[subTaskNum]; @@ -164,7 +164,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) throws IOException { - compactionRateLimiter.acquire(1); + CompactionTaskManager.mergeRateLimiterAcquire( + compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize()); synchronized (targetWriter) { iChunkWriter.writeToFileWriter(targetWriter); } @@ -185,11 +186,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected void flushNonAlignedChunkToFileWriter( TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) throws IOException { + CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); synchronized (targetWriter) { // seal last chunk to file writer chunkWriters[subTaskId].writeToFileWriter(targetWriter); chunkPointNumArray[subTaskId] = 0; - compactionRateLimiter.acquire(1); targetWriter.writeChunk(chunk, chunkMetadata); } } @@ -209,7 +210,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { chunkPointNumArray[subTaskId] = 0; // flush time chunk - compactionRateLimiter.acquire(1); + CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk)); targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata); // flush value chunks @@ -226,7 +227,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { valueChunkWriter.getStatistics()); continue; } - compactionRateLimiter.acquire(1); + CompactionTaskManager.mergeRateLimiterAcquire( + compactionRateLimiter, getChunkSize(valueChunk)); targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i)); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java index ada6af6de5..613308c5d7 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java @@ -74,7 +74,7 @@ public class CompactionConfigRestorer { config.setCompactionThreadCount(concurrentCompactionThread); config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs); config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs); - config.setCompactionIORatePerSec(compactionWriteThroughputMbPerSec); + config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec); config.setCrossCompactionPerformer(oldCrossPerformer); config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer); config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);
