This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch proceeding_vldb
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/proceeding_vldb by this push:
     new 624a1d6  add load compaction for level
624a1d6 is described below

commit 624a1d6c1150e523ac323bba3cbabe76e6bb938a
Author: EJTTianyu <[email protected]>
AuthorDate: Sat Feb 20 22:03:09 2021 +0800

    add load compaction for level
---
 .../apache/iotdb/tsfile/TsFileSequenceRead.java    |  16 +++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   4 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  12 +++
 .../db/engine/compaction/TsFileManagement.java     |   6 ++
 .../level/LevelCompactionTsFileManagement.java     |  25 +++--
 .../engine/storagegroup/StorageGroupProcessor.java |  60 ++++++++++++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 104 ++++++++++++++++++++-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  11 +++
 .../iotdb/db/tools/TsFileSinglePathRead.java       |  87 +++++++++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |   8 ++
 10 files changed, 318 insertions(+), 15 deletions(-)

diff --git 
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java 
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index da5bae6..3cb4c27 100644
--- 
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ 
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -20,8 +20,10 @@ package org.apache.iotdb.tsfile;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
@@ -45,6 +47,7 @@ public class TsFileSequenceRead {
     if (args.length >= 1) {
       filename = args[0];
     }
+    int count = 0;
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
       System.out.println("file length: " + 
FSFactoryProducer.getFSFactory().getFile(filename).length());
       System.out.println("file magic head: " + reader.readHeadMagic());
@@ -60,6 +63,7 @@ public class TsFileSequenceRead {
       System.out.println("[Chunk Group]");
       System.out.println("position: " + reader.position());
       byte marker;
+      Set<Long> pagePointNum = new HashSet<>();
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
           case MetaMarker.CHUNK_HEADER:
@@ -77,7 +81,9 @@ public class TsFileSequenceRead {
               System.out.println("\t\t[Page]\n \t\tPage head position: " + 
reader.position());
               PageHeader pageHeader = 
reader.readPageHeader(header.getDataType());
               System.out.println("\t\tPage data position: " + 
reader.position());
-              System.out.println("\t\tpoints in the page: " + 
pageHeader.getNumOfValues());
+              long pointNum = pageHeader.getNumOfValues();
+              System.out.println("\t\tpoints in the page: " + pointNum);
+              pagePointNum.add(pointNum);
               ByteBuffer pageData = reader.readPage(pageHeader, 
header.getCompressionType());
               System.out
                       .println("\t\tUncompressed page data size: " + 
pageHeader.getUncompressedSize());
@@ -85,10 +91,11 @@ public class TsFileSequenceRead {
                       defaultTimeDecoder, null);
               BatchData batchData = reader1.getAllSatisfiedPageData();
               while (batchData.hasCurrent()) {
-                System.out.println(
-                        "\t\t\ttime, value: " + batchData.currentTime() + ", " 
+ batchData
-                                .currentValue());
+//                System.out.println(
+//                        "\t\t\ttime, value: " + batchData.currentTime() + ", 
" + batchData
+//                                .currentValue());
                 batchData.next();
+                count++;
               }
             }
             break;
@@ -117,6 +124,7 @@ public class TsFileSequenceRead {
           }
         }
       }
+      System.out.println("total points num:" + count);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1a08b9a..c4be6f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -293,7 +293,7 @@ public class IoTDBConfig {
   /**
    * When a TsFile's file size (in byte) exceed this, the TsFile is forced 
closed.
    */
-  private long tsFileSizeThreshold = 1L;
+  private long tsFileSizeThreshold = 128 * 1024 * 1024L;
 
   /**
    * When a memTable's size (in byte) exceeds this, the memtable is flushed to 
disk.
@@ -414,7 +414,7 @@ public class IoTDBConfig {
   /**
    * whether to cache meta data(ChunkMetaData and TsFileMetaData) or not.
    */
-  private boolean metaDataCacheEnable = true;
+  private boolean metaDataCacheEnable = false;
 
   /**
    * Memory allocated for timeSeriesMetaData cache in read process
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 257cc42..69e28dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -674,6 +674,18 @@ public class StorageEngine implements IService {
     getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource);
   }
 
+  public void loadNewTsFileBySeq(TsFileResource newTsFileResource, boolean 
isSeq)
+      throws LoadFileException, StorageEngineException, MetadataException {
+    Map<String, Integer> deviceMap = newTsFileResource.getDeviceToIndexMap();
+    if (deviceMap == null || deviceMap.isEmpty()) {
+      throw new StorageEngineException("Can not get the corresponding storage 
group.");
+    }
+    String device = deviceMap.keySet().iterator().next();
+    PartialPath devicePath = new PartialPath(device);
+    PartialPath storageGroupPath = 
IoTDB.metaManager.getStorageGroupPath(devicePath);
+    getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource, isSeq);
+  }
+
   public boolean deleteTsfileForSync(File deletedTsfile)
       throws StorageEngineException, IllegalPathException {
     return getProcessor(new 
PartialPath(deletedTsfile.getParentFile().getName()))
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index ed84cae..c7ee4d3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompac
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -239,6 +240,11 @@ public abstract class TsFileManagement {
   protected abstract void mergeFiles(
       Map<Long, Map<Long, List<TsFileResource>>> resources, long 
timePartition);
 
+  protected void printInfo() {
+    logger.info("current compaction num: {}", 
SystemInfo.getInstance().getCompactionNum());
+    System.out.println("current compaction num: " + 
SystemInfo.getInstance().getCompactionNum());
+  }
+
   public class CompactionMergeTask implements Runnable {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index a4ac9b1..4999d13 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -41,7 +41,9 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -55,6 +57,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.MergeUtils;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -423,7 +426,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
   // 对乱序文件第一层做特殊处理
   private boolean handleSpecificCase(long timePartition) {
     if (forkedUnSequenceTsFileResources.get(0).size() > 
config.getFirstLevelFileNum() &&
-        forkedSequenceTsFileResources.get(0).size() == 0) {
+        isEmpty(true)) {
       writeLock();
       try {
         TsFileResource oldResource = 
forkedUnSequenceTsFileResources.get(0).get(0);
@@ -601,8 +604,9 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
           newResource.setHistoricalVersions(historicalVersions);
           newResource.serialize();
           newFileWriter.endFile();
+          newResource.close();
           fileNames.remove(0);
-          newTsFilePair = createNewFileWriter(MERGE_SUFFIX, seqPath, 
fileNames, maxLevel);
+          newTsFilePair = createNewFileWriter(MERGE_SUFFIX, seqPath, 
fileNames, maxLevel - 1);
           newFileWriter = newTsFilePair.left;
           newResource = newTsFilePair.right;
           newTsResources.add(newResource);
@@ -623,6 +627,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
           while (tsFilesReader.hasNextBatch()) {
             BatchData batchData = tsFilesReader.nextBatch();
             currMinTime = Math.min(currMinTime, batchData.getTimeByIndex(0));
+            
SystemInfo.getInstance().incrementCompactionNum(batchData.length());
             for (int i = 0; i < batchData.length(); i++) {
               writeBatchPoint(batchData, i, chunkWriter);
             }
@@ -643,7 +648,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
       newResource.setHistoricalVersions(historicalVersions);
       newResource.serialize();
       newFileWriter.endFile();
-
+      newResource.close();
       cleanUp(resources, newTsResources, maxLevel, timePartition);
     } catch (Exception e) {
       //TODO do nothing
@@ -663,6 +668,10 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
               .removeAll(res.getValue());
         }
         for (TsFileResource deleteRes : res.getValue()) {
+          ChunkCache.getInstance().clear();
+          ChunkMetadataCache.getInstance().clear();
+          TimeSeriesMetadataCache.getInstance().clear();
+          
FileReaderManager.getInstance().closeFileAndRemoveReader(deleteRes.getTsFilePath());
           deleteRes.delete();
         }
       }
@@ -705,16 +714,16 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
-    if (isCompactionWorking()) {
-      return;
-    }
+//    if (isCompactionWorking()) {
+//      return;
+//    }
     handleSpecificCase(timePartition);
     if (processUnseq()) {
       Map<Long, Map<Long, List<TsFileResource>>> selectFiles = 
selectMergeFile(timePartition);
       mergeFiles(selectFiles, timePartition);
-    } else {
-      processSeq(timePartition);
     }
+    processSeq(timePartition);
+    printInfo();
   }
 
   @SuppressWarnings("squid:S3776")
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 57a3d13..7d3519e 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1935,6 +1935,66 @@ public class StorageGroupProcessor {
     }
   }
 
+  public void loadNewTsFile(TsFileResource newTsFileResource, boolean isSeq) 
throws LoadFileException {
+    File tsfileToBeInserted = newTsFileResource.getTsFile();
+    long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
+    while (compactionMergeWorking) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    writeLock();
+    tsFileManagement.writeLock();
+    try {
+      // loading tsfile by type
+      if (!isSeq) {
+        loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, 
newTsFileResource,
+            newFilePartitionId);
+      } else {
+        loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, 
newTsFileResource,
+            newFilePartitionId);
+      }
+
+      // update latest time map
+      updateLatestTimeMap(newTsFileResource);
+      long partitionNum = newTsFileResource.getTimePartition();
+      partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>())
+          .addAll(newTsFileResource.getHistoricalVersions());
+      updatePartitionFileVersion(partitionNum,
+          Collections.max(newTsFileResource.getHistoricalVersions()));
+    } catch (DiskSpaceInsufficientException e) {
+      logger.error(
+          "Failed to append the tsfile {} to storage group processor {} 
because the disk space is insufficient.",
+          tsfileToBeInserted.getAbsolutePath(), 
tsfileToBeInserted.getParentFile().getName());
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+      throw new LoadFileException(e);
+    } finally {
+      tsFileManagement.writeUnlock();
+      writeUnlock();
+    }
+    if (!compactionMergeWorking && 
!CompactionMergeTaskPoolManager.getInstance()
+        .isTerminated()) {
+      compactionMergeWorking = true;
+      logger.info("{} submit a compaction merge task", storageGroupName);
+      try {
+        // fork and filter current tsfile, then commit then to compaction merge
+        tsFileManagement.forkCurrentFileList(newFilePartitionId);
+        CompactionMergeTaskPoolManager.getInstance()
+            .submitTask(
+                tsFileManagement.new 
CompactionMergeTask(this::closeCompactionMergeCallBack,
+                    newFilePartitionId));
+      } catch (IOException | RejectedExecutionException e) {
+        this.closeCompactionMergeCallBack();
+        logger.error("{} compaction submit task failed", storageGroupName);
+      }
+    } else {
+      logger.info("{} last compaction merge task is working, skip current 
merge",
+          storageGroupName);
+    }
+  }
+
   /**
    * Set the version in "partition" to "version" if "version" is larger than 
the current version.
    */
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 6e02613..65fdbb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -61,6 +62,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
@@ -690,12 +692,64 @@ public class PlanExecutor implements IPlanExecutor {
           String.format("File path %s doesn't exists.", file.getPath()));
     }
     if (file.isDirectory()) {
-      recursionFileDir(file, plan);
+//      recursionFileDir(file, plan);
+
+//      为了 vldb 投稿, 根据文件 version 加载数据文件
+//      数据文件夹格式 sg -- seq
+//                      -- unseq
+      loadVldbDataDir(file, plan);
     } else {
       loadFile(file, plan);
     }
   }
 
+  private void loadVldbDataDir(File curFile, OperateFilePlan plan) throws 
QueryProcessException{
+    File[] files = curFile.listFiles();
+    List<File> fileList = new ArrayList<>();
+    Map<Boolean, List<File>> fileBySeq = new HashMap<>();
+    fileBySeq.put(true, new ArrayList<>());
+    fileBySeq.put(false, new ArrayList<>());
+    // 处理顺序文件
+    for (File file : files[0].listFiles()) {
+      if (file.getName().endsWith(TSFILE_SUFFIX)) {
+        fileList.add(file);
+        fileBySeq.get(true).add(file);
+      }
+    }
+    for (File file : files[1].listFiles()) {
+      if (file.getName().endsWith(TSFILE_SUFFIX)) {
+        fileList.add(file);
+        fileBySeq.get(false).add(file);
+      }
+    }
+    File maxFile = null;
+    if (!fileBySeq.get(true).isEmpty()) {
+       maxFile = fileList.remove(fileList.size() - 1);
+    }
+    Collections.sort(fileList, new Comparator<File>() {
+      @Override
+      public int compare(File o1, File o2) {
+        return TsFileManagement.compareFileName(o1, o2);
+      }
+    });
+    if (!fileBySeq.get(true).isEmpty()) {
+      fileList.add(maxFile);
+    }
+    for (File file : fileList) {
+      if (fileBySeq.get(true).contains(file)) {
+        loadFileBySeq(file, true, plan);
+      } else {
+        loadFileBySeq(file, false, plan);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new PlanExecutor().loadVldbDataDir(new File(
+            
"/Users/tianyu/2019秋季学期/iotdb/server/target/iotdb-server-0.11.2/data/data/sequence/root.group_1"),
+        null);
+  }
+
   private void recursionFileDir(File curFile, OperateFilePlan plan) throws 
QueryProcessException {
     File[] files = curFile.listFiles();
     for (File file : files) {
@@ -755,6 +809,54 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  private void loadFileBySeq(File file, boolean isSeq, OperateFilePlan plan) 
throws QueryProcessException {
+    if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+      return;
+    }
+    TsFileResource tsFileResource = new TsFileResource(file);
+    long fileVersion =
+        Long.parseLong(
+            
tsFileResource.getTsFile().getName().split(IoTDBConstant.FILE_NAME_SEPARATOR)[1]);
+    tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
+    tsFileResource.setClosed(true);
+    try {
+      // check file
+      RestorableTsFileIOWriter restorableTsFileIOWriter = new 
RestorableTsFileIOWriter(file);
+      if (restorableTsFileIOWriter.hasCrashed()) {
+        restorableTsFileIOWriter.close();
+        throw new QueryProcessException(
+            String.format(
+                "Cannot load file %s because the file has crashed.", 
file.getAbsolutePath()));
+      }
+      Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
+
+      List<Pair<Long, Long>> versionInfo = new ArrayList<>();
+
+      List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+      try (TsFileSequenceReader reader = new 
TsFileSequenceReader(file.getAbsolutePath(), false)) {
+        reader.selfCheck(schemaMap, chunkGroupMetadataList, versionInfo, 
false);
+      }
+
+      FileLoaderUtils.checkTsFileResource(tsFileResource);
+      if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+        throw new QueryProcessException(
+            String.format(
+                "Cannot load file %s because the file's version is old which 
needs to be upgraded.",
+                file.getAbsolutePath()));
+      }
+
+      // create schemas if they doesn't exist
+      if (plan.isAutoCreateSchema()) {
+        createSchemaAutomatically(chunkGroupMetadataList, schemaMap, 
plan.getSgLevel());
+      }
+
+      StorageEngine.getInstance().loadNewTsFileBySeq(tsFileResource, isSeq);
+    } catch (Exception e) {
+      throw new QueryProcessException(
+          String.format("Cannot load file %s because %s", 
file.getAbsolutePath(), e.getMessage()));
+    }
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   private void createSchemaAutomatically(
       List<ChunkGroupMetadata> chunkGroupMetadataList,
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index cbf76bd..7953882 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -43,6 +44,7 @@ public class SystemInfo {
 
   private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
 
+  private AtomicLong compactionNum = new AtomicLong(0);
   private static final double FLUSH_THERSHOLD =
       config.getAllocateMemoryForWrite() * config.getFlushProportion();
   private static final double REJECT_THERSHOLD = 
@@ -202,4 +204,13 @@ public class SystemInfo {
 
     private static SystemInfo instance = new SystemInfo();
   }
+
+  // 用于 vldb 投稿,改变 or 获取 compactionNum 的值
+  public long getCompactionNum() {
+    return compactionNum.get();
+  }
+
+  public void incrementCompactionNum(long batchSize) {
+    compactionNum.getAndAdd(batchSize);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSinglePathRead.java 
b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSinglePathRead.java
new file mode 100644
index 0000000..528c46d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSinglePathRead.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class TsFileSinglePathRead {
+
+  public static void main(String[] args) throws IOException {
+//    Pair<String, String> params = checkArgs(args);
+//    for (File file : new 
File("/Users/tianyu/2019秋季学期/iotdb/server/target/iotdb-server-0.11.2/data/data/unsequence/root.group_9/unseq").listFiles())
 {
+//      if (file.getName().endsWith(".tsfile")) {
+//        Pair<String, String> params = new Pair<>(
+//            file.getAbsolutePath(), "root.group_9.d_69.s_299");
+//        System.out.println(file.getName() + ":" + 
printTsFileSinglePath(params));
+//      }
+//    }
+    Pair<String, String> params = new Pair<>(
+        
"/Users/tianyu/2019秋季学期/iotdb/data/data/sequence/root.group_9/0/1613635606223-1-0.tsfile",
+        "root.group_9.d_19.s_810");
+    System.out.println("-" + printTsFileSinglePath(params));
+  }
+
+  public static int printTsFileSinglePath(Pair<String, String> params) throws 
IOException{
+    String filename = params.left;
+    String path = params.right;
+    int numCnt = 0;
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      if (!reader.isComplete()){
+        throw new RuntimeException("The analyzed data file is incomplete, 
process will stop");
+      }
+      // get the chunkMetaList of the specific path
+      List<ChunkMetadata> chunkMetadataList = reader
+          .getChunkMetadataList(new Path(path, true), false);
+      for (ChunkMetadata metadata : chunkMetadataList) {
+        System.out.println("|--[Chunk]");
+        ChunkReader chunkReader = new 
ChunkReader(reader.readMemChunk(metadata), null);
+        ChunkDataIterator chunkDataIterator = new 
ChunkDataIterator(chunkReader);
+        while (chunkDataIterator.hasNextTimeValuePair()) {
+          TimeValuePair pair = chunkDataIterator.nextTimeValuePair();
+          System.out.println(
+              "\t\t\ttime, value: " + pair.getTimestamp() + ", " + 
pair.getValue());
+          numCnt++;
+        }
+      }
+    }
+    return numCnt;
+  }
+
+  private static Pair<String, String> checkArgs(String[] args) {
+    String filename;
+    String path;
+    if (args.length >= 2) {
+      filename = args[0];
+      path = args[1];
+    } else {
+      throw new RuntimeException("Input args length less than two. Example 
usage: "
+          + "tools/print-tsfile-specific-measurement.sh file_path 
timeseries_path");
+    }
+    return new Pair<>(filename, path);
+  }
+}
\ No newline at end of file
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 6d79f9b..f7fed34 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -1053,6 +1053,14 @@ public class TsFileSequenceReader implements 
AutoCloseable {
     return chunkMetadataList;
   }
 
+  public List<ChunkMetadata> getChunkMetadataList(Path path, boolean anyway) 
throws IOException {
+    TimeseriesMetadata timeseriesMetaData = readTimeseriesMetadata(path);
+    if (timeseriesMetaData == null) {
+      return Collections.emptyList();
+    }
+    return readChunkMetaDataList(timeseriesMetaData);
+  }
+
   /**
    * get ChunkMetaDatas in given TimeseriesMetaData
    *

Reply via email to