This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5667cf33f68f85f5463ccb89fd3a4ad4adf307c3
Author: 张凌哲 <[email protected]>
AuthorDate: Sat Oct 10 12:11:58 2020 +0800

    add read compaction limiter
---
 .../resources/conf/iotdb-engine.properties         |  5 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++++++++---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  7 ++--
 .../iotdb/db/engine/merge/manage/MergeManager.java | 33 ++++++++++++----
 .../tsfilemanagement/utils/HotCompactionUtils.java | 44 ++++++++++++++++------
 .../iotdb/db/engine/merge/MergeManagerTest.java    |  2 +-
 .../iotdb/tsfile/file/metadata/ChunkMetadata.java  |  3 ++
 7 files changed, 87 insertions(+), 30 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 5fb3777..819e561 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -325,7 +325,10 @@ force_full_merge=false
 chunk_merge_point_threshold=20480
 
 # The limit of write throughput merge can reach per second
-merge_throughput_mb_per_sec=16
+merge_write_throughput_mb_per_sec=16
+
+# The limit of read throughput merge can reach per second
+merge_read_throughput_mb_per_sec=16
 
 ####################
 ### Metadata Cache Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d53892f..c6163a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -543,7 +543,12 @@ public class IoTDBConfig {
   /**
    * The limit of write throughput merge can reach per second
    */
-  private int mergeThroughputMbPerSec = 16;
+  private int mergeWriteThroughputMbPerSec = 16;
+
+  /**
+   * The limit of read throughput merge can reach per second
+   */
+  private int mergeReadThroughputMbPerSec = 16;
 
   private MergeFileStrategy mergeFileStrategy = 
MergeFileStrategy.MAX_SERIES_NUM;
 
@@ -1259,12 +1264,20 @@ public class IoTDBConfig {
     this.chunkMergePointThreshold = chunkMergePointThreshold;
   }
 
-  public int getMergeThroughputMbPerSec() {
-    return mergeThroughputMbPerSec;
+  public int getMergeWriteThroughputMbPerSec() {
+    return mergeWriteThroughputMbPerSec;
+  }
+
+  public void setMergeWriteThroughputMbPerSec(int 
mergeWriteThroughputMbPerSec) {
+    this.mergeWriteThroughputMbPerSec = mergeWriteThroughputMbPerSec;
+  }
+
+  public int getMergeReadThroughputMbPerSec() {
+    return mergeReadThroughputMbPerSec;
   }
 
-  public void setMergeThroughputMbPerSec(int mergeThroughputMbPerSec) {
-    this.mergeThroughputMbPerSec = mergeThroughputMbPerSec;
+  public void setMergeReadThroughputMbPerSec(int mergeReadThroughputMbPerSec) {
+    this.mergeReadThroughputMbPerSec = mergeReadThroughputMbPerSec;
   }
 
   public long getMemtableSizeThreshold() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 0d35a0d..084f412 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
-import java.net.URI;
 import java.net.URL;
 import java.time.ZoneId;
 import java.util.Properties;
@@ -369,8 +368,10 @@ public class IoTDBDescriptor {
           Boolean.toString(conf.isForceFullMerge()))));
       conf.setChunkMergePointThreshold(Integer.parseInt(properties.getProperty(
           "chunk_merge_point_threshold", 
Integer.toString(conf.getChunkMergePointThreshold()))));
-      conf.setMergeThroughputMbPerSec(Integer.parseInt(properties.getProperty(
-          "merge_throughput_mb_per_sec", 
Integer.toString(conf.getMergeThroughputMbPerSec()))));
+      
conf.setMergeWriteThroughputMbPerSec(Integer.parseInt(properties.getProperty(
+          "merge_write_throughput_mb_per_sec", 
Integer.toString(conf.getMergeWriteThroughputMbPerSec()))));
+      
conf.setMergeReadThroughputMbPerSec(Integer.parseInt(properties.getProperty(
+          "merge_read_throughput_mb_per_sec", 
Integer.toString(conf.getMergeReadThroughputMbPerSec()))));
 
       conf.setEnablePartialInsert(
           Boolean.parseBoolean(properties.getProperty("enable_partial_insert",
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index d8bcfb5..3ba8bed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -60,7 +60,8 @@ public class MergeManager implements IService, 
MergeManagerMBean {
   private final String mbeanName = String
       .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
           getID().getJmxName());
-  private final RateLimiter mergeRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
+  private final RateLimiter mergeWriteRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
+  private final RateLimiter mergeReadRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
 
   private AtomicInteger threadCnt = new AtomicInteger();
   private ThreadPoolExecutor mergeTaskPool;
@@ -74,13 +75,18 @@ public class MergeManager implements IService, 
MergeManagerMBean {
   private MergeManager() {
   }
 
-  public RateLimiter getMergeRateLimiter() {
-    
setMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeThroughputMbPerSec());
-    return mergeRateLimiter;
+  public RateLimiter getMergeWriteRateLimiter() {
+    
setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec());
+    return mergeWriteRateLimiter;
+  }
+
+  public RateLimiter getMergeReadRateLimiter() {
+    
setReadMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeReadThroughputMbPerSec());
+    return mergeReadRateLimiter;
   }
 
   /**
-   * wait by throughoutMbPerSec limit to avoid continuous Write
+   * wait by throughoutMbPerSec limit to avoid continuous write or read
    */
   public static void mergeRateLimiterAcquire(RateLimiter limiter, long 
bytesLength) {
     while (bytesLength >= Integer.MAX_VALUE) {
@@ -92,14 +98,25 @@ public class MergeManager implements IService, 
MergeManagerMBean {
     }
   }
 
-  private void setMergeRate(final double throughoutMbPerSec) {
+  private void setWriteMergeRate(final double throughoutMbPerSec) {
+    double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
+    // if throughout = 0, disable rate limiting
+    if (throughout == 0) {
+      throughout = Double.MAX_VALUE;
+    }
+    if (mergeWriteRateLimiter.getRate() != throughout) {
+      mergeWriteRateLimiter.setRate(throughout);
+    }
+  }
+
+  private void setReadMergeRate(final double throughoutMbPerSec) {
     double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
     // if throughout = 0, disable rate limiting
     if (throughout == 0) {
       throughout = Double.MAX_VALUE;
     }
-    if (mergeRateLimiter.getRate() != throughout) {
-      mergeRateLimiter.setRate(throughout);
+    if (mergeReadRateLimiter.getRate() != throughout) {
+      mergeReadRateLimiter.setRate(throughout);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
index 411b754..d7dcd4c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
@@ -62,7 +62,7 @@ public class HotCompactionUtils {
     throw new IllegalStateException("Utility class");
   }
 
-  private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
+  private static Pair<ChunkMetadata, Chunk> readByAppendMerge(RateLimiter 
compactionReadRateLimiter,
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) 
throws IOException {
     ChunkMetadata newChunkMetadata = null;
     Chunk newChunk = null;
@@ -70,6 +70,8 @@ public class HotCompactionUtils {
         .entrySet()) {
       for (ChunkMetadata chunkMetadata : entry.getValue()) {
         Chunk chunk = entry.getKey().readMemChunk(chunkMetadata);
+        MergeManager
+            .mergeRateLimiterAcquire(compactionReadRateLimiter, 
chunk.getData().position());
         if (newChunkMetadata == null) {
           newChunkMetadata = chunkMetadata;
           newChunk = chunk;
@@ -82,7 +84,7 @@ public class HotCompactionUtils {
     return new Pair<>(newChunkMetadata, newChunk);
   }
 
-  private static long readByDeserializeMerge(
+  private static long readByDeserializeMerge(RateLimiter 
compactionReadRateLimiter,
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, 
long maxVersion,
       Map<Long, TimeValuePair> timeValuePairMap)
       throws IOException {
@@ -94,30 +96,35 @@ public class HotCompactionUtils {
         maxVersion = Math.max(chunkMetadata.getVersion(), maxVersion);
         IChunkReader chunkReader = new ChunkReaderByTimestamp(
             reader.readMemChunk(chunkMetadata));
+        long chunkSize = 0;
         while (chunkReader.hasNextSatisfiedPage()) {
           IPointReader iPointReader = new BatchDataIterator(
               chunkReader.nextPageData());
           while (iPointReader.hasNextTimeValuePair()) {
             TimeValuePair timeValuePair = iPointReader.nextTimeValuePair();
+            chunkSize += timeValuePair.getSize();
             timeValuePairMap.put(timeValuePair.getTimestamp(), timeValuePair);
           }
         }
+        MergeManager
+            .mergeRateLimiterAcquire(compactionReadRateLimiter, chunkSize);
       }
     }
     return maxVersion;
   }
 
   private static long writeByAppendMerge(long maxVersion, String device,
-      RateLimiter compactionRateLimiter,
+      RateLimiter compactionWriteRateLimiter, RateLimiter 
compactionReadRateLimiter,
       Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadatasMap,
       TsFileResource targetResource, RestorableTsFileIOWriter writer) throws 
IOException {
-    Pair<ChunkMetadata, Chunk> chunkPair = 
readByAppendMerge(readerChunkMetadatasMap);
+    Pair<ChunkMetadata, Chunk> chunkPair = 
readByAppendMerge(compactionReadRateLimiter,
+        readerChunkMetadatasMap);
     ChunkMetadata newChunkMetadata = chunkPair.left;
     Chunk newChunk = chunkPair.right;
     if (newChunkMetadata != null && newChunk != null) {
       maxVersion = Math.max(newChunkMetadata.getVersion(), maxVersion);
       // wait for limit write
-      MergeManager.mergeRateLimiterAcquire(compactionRateLimiter,
+      MergeManager.mergeRateLimiterAcquire(compactionWriteRateLimiter,
           newChunk.getHeader().getDataSize() + newChunk.getData().position());
       writer.writeChunk(newChunk, newChunkMetadata);
       targetResource.updateStartTime(device, newChunkMetadata.getStartTime());
@@ -127,11 +134,12 @@ public class HotCompactionUtils {
   }
 
   private static long writeByDeserializeMerge(long maxVersion, String device,
-      RateLimiter compactionRateLimiter,
+      RateLimiter compactionRateLimiter, RateLimiter compactionReadRateLimiter,
       Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
       TsFileResource targetResource, RestorableTsFileIOWriter writer) throws 
IOException {
     Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
-    maxVersion = readByDeserializeMerge(entry.getValue(), maxVersion, 
timeValuePairMap);
+    maxVersion = readByDeserializeMerge(compactionReadRateLimiter, 
entry.getValue(), maxVersion,
+        timeValuePairMap);
     Iterator<List<ChunkMetadata>> chunkMetadataListIterator = 
entry.getValue().values()
         .iterator();
     if (!chunkMetadataListIterator.hasNext()) {
@@ -185,7 +193,8 @@ public class HotCompactionUtils {
       Set<String> devices, boolean sequence) throws IOException {
     RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new 
HashMap<>();
-    RateLimiter compactionRateLimiter = 
MergeManager.getINSTANCE().getMergeRateLimiter();
+    RateLimiter compactionWriteRateLimiter = 
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+    RateLimiter compactionReadRateLimiter = 
MergeManager.getINSTANCE().getMergeReadRateLimiter();
     Set<String> tsFileDevicesMap = getTsFileDevicesSet(tsFileResources, 
tsFileSequenceReaderMap,
         storageGroup);
     for (String device : tsFileDevicesMap) {
@@ -200,8 +209,10 @@ public class HotCompactionUtils {
             tsFileSequenceReaderMap, storageGroup);
         Map<String, List<ChunkMetadata>> chunkMetadataMap = reader
             .readChunkMetadataInDevice(device);
+        long chunkMetadataSize = 0;
         for (Entry<String, List<ChunkMetadata>> entry : 
chunkMetadataMap.entrySet()) {
           for (ChunkMetadata chunkMetadata : entry.getValue()) {
+            chunkMetadataSize += 
chunkMetadata.getStatistics().calculateRamSize();
             Map<TsFileSequenceReader, List<ChunkMetadata>> 
readerChunkMetadataMap;
             String measurementUid = chunkMetadata.getMeasurementUid();
             if (measurementChunkMetadataMap.containsKey(measurementUid)) {
@@ -221,12 +232,17 @@ public class HotCompactionUtils {
                 .put(chunkMetadata.getMeasurementUid(), 
readerChunkMetadataMap);
           }
         }
+        // wait for limit read
+        MergeManager
+            .mergeRateLimiterAcquire(compactionReadRateLimiter, 
chunkMetadataSize);
       }
       if (!sequence) {
         long maxVersion = Long.MIN_VALUE;
         for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> 
entry : measurementChunkMetadataMap
             .entrySet()) {
-          maxVersion = writeByDeserializeMerge(maxVersion, device, 
compactionRateLimiter, entry,
+          maxVersion = writeByDeserializeMerge(maxVersion, device, 
compactionWriteRateLimiter,
+              compactionReadRateLimiter,
+              entry,
               targetResource, writer);
         }
         writer.endChunkGroup();
@@ -250,11 +266,15 @@ public class HotCompactionUtils {
           }
           if (isPageEnoughLarge) {
             logger.info("{} [Hot Compaction] page enough large, use append 
merge", storageGroup);
-            maxVersion = writeByAppendMerge(maxVersion, device, 
compactionRateLimiter,
+            maxVersion = writeByAppendMerge(maxVersion, device, 
compactionWriteRateLimiter,
+                compactionReadRateLimiter,
                 readerChunkMetadatasMap, targetResource, writer);
           } else {
-            logger.info("{} [Hot Compaction] page enough large, use 
deserialize merge", storageGroup);
-            maxVersion = writeByDeserializeMerge(maxVersion, device, 
compactionRateLimiter, entry,
+            logger
+                .info("{} [Hot Compaction] page enough large, use deserialize 
merge", storageGroup);
+            maxVersion = writeByDeserializeMerge(maxVersion, device, 
compactionWriteRateLimiter,
+                compactionReadRateLimiter,
+                entry,
                 targetResource, writer);
           }
         }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
index d03abc0..ecbeb63 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
@@ -33,7 +33,7 @@ public class MergeManagerTest extends MergeTest {
 
   @Test
   public void testRateLimiter() {
-    RateLimiter compactionRateLimiter = 
MergeManager.getINSTANCE().getMergeRateLimiter();
+    RateLimiter compactionRateLimiter = 
MergeManager.getINSTANCE().getMergeWriteRateLimiter();
     long startTime = System.currentTimeMillis();
     MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 160 * 1024 * 
1024L);
     assertTrue((System.currentTimeMillis() - startTime) <= 1000);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index da68c93..45a45b4 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -260,6 +260,9 @@ public class ChunkMetadata implements Accountable {
     this.ramSize = size;
   }
 
+  /**
+   * calculateRamSize() must be called before using this getter
+   */
   @Override
   public long getRamSize() {
     return ramSize;

Reply via email to