This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 70e2e0ea211be6cb944c7ffffc6de69143cac2f4
Author: 张凌哲 <[email protected]>
AuthorDate: Mon Oct 12 12:54:44 2020 +0800

    add thread pool limit
---
 .../resources/conf/iotdb-engine.properties         |   4 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  22 +++-
 .../engine/storagegroup/StorageGroupProcessor.java | 124 ++++++++++++---------
 .../HotCompactionMergeTaskPoolManager.java         |   9 +-
 .../iotdb/db/integration/IoTDBMergeTest.java       |   2 +-
 5 files changed, 100 insertions(+), 61 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 819e561..7810559 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -324,6 +324,10 @@ force_full_merge=false
 # When less than 0, this mechanism is disabled.
 chunk_merge_point_threshold=20480
 
+# How many threads will be set up to perform hot compaction, 30 by default.
+# Set to 1 when less than or equal to 0.
+hot_compaction_thread_num=30
+
 # The limit of write throughput merge can reach per second
 merge_write_throughput_mb_per_sec=16
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c6163a6..d4c5dac 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -541,11 +541,17 @@ public class IoTDBConfig {
   private int chunkMergePointThreshold = 20480;
 
   /**
-   * The limit of write throughput merge can reach per second
+   * The limit of write throughput that hot compaction merge can reach per second
    */
   private int mergeWriteThroughputMbPerSec = 16;
 
   /**
+   * How many threads will be set up to perform hot compaction, 30 by default. 
Set to 1 when less
+   * than or equal to 0.
+   */
+  private int hotCompactionThreadNum = 30;
+
+  /**
    * The limit of read throughput merge can reach per second
    */
   private int mergeReadThroughputMbPerSec = 16;
@@ -889,11 +895,11 @@ public class IoTDBConfig {
   }
 
   public boolean isEnableDiscardOutOfOrderData() {
-    return enableDiscardOutOfOrderData ;
+    return enableDiscardOutOfOrderData;
   }
 
-  public void setEnableDiscardOutOfOrderData(boolean 
enableDiscardOutOfOrderData ) {
-    this.enableDiscardOutOfOrderData  =  enableDiscardOutOfOrderData ;
+  public void setEnableDiscardOutOfOrderData(boolean 
enableDiscardOutOfOrderData) {
+    this.enableDiscardOutOfOrderData = enableDiscardOutOfOrderData;
   }
 
   public int getFlushWalThreshold() {
@@ -1264,6 +1270,14 @@ public class IoTDBConfig {
     this.chunkMergePointThreshold = chunkMergePointThreshold;
   }
 
+  public int getHotCompactionThreadNum() {
+    return hotCompactionThreadNum;
+  }
+
+  public void setHotCompactionThreadNum(int hotCompactionThreadNum) {
+    this.hotCompactionThreadNum = hotCompactionThreadNum;
+  }
+
   public int getMergeWriteThroughputMbPerSec() {
     return mergeWriteThroughputMbPerSec;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5038d00..f613d42 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
@@ -145,11 +146,6 @@ public class StorageGroupProcessor {
    */
   private final Object closeStorageGroupCondition = new Object();
   /**
-   * hotCompactionMergeWorking is used to wait for last hot compaction to be 
done.
-   */
-  private volatile boolean hotCompactionMergeWorking = false;
-
-  /**
    * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when 
a query is executed.
    */
   private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
@@ -161,7 +157,10 @@ public class StorageGroupProcessor {
    * time partition id in the storage group -> tsFileProcessor for this time 
partition
    */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors 
= new TreeMap<>();
-
+  /**
+   * hotCompactionMergeWorking is used to wait for last hot compaction to be 
done.
+   */
+  private volatile boolean hotCompactionMergeWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
@@ -315,7 +314,8 @@ public class StorageGroupProcessor {
       }
       RecoverMergeTask recoverMergeTask = new RecoverMergeTask(
           new ArrayList<>(tsFileManagement.getTsFileList(true)),
-          tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), 
tsFileManagement::mergeEndAction,
+          tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(),
+          tsFileManagement::mergeEndAction,
           taskName,
           IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), 
storageGroupName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, 
taskName);
@@ -606,14 +606,22 @@ public class StorageGroupProcessor {
       // init map
       long timePartitionId = 
StorageEngine.getTimePartition(insertRowPlan.getTime());
 
-      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new 
HashMap<>());
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
-      // insert to sequence or unSequence file
-      insertToTsFileProcessor(insertRowPlan,
+      boolean isSequence =
           insertRowPlan.getTime() > 
partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
-              .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE));
+              .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE);
+
+      //is unsequence and user set config to discard out of order data
+      if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
+          .isEnableDiscardOutOfOrderData()) {
+        return;
+      }
+
+      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new 
HashMap<>());
+      // insert to sequence or unSequence file
+      insertToTsFileProcessor(insertRowPlan, isSequence);
 
     } finally {
       writeUnlock();
@@ -670,9 +678,12 @@ public class StorageGroupProcessor {
         // start next partition
         if (curTimePartition != beforeTimePartition) {
           // insert last time partition
-          noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, 
loc, isSequence,
-              results,
-              beforeTimePartition) && noFailure;
+          if (isSequence || !IoTDBDescriptor.getInstance().getConfig()
+              .isEnableDiscardOutOfOrderData()) {
+            noFailure = insertTabletToTsFileProcessor(insertTabletPlan, 
before, loc, isSequence,
+                results,
+                beforeTimePartition) && noFailure;
+          }
           // re initialize
           before = loc;
           beforeTimePartition = curTimePartition;
@@ -686,8 +697,11 @@ public class StorageGroupProcessor {
           // judge if we should insert sequence
           if (!isSequence && time > lastFlushTime) {
             // insert into unsequence and then start sequence
-            noFailure = insertTabletToTsFileProcessor(insertTabletPlan, 
before, loc, false, results,
-                beforeTimePartition) && noFailure;
+            if 
(!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
+              noFailure =
+                  insertTabletToTsFileProcessor(insertTabletPlan, before, loc, 
false, results,
+                      beforeTimePartition) && noFailure;
+            }
             before = loc;
             isSequence = true;
           }
@@ -696,7 +710,8 @@ public class StorageGroupProcessor {
       }
 
       // do not forget last part
-      if (before < loc) {
+      if (before < loc && (isSequence || 
!IoTDBDescriptor.getInstance().getConfig()
+          .isEnableDiscardOutOfOrderData())) {
         noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, 
loc, isSequence,
             results, beforeTimePartition) && noFailure;
       }
@@ -771,6 +786,9 @@ public class StorageGroupProcessor {
   }
 
   private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long 
latestFlushedTime) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+      return;
+    }
     MeasurementMNode[] mNodes = plan.getMeasurementMNodes();
     for (int i = 0; i < mNodes.length; i++) {
       if (plan.getColumns()[i] == null) {
@@ -823,6 +841,9 @@ public class StorageGroupProcessor {
   }
 
   private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long 
latestFlushedTime) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+      return;
+    }
     MeasurementMNode[] mNodes = plan.getMeasurementMNodes();
     for (int i = 0; i < mNodes.length; i++) {
       if (plan.getValues()[i] == null) {
@@ -1439,12 +1460,11 @@ public class StorageGroupProcessor {
 
   private void tryToDeleteLastCache(PartialPath deviceId, String 
measurementId, long startTime,
       long endTime) throws WriteProcessException {
-    MNode node = null;
     try {
       MManager manager = MManager.getInstance();
-      node = manager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
+      MNode node = manager.getDeviceNodeWithAutoCreate(deviceId);
 
-      MNode measurementNode = manager.getChild(node, measurementId);
+      MNode measurementNode = node.getChild(measurementId);
       if (measurementNode != null) {
         TimeValuePair lastPair = ((MeasurementMNode) 
measurementNode).getCachedLast();
         if (lastPair != null && startTime <= lastPair.getTimestamp()
@@ -1454,10 +1474,6 @@ public class StorageGroupProcessor {
       }
     } catch (MetadataException e) {
       throw new WriteProcessException(e);
-    } finally {
-      if (node != null) {
-        node.readUnlock();
-      }
     }
   }
 
@@ -1545,7 +1561,7 @@ public class StorageGroupProcessor {
             .submitTask(
                 tsFileManagement.new 
HotCompactionMergeTask(this::closeHotCompactionMergeCallBack,
                     tsFileProcessor.getTimeRangeId()));
-      } catch (IOException e) {
+      } catch (IOException | RejectedExecutionException e) {
         this.closeHotCompactionMergeCallBack();
         logger.error("{} hot compaction submit task failed", storageGroupName);
       }
@@ -1640,7 +1656,7 @@ public class StorageGroupProcessor {
     writeLock();
     try {
       this.tsFileManagement.merge(fullMerge, 
tsFileManagement.getTsFileList(true),
-          tsFileManagement.getTsFileList(false),dataTTL);
+          tsFileManagement.getTsFileList(false), dataTTL);
     } finally {
       writeUnlock();
     }
@@ -2260,34 +2276,6 @@ public class StorageGroupProcessor {
     return 
partitionFileVersions.containsAll(tsFileResource.getHistoricalVersions());
   }
 
-  private enum LoadTsFileType {
-    LOAD_SEQUENCE, LOAD_UNSEQUENCE
-  }
-
-  @FunctionalInterface
-  public interface CloseTsFileCallBack {
-
-    void call(TsFileProcessor caller) throws TsFileProcessorException, 
IOException;
-  }
-
-  @FunctionalInterface
-  public interface UpdateEndTimeCallBack {
-
-    boolean call(TsFileProcessor caller);
-  }
-
-  @FunctionalInterface
-  public interface UpgradeTsFileResourceCallBack {
-
-    void call(TsFileResource caller);
-  }
-
-  @FunctionalInterface
-  public interface CloseHotCompactionMergeCallBack {
-
-    void call();
-  }
-
   /**
    * remove all partitions that satisfy a filter.
    */
@@ -2338,6 +2326,34 @@ public class StorageGroupProcessor {
     }
   }
 
+  private enum LoadTsFileType {
+    LOAD_SEQUENCE, LOAD_UNSEQUENCE
+  }
+
+  @FunctionalInterface
+  public interface CloseTsFileCallBack {
+
+    void call(TsFileProcessor caller) throws TsFileProcessorException, 
IOException;
+  }
+
+  @FunctionalInterface
+  public interface UpdateEndTimeCallBack {
+
+    boolean call(TsFileProcessor caller);
+  }
+
+  @FunctionalInterface
+  public interface UpgradeTsFileResourceCallBack {
+
+    void call(TsFileResource caller);
+  }
+
+  @FunctionalInterface
+  public interface CloseHotCompactionMergeCallBack {
+
+    void call();
+  }
+
   @FunctionalInterface
   public interface TimePartitionFilter {
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java
index 2697f5d..f910c4b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java
@@ -20,9 +20,11 @@
 package org.apache.iotdb.db.engine.tsfilemanagement;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement.HotCompactionMergeTask;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
@@ -48,7 +50,9 @@ public class HotCompactionMergeTaskPoolManager implements 
IService {
   public void start() {
     if (pool == null) {
       this.pool = IoTDBThreadPoolFactory
-          .newCachedThreadPool(ThreadName.HOT_COMPACTION_SERVICE.getName());
+          .newScheduledThreadPool(
+              
IoTDBDescriptor.getInstance().getConfig().getHotCompactionThreadNum(),
+              ThreadName.HOT_COMPACTION_SERVICE.getName());
     }
     logger.info("Hot compaction merge task manager started.");
   }
@@ -100,7 +104,8 @@ public class HotCompactionMergeTaskPoolManager implements 
IService {
     return ServiceType.HOT_COMPACTION_SERVICE;
   }
 
-  public void submitTask(HotCompactionMergeTask hotCompactionMergeTask) {
+  public void submitTask(HotCompactionMergeTask hotCompactionMergeTask)
+      throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
       pool.submit(hotCompactionMergeTask);
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index f65130d..c4df986 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -294,7 +294,7 @@ public class IoTDBMergeTest {
       }
       // it is uncertain whether the sub tasks are created at this time point, 
and we are only
       // sure that the main task is created
-      assertEquals(1, cnt);
+      assertEquals(2, cnt);
     }
   }
 }

Reply via email to