This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ec573a61e80 [to dev/1.3] Compaction Read Metric and rate limit are
bypassed due to parent class modification (#14565)
ec573a61e80 is described below
commit ec573a61e80bbd5a69c4e6cbc3b8d58d099a198c
Author: shuwenwei <[email protected]>
AuthorDate: Fri Dec 27 12:15:34 2024 +0800
[to dev/1.3] Compaction Read Metric and rate limit are bypassed due to
parent class modification (#14565)
* fix compaction read metric
* fix ut
---
.../compaction/io/CompactionTsFileReader.java | 14 ++-----
.../inner/InnerSequenceCompactionSpeedTest.java | 43 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 279a5f2be0b..a8937eb3444 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -41,6 +41,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.LongConsumer;
/**
* This class extends the TsFileSequenceReader class to read and manage TsFile
with a focus on
@@ -72,9 +73,10 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
}
@Override
- protected ByteBuffer readData(long position, int totalSize) throws
IOException {
+ protected ByteBuffer readData(long position, int totalSize, LongConsumer
ioSizeRecorder)
+ throws IOException {
acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
- ByteBuffer buffer = super.readData(position, totalSize);
+ ByteBuffer buffer = super.readData(position, totalSize, ioSizeRecorder);
if (position >= metadataOffset) {
CompactionMetrics.getInstance()
.recordReadInfo(compactionType, CompactionIoDataType.METADATA,
totalSize);
@@ -114,14 +116,6 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
return this.tsFileInput.wrapAsInputStream();
}
- public ByteBuffer readPageWithoutUnCompressing(long startOffset, int
pageSize)
- throws IOException {
- if (pageSize == 0) {
- return null;
- }
- return readData(startOffset, pageSize);
- }
-
public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
getTimeseriesMetadataAndOffsetByDevice(
MetadataIndexNode measurementNode,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
index 237396fa882..f417c042e7c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class InnerSequenceCompactionSpeedTest extends AbstractCompactionTest {
@@ -123,4 +124,46 @@ public class InnerSequenceCompactionSpeedTest extends
AbstractCompactionTest {
Assert.assertTrue(
TimeUnit.SECONDS.toMillis(tsFileSize / IoTDBConstant.MB + 30) >
task.getTimeCost());
}
+
+ @Test
+ public void testReadRateLimit() throws IOException, InterruptedException {
+ int compactionReadOperationPerSec =
+
IoTDBDescriptor.getInstance().getConfig().getCompactionReadOperationPerSec();
+ CompactionTaskManager.getInstance().setCompactionReadOperationRate(1);
+ try {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ for (int i = 0; i < 100; i++) {
+ writer.startChunkGroup("d" + i);
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s0",
+ new TimeRange[] {new TimeRange(1000, 2000)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ seqResources.add(resource);
+ tsFileManager.add(resource, true);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0, tsFileManager, seqResources, true, new
ReadChunkCompactionPerformer(), 0);
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread t =
+ new Thread(
+ () -> {
+ task.start();
+ Assert.assertTrue(Thread.currentThread().isInterrupted());
+ latch.countDown();
+ });
+ t.start();
+ Assert.assertFalse(latch.await(5, TimeUnit.SECONDS));
+ t.interrupt();
+ latch.await();
+ } finally {
+ CompactionTaskManager.getInstance()
+ .setCompactionReadOperationRate(compactionReadOperationPerSec);
+ }
+ }
}