This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch virtual_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 52e00d7339201295d95f503aabe4bacd63eb41cd
Merge: 2deea20 12fe408
Author: 151250176 <[email protected]>
AuthorDate: Mon Dec 7 15:52:52 2020 +0800

    Merge branch 'master' of https://github.com/apache/iotdb into virtual_partition
    
    # Conflicts:
    #   server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java

 example/flink/pom.xml                              |  25 ++
 .../org/apache/iotdb/flink/FlinkIoTDBSink.java     |  17 +-
 hive-connector/pom.xml                             |   4 +
 pom.xml                                            |   1 +
 .../resources/conf/iotdb-engine.properties         |   7 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  37 +-
 .../engine/compaction/utils/CompactionUtils.java   |  21 +-
 .../engine/storagegroup/StorageGroupProcessor.java |   9 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  16 +-
 .../db/exception/WriteProcessRejectException.java  |  22 +-
 .../iotdb/db/query/reader/chunk/MemPageReader.java |   2 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |   4 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  37 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   3 +-
 .../iotdb/db/integration/IOTDBGroupByIT.java       |   5 +
 .../iotdb/db/integration/IoTDBAggregationIT.java   |   5 +
 .../integration/IoTDBAggregationLargeDataIT.java   |   5 +
 .../integration/IoTDBAggregationSmallDataIT.java   |   7 +-
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |   7 +-
 .../iotdb/db/integration/IoTDBCompactionIT.java    |  55 +++
 .../iotdb/db/integration/IoTDBLargeDataIT.java     |   5 +
 .../IoTDBMultiOverlappedChunkInUnseqIT.java        |   5 +
 .../db/integration/IoTDBMultiOverlappedPageIT.java |   5 +
 .../iotdb/db/integration/IoTDBMultiSeriesIT.java   |   5 +
 .../db/integration/IoTDBMultiStatementsIT.java     |   5 +
 .../db/integration/IoTDBOverlappedPageIT.java      |   5 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../apache/iotdb/tsfile/read/common/BatchData.java |  35 +-
 .../iotdb/tsfile/read/common/BatchDataFactory.java |   7 +-
 .../{DescBatchData.java => DescReadBatchData.java} |  14 +-
 .../tsfile/read/common/DescReadWriteBatchData.java | 374 +++++++++++++++++++++
 .../iotdb/tsfile/read/reader/page/PageReader.java  |   2 +-
 32 files changed, 656 insertions(+), 96 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 2a44afa,00d5db1..b57f564
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@@ -155,88 -177,7 +156,70 @@@ public class StorageEngine implements I
      recover();
    }
  
 +  public static StorageEngine getInstance() {
 +    return InstanceHolder.INSTANCE;
 +  }
 +
 +  private static void initTimePartition() {
 +    timePartitionInterval = convertMilliWithPrecision(IoTDBDescriptor.getInstance().
 +        getConfig().getPartitionInterval() * 1000L);
 +  }
 +
 +  public static long convertMilliWithPrecision(long milliTime) {
 +    long result = milliTime;
 +    String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
 +    switch (timePrecision) {
 +      case "ns":
 +        result = milliTime * 1000_000L;
 +        break;
 +      case "us":
 +        result = milliTime * 1000L;
 +        break;
 +      default:
 +        break;
 +    }
 +    return result;
 +  }
 +
 +  public static long getTimePartitionInterval() {
 +    if (timePartitionInterval == -1) {
 +      initTimePartition();
 +    }
 +    return timePartitionInterval;
 +  }
 +
 +  @TestOnly
 +  public static void setTimePartitionInterval(long timePartitionInterval) {
 +    StorageEngine.timePartitionInterval = timePartitionInterval;
 +  }
 +
 +  public static long getTimePartition(long time) {
 +    return enablePartition ? time / timePartitionInterval : 0;
 +  }
 +
 +  @TestOnly
 +  public static boolean isEnablePartition() {
 +    return enablePartition;
 +  }
 +
 +  @TestOnly
 +  public static void setEnablePartition(boolean enablePartition) {
 +    StorageEngine.enablePartition = enablePartition;
 +  }
 +
-   /**
-    * block insertion if the insertion is rejected by memory control
-    */
-   public static void blockInsertionIfReject() throws WriteProcessException {
-     long startTime = System.currentTimeMillis();
-     while (SystemInfo.getInstance().isRejected()) {
-       try {
-         TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
-         if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
-           throw new WriteProcessException(
-               "System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
-                   "ms");
-         }
-       } catch (InterruptedException e) {
-         Thread.currentThread().interrupt();
-       }
-     }
-   }
 +
 +  public boolean isAllSgReady() {
 +    return isAllSgReady.get();
 +  }
 +
 +  public void setAllSgReady(boolean allSgReady) {
 +    isAllSgReady.set(allSgReady);
 +  }
 +
    public void recover() {
 +    setAllSgReady(false);
 +    recoveryThreadPool = IoTDBThreadPoolFactory
 +        .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
      recoverAllSgThreadPool = IoTDBThreadPoolFactory
          .newSingleThreadExecutor("Begin-Recovery-Pool");
      recoverAllSgThreadPool.submit(this::recoverAllSgs);
@@@ -949,12 -841,21 +932,30 @@@
      list.forEach(storageGroupProcessor -> storageGroupProcessor.getTsFileManagement().readUnLock());
    }
  
 +  static class InstanceHolder {
 +
 +    private static final StorageEngine INSTANCE = new StorageEngine();
 +
 +    private InstanceHolder() {
 +      // forbidding instantiation
 +    }
 +  }
++
+   /**
+    * block insertion if the insertion is rejected by memory control
+    */
+   public static void blockInsertionIfReject() throws WriteProcessRejectException {
+     long startTime = System.currentTimeMillis();
+     while (SystemInfo.getInstance().isRejected()) {
+       try {
+         TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+         if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
+           throw new WriteProcessRejectException("System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
+               "ms");
+         }
+       } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+       }
+     }
+   }
  }

Reply via email to