This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 52e00d7339201295d95f503aabe4bacd63eb41cd Merge: 2deea20 12fe408 Author: 151250176 <[email protected]> AuthorDate: Mon Dec 7 15:52:52 2020 +0800 Merge branch 'master' of https://github.com/apache/iotdb into virtual_partition # Conflicts: # server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java example/flink/pom.xml | 25 ++ .../org/apache/iotdb/flink/FlinkIoTDBSink.java | 17 +- hive-connector/pom.xml | 4 + pom.xml | 1 + .../resources/conf/iotdb-engine.properties | 7 + .../org/apache/iotdb/db/engine/StorageEngine.java | 37 +- .../engine/compaction/utils/CompactionUtils.java | 21 +- .../engine/storagegroup/StorageGroupProcessor.java | 9 +- .../db/engine/storagegroup/TsFileProcessor.java | 16 +- .../db/exception/WriteProcessRejectException.java | 22 +- .../iotdb/db/query/reader/chunk/MemPageReader.java | 2 +- .../iotdb/db/query/reader/series/SeriesReader.java | 4 +- .../org/apache/iotdb/db/rescon/SystemInfo.java | 37 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 3 +- .../iotdb/db/integration/IOTDBGroupByIT.java | 5 + .../iotdb/db/integration/IoTDBAggregationIT.java | 5 + .../integration/IoTDBAggregationLargeDataIT.java | 5 + .../integration/IoTDBAggregationSmallDataIT.java | 7 +- .../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 7 +- .../iotdb/db/integration/IoTDBCompactionIT.java | 55 +++ .../iotdb/db/integration/IoTDBLargeDataIT.java | 5 + .../IoTDBMultiOverlappedChunkInUnseqIT.java | 5 + .../db/integration/IoTDBMultiOverlappedPageIT.java | 5 + .../iotdb/db/integration/IoTDBMultiSeriesIT.java | 5 + .../db/integration/IoTDBMultiStatementsIT.java | 5 + .../db/integration/IoTDBOverlappedPageIT.java | 5 + .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../apache/iotdb/tsfile/read/common/BatchData.java | 35 +- .../iotdb/tsfile/read/common/BatchDataFactory.java | 7 +- .../{DescBatchData.java => DescReadBatchData.java} | 14 +- .../tsfile/read/common/DescReadWriteBatchData.java | 374 +++++++++++++++++++++ 
.../iotdb/tsfile/read/reader/page/PageReader.java | 2 +- 32 files changed, 656 insertions(+), 96 deletions(-) diff --cc server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 2a44afa,00d5db1..b57f564 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@@ -155,88 -177,7 +156,70 @@@ public class StorageEngine implements I recover(); } + public static StorageEngine getInstance() { + return InstanceHolder.INSTANCE; + } + + private static void initTimePartition() { + timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance(). + getConfig().getPartitionInterval() * 1000L); + } + + public static long convertMilliWithPrecision(long milliTime) { + long result = milliTime; + String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision(); + switch (timePrecision) { + case "ns": + result = milliTime * 1000_000L; + break; + case "us": + result = milliTime * 1000L; + break; + default: + break; + } + return result; + } + + public static long getTimePartitionInterval() { + if (timePartitionInterval == -1) { + initTimePartition(); + } + return timePartitionInterval; + } + + @TestOnly + public static void setTimePartitionInterval(long timePartitionInterval) { + StorageEngine.timePartitionInterval = timePartitionInterval; + } + + public static long getTimePartition(long time) { + return enablePartition ? 
time / timePartitionInterval : 0; + } + + @TestOnly + public static boolean isEnablePartition() { + return enablePartition; + } + + @TestOnly + public static void setEnablePartition(boolean enablePartition) { + StorageEngine.enablePartition = enablePartition; + } + - /** - * block insertion if the insertion is rejected by memory control - */ - public static void blockInsertionIfReject() throws WriteProcessException { - long startTime = System.currentTimeMillis(); - while (SystemInfo.getInstance().isRejected()) { - try { - TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked()); - if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) { - throw new WriteProcessException( - "System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() + - "ms"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } + + public boolean isAllSgReady() { + return isAllSgReady.get(); + } + + public void setAllSgReady(boolean allSgReady) { + isAllSgReady.set(allSgReady); + } + public void recover() { + setAllSgReady(false); + recoveryThreadPool = IoTDBThreadPoolFactory + .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool"); recoverAllSgThreadPool = IoTDBThreadPoolFactory .newSingleThreadExecutor("Begin-Recovery-Pool"); recoverAllSgThreadPool.submit(this::recoverAllSgs); @@@ -949,12 -841,21 +932,30 @@@ list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readUnLock()); } + static class InstanceHolder { + + private static final StorageEngine INSTANCE = new StorageEngine(); + + private InstanceHolder() { + // forbidding instantiation + } + } ++ + /** + * block insertion if the insertion is rejected by memory control + */ + public static void blockInsertionIfReject() throws WriteProcessRejectException { + long startTime = System.currentTimeMillis(); + while (SystemInfo.getInstance().isRejected()) { + try { + 
TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked()); + if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) { + throw new WriteProcessRejectException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() + + "ms"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } }
