This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ec573a61e80 [to dev/1.3] Compaction Read Metric and rate limit are bypassed due to parent class modification (#14565)
ec573a61e80 is described below

commit ec573a61e80bbd5a69c4e6cbc3b8d58d099a198c
Author: shuwenwei <[email protected]>
AuthorDate: Fri Dec 27 12:15:34 2024 +0800

    [to dev/1.3] Compaction Read Metric and rate limit are bypassed due to parent class modification (#14565)
    
    * fix compaction read metric
    
    * fix ut
---
 .../compaction/io/CompactionTsFileReader.java      | 14 ++-----
 .../inner/InnerSequenceCompactionSpeedTest.java    | 43 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 279a5f2be0b..a8937eb3444 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -41,6 +41,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.LongConsumer;
 
 /**
  * This class extends the TsFileSequenceReader class to read and manage TsFile 
with a focus on
@@ -72,9 +73,10 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
   }
 
   @Override
-  protected ByteBuffer readData(long position, int totalSize) throws 
IOException {
+  protected ByteBuffer readData(long position, int totalSize, LongConsumer 
ioSizeRecorder)
+      throws IOException {
     acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
-    ByteBuffer buffer = super.readData(position, totalSize);
+    ByteBuffer buffer = super.readData(position, totalSize, ioSizeRecorder);
     if (position >= metadataOffset) {
       CompactionMetrics.getInstance()
           .recordReadInfo(compactionType, CompactionIoDataType.METADATA, 
totalSize);
@@ -114,14 +116,6 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
     return this.tsFileInput.wrapAsInputStream();
   }
 
-  public ByteBuffer readPageWithoutUnCompressing(long startOffset, int 
pageSize)
-      throws IOException {
-    if (pageSize == 0) {
-      return null;
-    }
-    return readData(startOffset, pageSize);
-  }
-
   public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
       getTimeseriesMetadataAndOffsetByDevice(
           MetadataIndexNode measurementNode,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
index 237396fa882..f417c042e7c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class InnerSequenceCompactionSpeedTest extends AbstractCompactionTest {
@@ -123,4 +124,46 @@ public class InnerSequenceCompactionSpeedTest extends 
AbstractCompactionTest {
     Assert.assertTrue(
         TimeUnit.SECONDS.toMillis(tsFileSize / IoTDBConstant.MB + 30) > 
task.getTimeCost());
   }
+
+  @Test
+  public void testReadRateLimit() throws IOException, InterruptedException {
+    int compactionReadOperationPerSec =
+        
IoTDBDescriptor.getInstance().getConfig().getCompactionReadOperationPerSec();
+    CompactionTaskManager.getInstance().setCompactionReadOperationRate(1);
+    try {
+      TsFileResource resource = createEmptyFileAndResource(true);
+      try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
+        for (int i = 0; i < 100; i++) {
+          writer.startChunkGroup("d" + i);
+          writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+              "s0",
+              new TimeRange[] {new TimeRange(1000, 2000)},
+              TSEncoding.PLAIN,
+              CompressionType.LZ4);
+          writer.endChunkGroup();
+        }
+        writer.endFile();
+      }
+      seqResources.add(resource);
+      tsFileManager.add(resource, true);
+      InnerSpaceCompactionTask task =
+          new InnerSpaceCompactionTask(
+              0, tsFileManager, seqResources, true, new 
ReadChunkCompactionPerformer(), 0);
+      CountDownLatch latch = new CountDownLatch(1);
+      Thread t =
+          new Thread(
+              () -> {
+                task.start();
+                Assert.assertTrue(Thread.currentThread().isInterrupted());
+                latch.countDown();
+              });
+      t.start();
+      Assert.assertFalse(latch.await(5, TimeUnit.SECONDS));
+      t.interrupt();
+      latch.await();
+    } finally {
+      CompactionTaskManager.getInstance()
+          .setCompactionReadOperationRate(compactionReadOperationPerSec);
+    }
+  }
 }

Reply via email to