This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch pr_1758 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 70e2e0ea211be6cb944c7ffffc6de69143cac2f4 Author: 张凌哲 <[email protected]> AuthorDate: Mon Oct 12 12:54:44 2020 +0800 add thread pool limit --- .../resources/conf/iotdb-engine.properties | 4 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 +++- .../engine/storagegroup/StorageGroupProcessor.java | 124 ++++++++++++--------- .../HotCompactionMergeTaskPoolManager.java | 9 +- .../iotdb/db/integration/IoTDBMergeTest.java | 2 +- 5 files changed, 100 insertions(+), 61 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 819e561..7810559 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -324,6 +324,10 @@ force_full_merge=false # When less than 0, this mechanism is disabled. chunk_merge_point_threshold=20480 +# How many thread will be set up to perform hot compaction, 30 by default. +# Set to 1 when less than or equal to 0. +hot_compaction_thread_num=30 + # The limit of write throughput merge can reach per second merge_write_throughput_mb_per_sec=16 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c6163a6..d4c5dac 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -541,11 +541,17 @@ public class IoTDBConfig { private int chunkMergePointThreshold = 20480; /** - * The limit of write throughput merge can reach per second + * The limit of hot compaction merge can reach per second */ private int mergeWriteThroughputMbPerSec = 16; /** + * How many thread will be set up to perform hot compaction, 30 by default. Set to 1 when less + * than or equal to 0. + */ + private int hotCompactionThreadNum = 30; + + /** * The limit of read throughput merge can reach per second */ private int mergeReadThroughputMbPerSec = 16; @@ -889,11 +895,11 @@ public class IoTDBConfig { } public boolean isEnableDiscardOutOfOrderData() { - return enableDiscardOutOfOrderData ; + return enableDiscardOutOfOrderData; } - public void setEnableDiscardOutOfOrderData(boolean enableDiscardOutOfOrderData ) { - this.enableDiscardOutOfOrderData = enableDiscardOutOfOrderData ; + public void setEnableDiscardOutOfOrderData(boolean enableDiscardOutOfOrderData) { + this.enableDiscardOutOfOrderData = enableDiscardOutOfOrderData; } public int getFlushWalThreshold() { @@ -1264,6 +1270,14 @@ public class IoTDBConfig { this.chunkMergePointThreshold = chunkMergePointThreshold; } + public int getHotCompactionThreadNum() { + return hotCompactionThreadNum; + } + + public void setHotCompactionThreadNum(int hotCompactionThreadNum) { + this.hotCompactionThreadNum = hotCompactionThreadNum; + } + public int getMergeWriteThroughputMbPerSec() { return mergeWriteThroughputMbPerSec; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 5038d00..f613d42 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; @@ -145,11 +146,6 @@ public class StorageGroupProcessor { */ private final Object closeStorageGroupCondition = new Object(); /** - * hotCompactionMergeWorking is used to wait for last hot compaction to be done. - */ - private volatile boolean hotCompactionMergeWorking = false; - - /** * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when a query is executed. */ private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock(); @@ -161,7 +157,10 @@ public class StorageGroupProcessor { * time partition id in the storage group -> tsFileProcessor for this time partition */ private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>(); - + /** + * hotCompactionMergeWorking is used to wait for last hot compaction to be done. + */ + private volatile boolean hotCompactionMergeWorking = false; // upgrading sequence TsFile resource list private List<TsFileResource> upgradeSeqFileList = new LinkedList<>(); @@ -315,7 +314,8 @@ public class StorageGroupProcessor { } RecoverMergeTask recoverMergeTask = new RecoverMergeTask( new ArrayList<>(tsFileManagement.getTsFileList(true)), - tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), tsFileManagement::mergeEndAction, + tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), + tsFileManagement::mergeEndAction, taskName, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName); logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName); @@ -606,14 +606,22 @@ public class StorageGroupProcessor { // init map long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime()); - latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); partitionLatestFlushedTimeForEachDevice .computeIfAbsent(timePartitionId, id -> new HashMap<>()); - // insert to sequence or unSequence file - insertToTsFileProcessor(insertRowPlan, + boolean isSequence = insertRowPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId) - .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)); + .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE); + + //is unsequence and user set config to discard out of order data + if (!isSequence && IoTDBDescriptor.getInstance().getConfig() + .isEnableDiscardOutOfOrderData()) { + return; + } + + latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); + // insert to sequence or unSequence file + insertToTsFileProcessor(insertRowPlan, isSequence); } finally { writeUnlock(); @@ -670,9 +678,12 @@ public class StorageGroupProcessor { // start next partition if (curTimePartition != beforeTimePartition) { // insert last time partition - noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, - results, - beforeTimePartition) && noFailure; + if (isSequence || !IoTDBDescriptor.getInstance().getConfig() + .isEnableDiscardOutOfOrderData()) { + noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, + results, + beforeTimePartition) && noFailure; + } // re initialize before = loc; beforeTimePartition = curTimePartition; @@ -686,8 +697,11 @@ public class StorageGroupProcessor { // judge if we should insert sequence if (!isSequence && time > lastFlushTime) { // insert into unsequence and then start sequence - noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results, - beforeTimePartition) && noFailure; + if (!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) { + noFailure = + insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results, + beforeTimePartition) && noFailure; + } before = loc; isSequence = true; } @@ -696,7 +710,8 @@ public class StorageGroupProcessor { } // do not forget last part - if (before < loc) { + if (before < loc && (isSequence || !IoTDBDescriptor.getInstance().getConfig() + .isEnableDiscardOutOfOrderData())) { noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results, beforeTimePartition) && noFailure; } @@ -771,6 +786,9 @@ public class StorageGroupProcessor { } private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) { + if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { + return; + } MeasurementMNode[] mNodes = plan.getMeasurementMNodes(); for (int i = 0; i < mNodes.length; i++) { if (plan.getColumns()[i] == null) { @@ -823,6 +841,9 @@ public class StorageGroupProcessor { } private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) { + if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { + return; + } MeasurementMNode[] mNodes = plan.getMeasurementMNodes(); for (int i = 0; i < mNodes.length; i++) { if (plan.getValues()[i] == null) { @@ -1439,12 +1460,11 @@ public class StorageGroupProcessor { private void tryToDeleteLastCache(PartialPath deviceId, String measurementId, long startTime, long endTime) throws WriteProcessException { - MNode node = null; try { MManager manager = MManager.getInstance(); - node = manager.getDeviceNodeWithAutoCreateAndReadLock(deviceId); + MNode node = manager.getDeviceNodeWithAutoCreate(deviceId); - MNode measurementNode = manager.getChild(node, measurementId); + MNode measurementNode = node.getChild(measurementId); if (measurementNode != null) { TimeValuePair lastPair = ((MeasurementMNode) measurementNode).getCachedLast(); if (lastPair != null && startTime <= lastPair.getTimestamp() @@ -1454,10 +1474,6 @@ public class StorageGroupProcessor { } } catch (MetadataException e) { throw new WriteProcessException(e); - } finally { - if (node != null) { - node.readUnlock(); - } } } @@ -1545,7 +1561,7 @@ public class StorageGroupProcessor { .submitTask( tsFileManagement.new HotCompactionMergeTask(this::closeHotCompactionMergeCallBack, tsFileProcessor.getTimeRangeId())); - } catch (IOException e) { + } catch (IOException | RejectedExecutionException e) { this.closeHotCompactionMergeCallBack(); logger.error("{} hot compaction submit task failed", storageGroupName); } @@ -1640,7 +1656,7 @@ public class StorageGroupProcessor { writeLock(); try { this.tsFileManagement.merge(fullMerge, tsFileManagement.getTsFileList(true), - tsFileManagement.getTsFileList(false),dataTTL); + tsFileManagement.getTsFileList(false), dataTTL); } finally { writeUnlock(); } @@ -2260,34 +2276,6 @@ public class StorageGroupProcessor { return partitionFileVersions.containsAll(tsFileResource.getHistoricalVersions()); } - private enum LoadTsFileType { - LOAD_SEQUENCE, LOAD_UNSEQUENCE - } - - @FunctionalInterface - public interface CloseTsFileCallBack { - - void call(TsFileProcessor caller) throws TsFileProcessorException, IOException; - } - - @FunctionalInterface - public interface UpdateEndTimeCallBack { - - boolean call(TsFileProcessor caller); - } - - @FunctionalInterface - public interface UpgradeTsFileResourceCallBack { - - void call(TsFileResource caller); - } - - @FunctionalInterface - public interface CloseHotCompactionMergeCallBack { - - void call(); - } - /** * remove all partitions that satisfy a filter. */ @@ -2338,6 +2326,34 @@ public class StorageGroupProcessor { } } + private enum LoadTsFileType { + LOAD_SEQUENCE, LOAD_UNSEQUENCE + } + + @FunctionalInterface + public interface CloseTsFileCallBack { + + void call(TsFileProcessor caller) throws TsFileProcessorException, IOException; + } + + @FunctionalInterface + public interface UpdateEndTimeCallBack { + + boolean call(TsFileProcessor caller); + } + + @FunctionalInterface + public interface UpgradeTsFileResourceCallBack { + + void call(TsFileResource caller); + } + + @FunctionalInterface + public interface CloseHotCompactionMergeCallBack { + + void call(); + } + @FunctionalInterface public interface TimePartitionFilter { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java index 2697f5d..f910c4b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java @@ -20,9 +20,11 @@ package org.apache.iotdb.db.engine.tsfilemanagement; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement.HotCompactionMergeTask; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.ServiceType; @@ -48,7 +50,9 @@ public class HotCompactionMergeTaskPoolManager implements IService { public void start() { if (pool == null) { this.pool = IoTDBThreadPoolFactory - .newCachedThreadPool(ThreadName.HOT_COMPACTION_SERVICE.getName()); + .newScheduledThreadPool( + IoTDBDescriptor.getInstance().getConfig().getHotCompactionThreadNum(), + ThreadName.HOT_COMPACTION_SERVICE.getName()); } logger.info("Hot compaction merge task manager started."); } @@ -100,7 +104,8 @@ public class HotCompactionMergeTaskPoolManager implements IService { return ServiceType.HOT_COMPACTION_SERVICE; } - public void submitTask(HotCompactionMergeTask hotCompactionMergeTask) { + public void submitTask(HotCompactionMergeTask hotCompactionMergeTask) + throws RejectedExecutionException { if (pool != null && !pool.isTerminated()) { pool.submit(hotCompactionMergeTask); } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java index f65130d..c4df986 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java @@ -294,7 +294,7 @@ public class IoTDBMergeTest { } // it is uncertain whether the sub tasks are created at this time point, and we are only // sure that the main task is created - assertEquals(1, cnt); + assertEquals(2, cnt); } } }
