This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch proceeding_vldb
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/proceeding_vldb by this push:
new 624a1d6 add load compaction for level
624a1d6 is described below
commit 624a1d6c1150e523ac323bba3cbabe76e6bb938a
Author: EJTTianyu <[email protected]>
AuthorDate: Sat Feb 20 22:03:09 2021 +0800
add load compaction for level
---
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 16 +++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 12 +++
.../db/engine/compaction/TsFileManagement.java | 6 ++
.../level/LevelCompactionTsFileManagement.java | 25 +++--
.../engine/storagegroup/StorageGroupProcessor.java | 60 ++++++++++++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 104 ++++++++++++++++++++-
.../org/apache/iotdb/db/rescon/SystemInfo.java | 11 +++
.../iotdb/db/tools/TsFileSinglePathRead.java | 87 +++++++++++++++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 8 ++
10 files changed, 318 insertions(+), 15 deletions(-)
diff --git
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index da5bae6..3cb4c27 100644
---
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -20,8 +20,10 @@ package org.apache.iotdb.tsfile;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
@@ -45,6 +47,7 @@ public class TsFileSequenceRead {
if (args.length >= 1) {
filename = args[0];
}
+ int count = 0;
try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
System.out.println("file length: " +
FSFactoryProducer.getFSFactory().getFile(filename).length());
System.out.println("file magic head: " + reader.readHeadMagic());
@@ -60,6 +63,7 @@ public class TsFileSequenceRead {
System.out.println("[Chunk Group]");
System.out.println("position: " + reader.position());
byte marker;
+ Set<Long> pagePointNum = new HashSet<>();
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
@@ -77,7 +81,9 @@ public class TsFileSequenceRead {
System.out.println("\t\t[Page]\n \t\tPage head position: " +
reader.position());
PageHeader pageHeader =
reader.readPageHeader(header.getDataType());
System.out.println("\t\tPage data position: " +
reader.position());
- System.out.println("\t\tpoints in the page: " +
pageHeader.getNumOfValues());
+ long pointNum = pageHeader.getNumOfValues();
+ System.out.println("\t\tpoints in the page: " + pointNum);
+ pagePointNum.add(pointNum);
ByteBuffer pageData = reader.readPage(pageHeader,
header.getCompressionType());
System.out
.println("\t\tUncompressed page data size: " +
pageHeader.getUncompressedSize());
@@ -85,10 +91,11 @@ public class TsFileSequenceRead {
defaultTimeDecoder, null);
BatchData batchData = reader1.getAllSatisfiedPageData();
while (batchData.hasCurrent()) {
- System.out.println(
- "\t\t\ttime, value: " + batchData.currentTime() + ", "
+ batchData
- .currentValue());
+// System.out.println(
+// "\t\t\ttime, value: " + batchData.currentTime() + ",
" + batchData
+// .currentValue());
batchData.next();
+ count++;
}
}
break;
@@ -117,6 +124,7 @@ public class TsFileSequenceRead {
}
}
}
+ System.out.println("total points num:" + count);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1a08b9a..c4be6f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -293,7 +293,7 @@ public class IoTDBConfig {
/**
* When a TsFile's file size (in byte) exceed this, the TsFile is forced
closed.
*/
- private long tsFileSizeThreshold = 1L;
+ private long tsFileSizeThreshold = 128 * 1024 * 1024L;
/**
* When a memTable's size (in byte) exceeds this, the memtable is flushed to
disk.
@@ -414,7 +414,7 @@ public class IoTDBConfig {
/**
* whether to cache meta data(ChunkMetaData and TsFileMetaData) or not.
*/
- private boolean metaDataCacheEnable = true;
+ private boolean metaDataCacheEnable = false;
/**
* Memory allocated for timeSeriesMetaData cache in read process
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 257cc42..69e28dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -674,6 +674,18 @@ public class StorageEngine implements IService {
getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource);
}
+ public void loadNewTsFileBySeq(TsFileResource newTsFileResource, boolean
isSeq)
+ throws LoadFileException, StorageEngineException, MetadataException {
+ Map<String, Integer> deviceMap = newTsFileResource.getDeviceToIndexMap();
+ if (deviceMap == null || deviceMap.isEmpty()) {
+ throw new StorageEngineException("Can not get the corresponding storage
group.");
+ }
+ String device = deviceMap.keySet().iterator().next();
+ PartialPath devicePath = new PartialPath(device);
+ PartialPath storageGroupPath =
IoTDB.metaManager.getStorageGroupPath(devicePath);
+ getProcessor(storageGroupPath).loadNewTsFile(newTsFileResource, isSeq);
+ }
+
public boolean deleteTsfileForSync(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
return getProcessor(new
PartialPath(deletedTsfile.getParentFile().getName()))
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index ed84cae..c7ee4d3 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -57,6 +57,7 @@ import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompac
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -239,6 +240,11 @@ public abstract class TsFileManagement {
protected abstract void mergeFiles(
Map<Long, Map<Long, List<TsFileResource>>> resources, long
timePartition);
+ protected void printInfo() {
+ logger.info("current compaction num: {}",
SystemInfo.getInstance().getCompactionNum());
+ System.out.println("current compaction num: " +
SystemInfo.getInstance().getCompactionNum());
+ }
+
public class CompactionMergeTask implements Runnable {
private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index a4ac9b1..4999d13 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -41,7 +41,9 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -55,6 +57,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -423,7 +426,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
// Special handling for the first level of unsequence (out-of-order) files
private boolean handleSpecificCase(long timePartition) {
if (forkedUnSequenceTsFileResources.get(0).size() >
config.getFirstLevelFileNum() &&
- forkedSequenceTsFileResources.get(0).size() == 0) {
+ isEmpty(true)) {
writeLock();
try {
TsFileResource oldResource =
forkedUnSequenceTsFileResources.get(0).get(0);
@@ -601,8 +604,9 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
newResource.setHistoricalVersions(historicalVersions);
newResource.serialize();
newFileWriter.endFile();
+ newResource.close();
fileNames.remove(0);
- newTsFilePair = createNewFileWriter(MERGE_SUFFIX, seqPath,
fileNames, maxLevel);
+ newTsFilePair = createNewFileWriter(MERGE_SUFFIX, seqPath,
fileNames, maxLevel - 1);
newFileWriter = newTsFilePair.left;
newResource = newTsFilePair.right;
newTsResources.add(newResource);
@@ -623,6 +627,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
currMinTime = Math.min(currMinTime, batchData.getTimeByIndex(0));
+
SystemInfo.getInstance().incrementCompactionNum(batchData.length());
for (int i = 0; i < batchData.length(); i++) {
writeBatchPoint(batchData, i, chunkWriter);
}
@@ -643,7 +648,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
newResource.setHistoricalVersions(historicalVersions);
newResource.serialize();
newFileWriter.endFile();
-
+ newResource.close();
cleanUp(resources, newTsResources, maxLevel, timePartition);
} catch (Exception e) {
//TODO do nothing
@@ -663,6 +668,10 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
.removeAll(res.getValue());
}
for (TsFileResource deleteRes : res.getValue()) {
+ ChunkCache.getInstance().clear();
+ ChunkMetadataCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+
FileReaderManager.getInstance().closeFileAndRemoveReader(deleteRes.getTsFilePath());
deleteRes.delete();
}
}
@@ -705,16 +714,16 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
@Override
protected void merge(long timePartition) {
- if (isCompactionWorking()) {
- return;
- }
+// if (isCompactionWorking()) {
+// return;
+// }
handleSpecificCase(timePartition);
if (processUnseq()) {
Map<Long, Map<Long, List<TsFileResource>>> selectFiles =
selectMergeFile(timePartition);
mergeFiles(selectFiles, timePartition);
- } else {
- processSeq(timePartition);
}
+ processSeq(timePartition);
+ printInfo();
}
@SuppressWarnings("squid:S3776")
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 57a3d13..7d3519e 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1935,6 +1935,66 @@ public class StorageGroupProcessor {
}
}
+ public void loadNewTsFile(TsFileResource newTsFileResource, boolean isSeq)
throws LoadFileException {
+ File tsfileToBeInserted = newTsFileResource.getTsFile();
+ long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
+ while (compactionMergeWorking) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ writeLock();
+ tsFileManagement.writeLock();
+ try {
+ // loading tsfile by type
+ if (!isSeq) {
+ loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted,
newTsFileResource,
+ newFilePartitionId);
+ } else {
+ loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted,
newTsFileResource,
+ newFilePartitionId);
+ }
+
+ // update latest time map
+ updateLatestTimeMap(newTsFileResource);
+ long partitionNum = newTsFileResource.getTimePartition();
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>())
+ .addAll(newTsFileResource.getHistoricalVersions());
+ updatePartitionFileVersion(partitionNum,
+ Collections.max(newTsFileResource.getHistoricalVersions()));
+ } catch (DiskSpaceInsufficientException e) {
+ logger.error(
+ "Failed to append the tsfile {} to storage group processor {}
because the disk space is insufficient.",
+ tsfileToBeInserted.getAbsolutePath(),
tsfileToBeInserted.getParentFile().getName());
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+ throw new LoadFileException(e);
+ } finally {
+ tsFileManagement.writeUnlock();
+ writeUnlock();
+ }
+ if (!compactionMergeWorking &&
!CompactionMergeTaskPoolManager.getInstance()
+ .isTerminated()) {
+ compactionMergeWorking = true;
+ logger.info("{} submit a compaction merge task", storageGroupName);
+ try {
+ // fork and filter current tsfile, then commit them to compaction merge
+ tsFileManagement.forkCurrentFileList(newFilePartitionId);
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(
+ tsFileManagement.new
CompactionMergeTask(this::closeCompactionMergeCallBack,
+ newFilePartitionId));
+ } catch (IOException | RejectedExecutionException e) {
+ this.closeCompactionMergeCallBack();
+ logger.error("{} compaction submit task failed", storageGroupName);
+ }
+ } else {
+ logger.info("{} last compaction merge task is working, skip current
merge",
+ storageGroupName);
+ }
+ }
+
/**
* Set the version in "partition" to "version" if "version" is larger than
the current version.
*/
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 6e02613..65fdbb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -41,6 +41,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -61,6 +62,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
@@ -690,12 +692,64 @@ public class PlanExecutor implements IPlanExecutor {
String.format("File path %s doesn't exists.", file.getPath()));
}
if (file.isDirectory()) {
- recursionFileDir(file, plan);
+// recursionFileDir(file, plan);
+
+// For the VLDB submission: load data files ordered by file version.
+// Expected data directory layout: sg -- seq
+//                                    -- unseq
+ loadVldbDataDir(file, plan);
} else {
loadFile(file, plan);
}
}
+ private void loadVldbDataDir(File curFile, OperateFilePlan plan) throws
QueryProcessException{
+ File[] files = curFile.listFiles();
+ List<File> fileList = new ArrayList<>();
+ Map<Boolean, List<File>> fileBySeq = new HashMap<>();
+ fileBySeq.put(true, new ArrayList<>());
+ fileBySeq.put(false, new ArrayList<>());
+ // handle sequence files
+ for (File file : files[0].listFiles()) {
+ if (file.getName().endsWith(TSFILE_SUFFIX)) {
+ fileList.add(file);
+ fileBySeq.get(true).add(file);
+ }
+ }
+ for (File file : files[1].listFiles()) {
+ if (file.getName().endsWith(TSFILE_SUFFIX)) {
+ fileList.add(file);
+ fileBySeq.get(false).add(file);
+ }
+ }
+ File maxFile = null;
+ if (!fileBySeq.get(true).isEmpty()) {
+ maxFile = fileList.remove(fileList.size() - 1);
+ }
+ Collections.sort(fileList, new Comparator<File>() {
+ @Override
+ public int compare(File o1, File o2) {
+ return TsFileManagement.compareFileName(o1, o2);
+ }
+ });
+ if (!fileBySeq.get(true).isEmpty()) {
+ fileList.add(maxFile);
+ }
+ for (File file : fileList) {
+ if (fileBySeq.get(true).contains(file)) {
+ loadFileBySeq(file, true, plan);
+ } else {
+ loadFileBySeq(file, false, plan);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new PlanExecutor().loadVldbDataDir(new File(
+
"/Users/tianyu/2019秋季学期/iotdb/server/target/iotdb-server-0.11.2/data/data/sequence/root.group_1"),
+ null);
+ }
+
private void recursionFileDir(File curFile, OperateFilePlan plan) throws
QueryProcessException {
File[] files = curFile.listFiles();
for (File file : files) {
@@ -755,6 +809,54 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+ private void loadFileBySeq(File file, boolean isSeq, OperateFilePlan plan)
throws QueryProcessException {
+ if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+ return;
+ }
+ TsFileResource tsFileResource = new TsFileResource(file);
+ long fileVersion =
+ Long.parseLong(
+
tsFileResource.getTsFile().getName().split(IoTDBConstant.FILE_NAME_SEPARATOR)[1]);
+ tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
+ tsFileResource.setClosed(true);
+ try {
+ // check file
+ RestorableTsFileIOWriter restorableTsFileIOWriter = new
RestorableTsFileIOWriter(file);
+ if (restorableTsFileIOWriter.hasCrashed()) {
+ restorableTsFileIOWriter.close();
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file has crashed.",
file.getAbsolutePath()));
+ }
+ Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
+
+ List<Pair<Long, Long>> versionInfo = new ArrayList<>();
+
+ List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(file.getAbsolutePath(), false)) {
+ reader.selfCheck(schemaMap, chunkGroupMetadataList, versionInfo,
false);
+ }
+
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
+ if (UpgradeUtils.isNeedUpgrade(tsFileResource)) {
+ throw new QueryProcessException(
+ String.format(
+ "Cannot load file %s because the file's version is old which
needs to be upgraded.",
+ file.getAbsolutePath()));
+ }
+
+ // create schemas if they don't exist
+ if (plan.isAutoCreateSchema()) {
+ createSchemaAutomatically(chunkGroupMetadataList, schemaMap,
plan.getSgLevel());
+ }
+
+ StorageEngine.getInstance().loadNewTsFileBySeq(tsFileResource, isSeq);
+ } catch (Exception e) {
+ throw new QueryProcessException(
+ String.format("Cannot load file %s because %s",
file.getAbsolutePath(), e.getMessage()));
+ }
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
private void createSchemaAutomatically(
List<ChunkGroupMetadata> chunkGroupMetadataList,
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index cbf76bd..7953882 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -43,6 +44,7 @@ public class SystemInfo {
private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+ private AtomicLong compactionNum = new AtomicLong(0);
private static final double FLUSH_THERSHOLD =
config.getAllocateMemoryForWrite() * config.getFlushProportion();
private static final double REJECT_THERSHOLD =
@@ -202,4 +204,13 @@ public class SystemInfo {
private static SystemInfo instance = new SystemInfo();
}
+
+ // For the VLDB submission: update or read the value of compactionNum
+ public long getCompactionNum() {
+ return compactionNum.get();
+ }
+
+ public void incrementCompactionNum(long batchSize) {
+ compactionNum.getAndAdd(batchSize);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSinglePathRead.java
b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSinglePathRead.java
new file mode 100644
index 0000000..528c46d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSinglePathRead.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class TsFileSinglePathRead {
+
+ public static void main(String[] args) throws IOException {
+// Pair<String, String> params = checkArgs(args);
+// for (File file : new
File("/Users/tianyu/2019秋季学期/iotdb/server/target/iotdb-server-0.11.2/data/data/unsequence/root.group_9/unseq").listFiles())
{
+// if (file.getName().endsWith(".tsfile")) {
+// Pair<String, String> params = new Pair<>(
+// file.getAbsolutePath(), "root.group_9.d_69.s_299");
+// System.out.println(file.getName() + ":" +
printTsFileSinglePath(params));
+// }
+// }
+ Pair<String, String> params = new Pair<>(
+
"/Users/tianyu/2019秋季学期/iotdb/data/data/sequence/root.group_9/0/1613635606223-1-0.tsfile",
+ "root.group_9.d_19.s_810");
+ System.out.println("-" + printTsFileSinglePath(params));
+ }
+
+ public static int printTsFileSinglePath(Pair<String, String> params) throws
IOException{
+ String filename = params.left;
+ String path = params.right;
+ int numCnt = 0;
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+ if (!reader.isComplete()){
+ throw new RuntimeException("The analyzed data file is incomplete,
process will stop");
+ }
+ // get the chunkMetaList of the specific path
+ List<ChunkMetadata> chunkMetadataList = reader
+ .getChunkMetadataList(new Path(path, true), false);
+ for (ChunkMetadata metadata : chunkMetadataList) {
+ System.out.println("|--[Chunk]");
+ ChunkReader chunkReader = new
ChunkReader(reader.readMemChunk(metadata), null);
+ ChunkDataIterator chunkDataIterator = new
ChunkDataIterator(chunkReader);
+ while (chunkDataIterator.hasNextTimeValuePair()) {
+ TimeValuePair pair = chunkDataIterator.nextTimeValuePair();
+ System.out.println(
+ "\t\t\ttime, value: " + pair.getTimestamp() + ", " +
pair.getValue());
+ numCnt++;
+ }
+ }
+ }
+ return numCnt;
+ }
+
+ private static Pair<String, String> checkArgs(String[] args) {
+ String filename;
+ String path;
+ if (args.length >= 2) {
+ filename = args[0];
+ path = args[1];
+ } else {
+ throw new RuntimeException("Input args length less than two. Example
usage: "
+ + "tools/print-tsfile-specific-measurement.sh file_path
timeseries_path");
+ }
+ return new Pair<>(filename, path);
+ }
+}
\ No newline at end of file
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 6d79f9b..f7fed34 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -1053,6 +1053,14 @@ public class TsFileSequenceReader implements
AutoCloseable {
return chunkMetadataList;
}
+ public List<ChunkMetadata> getChunkMetadataList(Path path, boolean anyway)
throws IOException {
+ TimeseriesMetadata timeseriesMetaData = readTimeseriesMetadata(path);
+ if (timeseriesMetaData == null) {
+ return Collections.emptyList();
+ }
+ return readChunkMetaDataList(timeseriesMetaData);
+ }
+
/**
* get ChunkMetaDatas in given TimeseriesMetaData
*