This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3e86379d82 [IOTDB-4681] speed up mpp load (#7725)
3e86379d82 is described below
commit 3e86379d829c3967178b692e500c37ac12549b8f
Author: yschengzi <[email protected]>
AuthorDate: Tue Nov 8 08:57:11 2022 +0800
[IOTDB-4681] speed up mpp load (#7725)
---
.../iotdb/commons/partition/DataPartition.java | 8 -
.../iotdb/db/engine/load/AlignedChunkData.java | 322 ++++++----------
.../org/apache/iotdb/db/engine/load/ChunkData.java | 25 +-
.../apache/iotdb/db/engine/load/DeletionData.java | 4 +-
.../iotdb/db/engine/load/LoadTsFileManager.java | 17 +-
.../iotdb/db/engine/load/NonAlignedChunkData.java | 245 +++++-------
.../apache/iotdb/db/engine/load/TsFileData.java | 3 +-
.../load/TsFileSplitter.java} | 415 ++++++++-------------
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 17 +-
.../plan/node/load/LoadSingleTsFileNode.java | 415 ++-------------------
.../planner/plan/node/load/LoadTsFileNode.java | 19 +-
.../plan/node/load/LoadTsFilePieceNode.java | 13 +-
.../scheduler/load/LoadTsFileDispatcherImpl.java | 2 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 230 ++++++++----
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 24 ++
.../apache/iotdb/db/utils/TimePartitionUtils.java | 4 +
.../iotdb/tsfile/file/header/ChunkHeader.java | 21 ++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 22 ++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 4 +
.../write/writer/TsFileIOWriterEndFileTest.java | 49 +++
20 files changed, 707 insertions(+), 1152 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 4164fc716b..00c2139556 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -97,14 +97,6 @@ public class DataPartition extends Partition {
.collect(Collectors.toList());
}
- public List<TRegionReplicaSet> getAllDataRegionReplicaSetForOneDevice(String deviceName) {
- String storageGroup = getStorageGroupByDevice(deviceName);
- TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
- return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream()
- .flatMap(entry -> entry.getValue().stream())
- .collect(Collectors.toList());
- }
-
public TRegionReplicaSet getDataRegionReplicaSetForWriting(
String deviceName, TTimePartitionSlot timePartitionSlot) {
// A list of data region replica sets will store data in a same time partition.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
index 103503d870..d81117e42b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
@@ -21,22 +21,16 @@ package org.apache.iotdb.db.engine.load;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.db.utils.TimePartitionUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
-import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
@@ -45,13 +39,14 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
public class AlignedChunkData implements ChunkData {
private static final int DEFAULT_INT32 = 0;
@@ -61,33 +56,35 @@ public class AlignedChunkData implements ChunkData {
private static final boolean DEFAULT_BOOLEAN = false;
private static final Binary DEFAULT_BINARY = null;
- private List<Long> offset;
- private List<Long> dataSize;
- private boolean isHeadPageNeedDecode;
- private boolean isTailPageNeedDecode;
-
- private TTimePartitionSlot timePartitionSlot;
- private String device;
+ private final TTimePartitionSlot timePartitionSlot;
+ private final String device;
private List<ChunkHeader> chunkHeaderList;
- private List<IChunkMetadata> chunkMetadataList;
+ private final PublicBAOS byteStream;
+ private final DataOutputStream stream;
private List<long[]> timeBatch;
- private List<Integer> satisfiedTimeBatchLength;
+ private long dataSize;
+ private boolean needDecodeChunk;
+ private List<Integer> pageNumbers;
+ private Queue<Integer> satisfiedLengthQueue;
private AlignedChunkWriterImpl chunkWriter;
private List<Chunk> chunkList;
- public AlignedChunkData(long timeOffset, String device, ChunkHeader chunkHeader) {
- this.offset = new ArrayList<>();
- this.dataSize = new ArrayList<>();
- this.isHeadPageNeedDecode = false;
- this.isTailPageNeedDecode = false;
+ public AlignedChunkData(
+ String device, ChunkHeader chunkHeader, TTimePartitionSlot timePartitionSlot) {
+ this.dataSize = 0;
this.device = device;
this.chunkHeaderList = new ArrayList<>();
+ this.timePartitionSlot = timePartitionSlot;
+ this.needDecodeChunk = true;
+ this.pageNumbers = new ArrayList<>();
+ this.satisfiedLengthQueue = new LinkedList<>();
+ this.byteStream = new PublicBAOS();
+ this.stream = new DataOutputStream(byteStream);
- offset.add(timeOffset);
- dataSize.add(0L);
chunkHeaderList.add(chunkHeader);
+ pageNumbers.add(0);
}
@Override
@@ -102,38 +99,12 @@ public class AlignedChunkData implements ChunkData {
@Override
public long getDataSize() {
- return dataSize.stream().mapToLong(o -> o).sum();
- }
-
- @Override
- public void addDataSize(long pageSize) {
- dataSize.set(0, dataSize.get(0) + pageSize);
- }
-
- @Override
- public void setNotDecode(IChunkMetadata chunkMetadata) {
- chunkMetadataList = new ArrayList<>();
- chunkMetadataList.add(chunkMetadata);
+ return dataSize;
}
@Override
- public boolean needDecodeChunk() {
- return chunkMetadataList == null;
- }
-
- @Override
- public void setHeadPageNeedDecode(boolean headPageNeedDecode) {
- isHeadPageNeedDecode = headPageNeedDecode;
- }
-
- @Override
- public void setTailPageNeedDecode(boolean tailPageNeedDecode) {
- isTailPageNeedDecode = tailPageNeedDecode;
- }
-
- @Override
- public void setTimePartitionSlot(TTimePartitionSlot timePartitionSlot) {
- this.timePartitionSlot = timePartitionSlot;
+ public void setNotDecode() {
+ needDecodeChunk = false;
}
@Override
@@ -152,196 +123,119 @@ public class AlignedChunkData implements ChunkData {
}
}
- public void addValueChunk(long offset, ChunkHeader chunkHeader, IChunkMetadata chunkMetadata) {
- this.offset.add(offset);
- this.dataSize.add(0L);
+ public void addValueChunk(ChunkHeader chunkHeader) {
this.chunkHeaderList.add(chunkHeader);
- if (chunkMetadataList != null) {
- chunkMetadataList.add(chunkMetadata);
- }
- }
-
- public void addValueChunkDataSize(long dataSize) {
- int lastIndex = this.dataSize.size() - 1;
- this.dataSize.set(lastIndex, this.dataSize.get(lastIndex) + dataSize);
+ this.pageNumbers.add(0);
}
@Override
- public void serialize(DataOutputStream stream, File tsFile) throws IOException {
+ public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
- serializeTsFileData(stream, tsFile);
+ byteStream.writeTo(stream);
+ close();
}
private void serializeAttr(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
ReadWriteIOUtils.write(device, stream);
+ ReadWriteIOUtils.write(needDecodeChunk, stream);
ReadWriteIOUtils.write(chunkHeaderList.size(), stream);
for (ChunkHeader chunkHeader : chunkHeaderList) {
chunkHeader.serializeTo(stream); // chunk header already serialize chunk type
}
- }
-
- private void serializeTsFileData(DataOutputStream stream, File tsFile) throws IOException {
- timeBatch = new ArrayList<>();
- satisfiedTimeBatchLength = new ArrayList<>();
- ReadWriteIOUtils.write(needDecodeChunk(), stream);
- try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
- int chunkSize = offset.size();
- for (int i = 0; i < chunkSize; i++) {
- if (needDecodeChunk()) {
- serializeDecodeChunk(stream, reader, chunkHeaderList.get(i), i);
- } else {
- serializeEntireChunk(stream, reader, chunkHeaderList.get(i), chunkMetadataList.get(i));
- }
+ if (needDecodeChunk) {
+ for (Integer pageNumber : pageNumbers) {
+ ReadWriteIOUtils.write(pageNumber, stream);
}
}
- timeBatch = null;
- satisfiedTimeBatchLength = null;
}
- private void serializeEntireChunk(
- DataOutputStream stream,
- TsFileSequenceReader reader,
- ChunkHeader chunkHeader,
- IChunkMetadata chunkMetadata)
+ @Override
+ public void writeEntireChunk(ByteBuffer chunkData, IChunkMetadata chunkMetadata)
throws IOException {
- ByteBuffer chunkData =
- reader.readChunk(
- chunkMetadata.getOffsetOfChunkHeader() + chunkHeader.getSerializedSize(),
- chunkHeader.getDataSize());
- ReadWriteIOUtils.write(chunkData, stream);
+ dataSize += ReadWriteIOUtils.write(chunkData, stream);
chunkMetadata.getStatistics().serialize(stream);
}
- private void serializeDecodeChunk(
- DataOutputStream stream, TsFileSequenceReader reader, ChunkHeader chunkHeader, int chunkIndex)
- throws IOException {
- Decoder defaultTimeDecoder =
- Decoder.getDecoderByType(
- TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
- TSDataType.INT64);
- Decoder valueDecoder =
- Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
-
- reader.position(offset.get(chunkIndex));
- int decodePageIndex = 0; // should be 0,1 or 2
- long dataSize = this.dataSize.get(chunkIndex);
- while (dataSize > 0) {
- boolean hasStatistic = (chunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER;
- PageHeader pageHeader = reader.readPageHeader(chunkHeader.getDataType(), hasStatistic);
- long pageDataSize = pageHeader.getSerializedPageSize();
- if ((dataSize == this.dataSize.get(chunkIndex) && isHeadPageNeedDecode) // decode head page
- || (dataSize == pageDataSize && isTailPageNeedDecode)) { // decode tail page
- ReadWriteIOUtils.write(true, stream); // decode
- if (chunkIndex == 0) {
- decodeTimePage(reader, chunkHeader, pageHeader, defaultTimeDecoder, valueDecoder, stream);
- } else {
- decodeValuePage(reader, chunkHeader, pageHeader, decodePageIndex, valueDecoder, stream);
- }
- decodePageIndex += 1;
- } else { // entire page
- ReadWriteIOUtils.write(false, stream); // don't decode
- pageHeader.serializeTo(stream);
- ByteBuffer pageData = reader.readCompressedPage(pageHeader);
- ReadWriteIOUtils.write(pageData, stream);
- }
- dataSize -= pageDataSize;
- }
-
- ReadWriteIOUtils.write(true, stream); // means ending
- ReadWriteIOUtils.write(-1, stream);
+ @Override
+ public void writeEntirePage(PageHeader pageHeader, ByteBuffer pageData) throws IOException {
+ pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() - 1) + 1);
+ dataSize += ReadWriteIOUtils.write(false, stream);
+ pageHeader.serializeTo(stream);
+ dataSize += ReadWriteIOUtils.write(pageData, stream);
}
- private void decodeTimePage(
- TsFileSequenceReader reader,
- ChunkHeader chunkHeader,
- PageHeader pageHeader,
- Decoder timeDecoder,
- Decoder valueDecoder,
- DataOutputStream stream)
+ @Override
+ public void writeDecodePage(long[] times, Object[] values, int satisfiedLength)
throws IOException {
- valueDecoder.reset();
- ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType());
- TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, timeDecoder);
- long[] decodeTime = timePageReader.getNextTimeBatch();
- int satisfiedLength = 0;
- long[] time = new long[decodeTime.length];
- for (int i = 0; i < decodeTime.length; i++) {
- if (decodeTime[i] < timePartitionSlot.getStartTime()) {
+ pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() - 1) + 1);
+ satisfiedLengthQueue.offer(satisfiedLength);
+ long startTime = timePartitionSlot.getStartTime();
+ long endTime = startTime + TimePartitionUtils.getTimePartitionIntervalForRouting();
+ dataSize += ReadWriteIOUtils.write(true, stream);
+ dataSize += ReadWriteIOUtils.write(satisfiedLength, stream);
+
+ for (int i = 0; i < times.length; i++) {
+ if (times[i] < startTime) {
continue;
- } else if (!timePartitionSlot.equals(
- TimePartitionUtils.getTimePartitionForRouting(decodeTime[i]))) {
+ } else if (times[i] >= endTime) {
break;
}
- time[satisfiedLength++] = decodeTime[i];
+ dataSize += ReadWriteIOUtils.write(times[i], stream);
}
- ReadWriteIOUtils.write(satisfiedLength, stream);
- for (int i = 0; i < satisfiedLength; i++) {
- ReadWriteIOUtils.write(time[i], stream);
- }
- timeBatch.add(decodeTime);
- satisfiedTimeBatchLength.add(satisfiedLength);
}
- private void decodeValuePage(
- TsFileSequenceReader reader,
- ChunkHeader chunkHeader,
- PageHeader pageHeader,
- int pageIndex,
- Decoder valueDecoder,
- DataOutputStream stream)
+ public void writeDecodeValuePage(long[] times, TsPrimitiveType[] values, TSDataType dataType)
throws IOException {
- valueDecoder.reset();
- ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType());
- ValuePageReader valuePageReader =
- new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
- long[] time = timeBatch.get(pageIndex);
- TsPrimitiveType[] valueBatch =
- valuePageReader.nextValueBatch(
- time); // should be origin time, so recording satisfied length is necessary
- ReadWriteIOUtils.write(satisfiedTimeBatchLength.get(pageIndex), stream);
- for (int i = 0; i < valueBatch.length; i++) {
- if (time[i] < timePartitionSlot.getStartTime()) {
+ pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() - 1) + 1);
+ long startTime = timePartitionSlot.getStartTime();
+ long endTime = startTime + TimePartitionUtils.getTimePartitionIntervalForRouting();
+ int satisfiedLength = satisfiedLengthQueue.poll();
+ dataSize += ReadWriteIOUtils.write(true, stream);
+ dataSize += ReadWriteIOUtils.write(satisfiedLength, stream);
+ satisfiedLengthQueue.offer(satisfiedLength);
+
+ for (int i = 0; i < times.length; i++) {
+ if (times[i] < startTime) {
continue;
- } else if (!timePartitionSlot.equals(
- TimePartitionUtils.getTimePartitionForRouting(time[i]))) {
+ } else if (times[i] >= endTime) {
break;
}
- if (valueBatch[i] == null) {
- ReadWriteIOUtils.write(true, stream);
+
+ if (values[i] == null) {
+ dataSize += ReadWriteIOUtils.write(true, stream);
continue;
}
- ReadWriteIOUtils.write(false, stream);
- switch (chunkHeader.getDataType()) {
+ dataSize += ReadWriteIOUtils.write(false, stream);
+ switch (dataType) {
case INT32:
- ReadWriteIOUtils.write(valueBatch[i].getInt(), stream);
+ dataSize += ReadWriteIOUtils.write(values[i].getInt(), stream);
break;
case INT64:
- ReadWriteIOUtils.write(valueBatch[i].getLong(), stream);
+ dataSize += ReadWriteIOUtils.write(values[i].getLong(), stream);
break;
case FLOAT:
- ReadWriteIOUtils.write(valueBatch[i].getFloat(), stream);
+ dataSize += ReadWriteIOUtils.write(values[i].getFloat(), stream);
break;
case DOUBLE:
- ReadWriteIOUtils.write(valueBatch[i].getDouble(), stream);
+ dataSize += ReadWriteIOUtils.write(values[i].getDouble(), stream);
break;
case BOOLEAN:
- ReadWriteIOUtils.write(valueBatch[i].getBoolean(), stream);
+ dataSize += ReadWriteIOUtils.write(values[i].getBoolean(), stream);
break;
case TEXT:
- ReadWriteIOUtils.write(valueBatch[i].getBinary(), stream);
+ dataSize += ReadWriteIOUtils.write(values[i].getBinary(), stream);
break;
default:
throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.",
chunkHeader.getDataType()));
+ String.format("Data type %s is not supported.", dataType));
}
}
}
private void deserializeTsFileData(InputStream stream) throws IOException, PageException {
- boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
if (needDecodeChunk) {
buildChunkWriter(stream);
} else {
@@ -378,24 +272,24 @@ public class AlignedChunkData implements ChunkData {
timeBatch = new ArrayList<>();
int chunkHeaderSize = chunkHeaderList.size();
for (int i = 0; i < chunkHeaderSize; i++) {
- buildChunk(stream, chunkHeaderList.get(i), i - 1, i == 0);
+ buildChunk(stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i == 0);
}
timeBatch = null;
}
private void buildChunk(
- InputStream stream, ChunkHeader chunkHeader, int valueChunkIndex, boolean isTimeChunk)
+ InputStream stream,
+ ChunkHeader chunkHeader,
+ int pageNumber,
+ int valueChunkIndex,
+ boolean isTimeChunk)
throws IOException, PageException {
boolean needDecode;
int decodePageIndex = 0;
- while (true) {
+ for (int j = 0; j < pageNumber; j++) {
needDecode = ReadWriteIOUtils.readBool(stream);
if (needDecode) {
int length = ReadWriteIOUtils.readInt(stream);
- if (length == -1) {
- break;
- }
-
long[] timePageBatch = new long[length];
if (!isTimeChunk) {
timePageBatch = timeBatch.get(decodePageIndex);
@@ -465,42 +359,52 @@ public class AlignedChunkData implements ChunkData {
}
public static AlignedChunkData deserialize(InputStream stream) throws IOException, PageException {
- long timePartition = ReadWriteIOUtils.readLong(stream);
+ TTimePartitionSlot timePartitionSlot =
+ TimePartitionUtils.getTimePartitionForRouting(ReadWriteIOUtils.readLong(stream));
String device = ReadWriteIOUtils.readString(stream);
+ boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
int chunkHeaderListSize = ReadWriteIOUtils.readInt(stream);
- ChunkHeader[] chunkHeaderList = new ChunkHeader[chunkHeaderListSize];
+ List<ChunkHeader> chunkHeaderList = new ArrayList<>();
for (int i = 0; i < chunkHeaderListSize; i++) {
byte chunkType = ReadWriteIOUtils.readByte(stream);
- chunkHeaderList[i] = ChunkHeader.deserializeFrom(stream, chunkType);
+ chunkHeaderList.add(ChunkHeader.deserializeFrom(stream, chunkType));
}
-
- AlignedChunkData chunkData = new AlignedChunkData(-1, device, chunkHeaderList[0]);
- for (int i = 1; i < chunkHeaderListSize; i++) {
- chunkData.addValueChunk(-1, chunkHeaderList[i], null);
+ List<Integer> pageNumbers = new ArrayList<>();
+ if (needDecodeChunk) {
+ for (int i = 0; i < chunkHeaderListSize; i++) {
+ pageNumbers.add(ReadWriteIOUtils.readInt(stream));
+ }
}
- chunkData.setTimePartitionSlot(TimePartitionUtils.getTimePartitionForRouting(timePartition));
+
+ AlignedChunkData chunkData =
+ new AlignedChunkData(device, chunkHeaderList.get(0), timePartitionSlot);
+ chunkData.needDecodeChunk = needDecodeChunk;
+ chunkData.chunkHeaderList = chunkHeaderList;
+ chunkData.pageNumbers = pageNumbers;
chunkData.deserializeTsFileData(stream);
+ chunkData.close();
return chunkData;
}
+ private void close() throws IOException {
+ byteStream.close();
+ stream.close();
+ }
+
@Override
public String toString() {
return "AlignedChunkData{"
- + "offset="
- + offset
- + ", dataSize="
- + dataSize
- + ", isHeadPageNeedDecode="
- + isHeadPageNeedDecode
- + ", isTailPageNeedDecode="
- + isTailPageNeedDecode
- + ", timePartitionSlot="
+ + "timePartitionSlot="
+ timePartitionSlot
+ ", device='"
+ device
+ '\''
+ ", chunkHeaderList="
+ chunkHeaderList
+ + ", totalDataSize="
+ + dataSize
+ + ", needDecodeChunk="
+ + needDecodeChunk
+ '}';
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
index bf5dae6256..0b19a43b9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
@@ -22,30 +22,28 @@ package org.apache.iotdb.db.engine.load;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
public interface ChunkData extends TsFileData {
String getDevice();
TTimePartitionSlot getTimePartitionSlot();
- void addDataSize(long pageSize);
+ void setNotDecode();
- void setNotDecode(IChunkMetadata chunkMetadata);
-
- boolean needDecodeChunk();
-
- void setHeadPageNeedDecode(boolean headPageNeedDecode);
+ boolean isAligned();
- void setTailPageNeedDecode(boolean tailPageNeedDecode);
+ void writeEntireChunk(ByteBuffer chunkData, IChunkMetadata chunkMetadata) throws IOException;
- void setTimePartitionSlot(TTimePartitionSlot timePartitionSlot);
+ void writeEntirePage(PageHeader pageHeader, ByteBuffer pageData) throws IOException;
- boolean isAligned();
+ void writeDecodePage(long[] times, Object[] values, int satisfiedLength) throws IOException;
@Override
default boolean isModification() {
@@ -60,9 +58,12 @@ public interface ChunkData extends TsFileData {
}
static ChunkData createChunkData(
- boolean isAligned, long offset, String device, ChunkHeader chunkHeader) {
+ boolean isAligned,
+ String device,
+ ChunkHeader chunkHeader,
+ TTimePartitionSlot timePartitionSlot) {
return isAligned
- ? new AlignedChunkData(offset, device, chunkHeader)
- : new NonAlignedChunkData(offset, device, chunkHeader);
+ ? new AlignedChunkData(device, chunkHeader, timePartitionSlot)
+ : new NonAlignedChunkData(device, chunkHeader, timePartitionSlot);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
index 2b9cfe8e76..8fcfa552d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
@@ -40,7 +40,7 @@ public class DeletionData implements TsFileData {
@Override
public long getDataSize() {
- return 0;
+ return Long.BYTES;
}
@Override
@@ -60,7 +60,7 @@ public class DeletionData implements TsFileData {
}
@Override
- public void serialize(DataOutputStream stream, File tsFile) throws IOException {
+ public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isModification(), stream);
deletion.serializeWithoutFileOffset(stream);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
index 57119901cb..fa9f7abd9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
@@ -28,14 +28,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.PageException;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -44,7 +43,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -235,18 +233,7 @@ public class LoadTsFileManager {
}
private TsFileResource generateResource(TsFileIOWriter writer) throws IOException {
- TsFileResource tsFileResource = new TsFileResource(writer.getFile());
- Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
- writer.getDeviceTimeseriesMetadataMap();
- for (Map.Entry<String, List<TimeseriesMetadata>> entry :
- deviceTimeseriesMetadataMap.entrySet()) {
- String device = entry.getKey();
- for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
- tsFileResource.updateStartTime(device, timeseriesMetaData.getStatistics().getStartTime());
- tsFileResource.updateEndTime(device, timeseriesMetaData.getStatistics().getEndTime());
- }
- }
- tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+ TsFileResource tsFileResource = FileLoaderUtils.generateTsFileResource(writer);
tsFileResource.serialize();
return tsFileResource;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
index c3d378a2b5..3091988f9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
@@ -21,55 +21,51 @@ package org.apache.iotdb.db.engine.load;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.db.utils.TimePartitionUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
public class NonAlignedChunkData implements ChunkData {
- private long offset;
- private long dataSize;
- private boolean isHeadPageNeedDecode;
- private boolean isTailPageNeedDecode;
- private TTimePartitionSlot timePartitionSlot;
- private String device;
- private ChunkHeader chunkHeader;
- private IChunkMetadata chunkMetadata;
+ private final TTimePartitionSlot timePartitionSlot;
+ private final String device;
+ private final ChunkHeader chunkHeader;
+
+ private final PublicBAOS byteStream;
+ private final DataOutputStream stream;
+ private long dataSize;
+ private boolean needDecodeChunk;
+ private int pageNumber;
private ChunkWriterImpl chunkWriter;
private Chunk chunk;
- public NonAlignedChunkData(long offset, String device, ChunkHeader chunkHeader) {
- this.offset = offset;
+ public NonAlignedChunkData(
+ String device, ChunkHeader chunkHeader, TTimePartitionSlot timePartitionSlot) {
this.dataSize = 0;
- this.isHeadPageNeedDecode = false;
- this.isTailPageNeedDecode = false;
this.device = device;
this.chunkHeader = chunkHeader;
+ this.timePartitionSlot = timePartitionSlot;
+ this.needDecodeChunk = true;
+ this.pageNumber = 0;
+ this.byteStream = new PublicBAOS();
+ this.stream = new DataOutputStream(byteStream);
}
@Override
@@ -88,33 +84,8 @@ public class NonAlignedChunkData implements ChunkData {
}
@Override
- public void addDataSize(long pageSize) {
- dataSize += pageSize;
- }
-
- @Override
- public void setNotDecode(IChunkMetadata chunkMetadata) {
- this.chunkMetadata = chunkMetadata;
- }
-
- @Override
- public boolean needDecodeChunk() {
- return chunkMetadata == null;
- }
-
- @Override
- public void setHeadPageNeedDecode(boolean headPageNeedDecode) {
- isHeadPageNeedDecode = headPageNeedDecode;
- }
-
- @Override
- public void setTailPageNeedDecode(boolean tailPageNeedDecode) {
- isTailPageNeedDecode = tailPageNeedDecode;
- }
-
- @Override
- public void setTimePartitionSlot(TTimePartitionSlot timePartitionSlot) {
- this.timePartitionSlot = timePartitionSlot;
+ public void setNotDecode() {
+ needDecodeChunk = false;
}
@Override
@@ -132,137 +103,83 @@ public class NonAlignedChunkData implements ChunkData {
}
@Override
- public void serialize(DataOutputStream stream, File tsFile) throws IOException {
+ public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
- if (needDecodeChunk()) {
- ReadWriteIOUtils.write(true, stream);
- serializeDecodeChunk(stream, tsFile);
- } else {
- ReadWriteIOUtils.write(false, stream);
- serializeEntireChunk(stream, tsFile);
- }
+ byteStream.writeTo(stream);
+ close();
}
private void serializeAttr(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
ReadWriteIOUtils.write(device, stream);
+ ReadWriteIOUtils.write(needDecodeChunk, stream);
chunkHeader.serializeTo(stream); // chunk header already serialize chunk type
- }
-
- private void serializeEntireChunk(DataOutputStream stream, File tsFile) throws IOException {
- try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
- ByteBuffer chunkData =
- reader.readChunk(
- chunkMetadata.getOffsetOfChunkHeader() + chunkHeader.getSerializedSize(),
- chunkHeader.getDataSize());
- ReadWriteIOUtils.write(chunkData, stream);
- chunkMetadata.getStatistics().serialize(stream);
+ if (needDecodeChunk) {
+ ReadWriteIOUtils.write(pageNumber, stream);
}
}
- private void serializeDecodeChunk(DataOutputStream stream, File tsFile) throws IOException {
- try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
- Decoder defaultTimeDecoder =
- Decoder.getDecoderByType(
- TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
- TSDataType.INT64);
- Decoder valueDecoder =
- Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
-
- reader.position(offset);
- long dataSize = this.dataSize;
- while (dataSize > 0) {
- boolean hasStatistic = (chunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER;
- PageHeader pageHeader = reader.readPageHeader(chunkHeader.getDataType(), hasStatistic);
- long pageDataSize = pageHeader.getSerializedPageSize();
- if ((dataSize == this.dataSize && isHeadPageNeedDecode) // decode head page
- || (dataSize == pageDataSize && isTailPageNeedDecode)) { // decode tail page
- ReadWriteIOUtils.write(true, stream); // decode
- decodePage(reader, pageHeader, defaultTimeDecoder, valueDecoder, stream);
- } else { // entire page
- ReadWriteIOUtils.write(false, stream); // don't decode
- pageHeader.serializeTo(stream);
- ByteBuffer pageData = reader.readCompressedPage(pageHeader);
- ReadWriteIOUtils.write(pageData, stream);
- }
- dataSize -= pageDataSize;
- }
- }
+ @Override
+ public void writeEntireChunk(ByteBuffer chunkData, IChunkMetadata chunkMetadata)
+ throws IOException {
+ dataSize += ReadWriteIOUtils.write(chunkData, stream);
+ dataSize += chunkMetadata.getStatistics().serialize(stream);
+ }
- ReadWriteIOUtils.write(true, stream); // means ending
- ReadWriteIOUtils.write(-1, stream);
+ @Override
+ public void writeEntirePage(PageHeader pageHeader, ByteBuffer pageData) throws IOException {
+ pageNumber += 1;
+ dataSize += ReadWriteIOUtils.write(false, stream);
+ pageHeader.serializeTo(stream);
+ dataSize += ReadWriteIOUtils.write(pageData, stream);
}
- private void decodePage(
- TsFileSequenceReader reader,
- PageHeader pageHeader,
- Decoder timeDecoder,
- Decoder valueDecoder,
- DataOutputStream stream)
+ @Override
+ public void writeDecodePage(long[] times, Object[] values, int satisfiedLength)
throws IOException {
- valueDecoder.reset();
- ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType());
- PageReader pageReader =
- new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null);
- BatchData batchData = pageReader.getAllSatisfiedPageData();
-
- int length = 0;
- while (batchData.hasCurrent()) {
- long time = batchData.currentTime();
- if (time < timePartitionSlot.getStartTime()) {
- batchData.next();
+ pageNumber += 1;
+ long startTime = timePartitionSlot.getStartTime();
+ long endTime = startTime + TimePartitionUtils.getTimePartitionIntervalForRouting();
+ dataSize += ReadWriteIOUtils.write(true, stream);
+ dataSize += ReadWriteIOUtils.write(satisfiedLength, stream);
+
+ for (int i = 0; i < times.length; i++) {
+ if (times[i] < startTime) {
continue;
- } else if (!timePartitionSlot.equals(TimePartitionUtils.getTimePartitionForRouting(time))) {
+ } else if (times[i] >= endTime) {
break;
}
- length += 1;
- batchData.next();
- }
- ReadWriteIOUtils.write(length, stream);
- batchData.resetBatchData();
- while (batchData.hasCurrent()) {
- long time = batchData.currentTime();
- if (time < timePartitionSlot.getStartTime()) {
- batchData.next();
- continue;
- } else if (!timePartitionSlot.equals(TimePartitionUtils.getTimePartitionForRouting(time))) {
- break;
- }
-
- ReadWriteIOUtils.write(time, stream);
- Object value = batchData.currentValue();
+ dataSize += ReadWriteIOUtils.write(times[i], stream);
switch (chunkHeader.getDataType()) {
case INT32:
- ReadWriteIOUtils.write((int) value, stream);
+ dataSize += ReadWriteIOUtils.write((int) values[i], stream);
break;
case INT64:
- ReadWriteIOUtils.write((long) value, stream);
+ dataSize += ReadWriteIOUtils.write((long) values[i], stream);
break;
case FLOAT:
- ReadWriteIOUtils.write((float) value, stream);
+ dataSize += ReadWriteIOUtils.write((float) values[i], stream);
break;
case DOUBLE:
- ReadWriteIOUtils.write((double) value, stream);
+ dataSize += ReadWriteIOUtils.write((double) values[i], stream);
break;
case BOOLEAN:
- ReadWriteIOUtils.write((boolean) value, stream);
+ dataSize += ReadWriteIOUtils.write((boolean) values[i], stream);
break;
case TEXT:
- ReadWriteIOUtils.write((Binary) value, stream);
+ dataSize += ReadWriteIOUtils.write((Binary) values[i], stream);
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.",
chunkHeader.getDataType()));
}
- batchData.next();
}
}
private void deserializeTsFileData(InputStream stream) throws IOException, PageException {
- boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
if (needDecodeChunk) {
buildChunkWriter(stream);
} else {
@@ -270,6 +187,14 @@ public class NonAlignedChunkData implements ChunkData {
}
}
+ private void deserializeEntireChunk(InputStream stream) throws IOException {
+ ByteBuffer chunkData =
+ ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
+ Statistics<? extends Serializable> statistics =
+ Statistics.deserialize(stream, chunkHeader.getDataType());
+ chunk = new Chunk(chunkHeader, chunkData, null, statistics);
+ }
+
private void buildChunkWriter(InputStream stream) throws IOException, PageException {
chunkWriter =
new ChunkWriterImpl(
@@ -279,14 +204,10 @@ public class NonAlignedChunkData implements ChunkData {
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType()));
boolean needDecode;
- while (true) {
+ for (int j = 0; j < pageNumber; j++) {
needDecode = ReadWriteIOUtils.readBool(stream);
if (needDecode) {
int length = ReadWriteIOUtils.readInt(stream);
- if (length == -1) {
- break;
- }
-
for (int i = 0; i < length; i++) {
long time = ReadWriteIOUtils.readLong(stream);
switch (chunkHeader.getDataType()) {
@@ -324,37 +245,37 @@ public class NonAlignedChunkData implements ChunkData {
}
}
- private void deserializeEntireChunk(InputStream stream) throws IOException {
- ByteBuffer chunkData =
- ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
- Statistics<? extends Serializable> statistics =
- Statistics.deserialize(stream, chunkHeader.getDataType());
- chunk = new Chunk(chunkHeader, chunkData, null, statistics);
- }
-
public static NonAlignedChunkData deserialize(InputStream stream)
throws IOException, PageException {
- long timePartition = ReadWriteIOUtils.readLong(stream);
+ TTimePartitionSlot timePartitionSlot =
+ TimePartitionUtils.getTimePartitionForRouting(ReadWriteIOUtils.readLong(stream));
String device = ReadWriteIOUtils.readString(stream);
+ boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
byte chunkType = ReadWriteIOUtils.readByte(stream);
ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
- NonAlignedChunkData chunkData = new NonAlignedChunkData(-1, device, chunkHeader);
- chunkData.setTimePartitionSlot(TimePartitionUtils.getTimePartitionForRouting(timePartition));
+ int pageNumber = 0;
+ if (needDecodeChunk) {
+ pageNumber = ReadWriteIOUtils.readInt(stream);
+ }
+
+ NonAlignedChunkData chunkData = new NonAlignedChunkData(device, chunkHeader, timePartitionSlot);
+ chunkData.needDecodeChunk = needDecodeChunk;
+ chunkData.pageNumber = pageNumber;
chunkData.deserializeTsFileData(stream);
+ chunkData.close();
return chunkData;
}
+ private void close() throws IOException {
+ byteStream.close();
+ stream.close();
+ }
+
@Override
public String toString() {
return "NonAlignedChunkData{"
- + "offset="
- + offset
- + ", dataSize="
+ + "dataSize="
+ dataSize
- + ", isHeadPageNeedDecode="
- + isHeadPageNeedDecode
- + ", isTailPageNeedDecode="
- + isTailPageNeedDecode
+ ", timePartitionSlot="
+ timePartitionSlot
+ ", device='"
@@ -362,6 +283,8 @@ public class NonAlignedChunkData implements ChunkData {
+ '\''
+ ", chunkHeader="
+ chunkHeader
+ + ", needDecodeChunk="
+ + needDecodeChunk
+ '}';
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
index 7f99064ff2..0fd9c5751f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -36,7 +35,7 @@ public interface TsFileData {
boolean isModification();
- void serialize(DataOutputStream stream, File tsFile) throws IOException;
+ void serialize(DataOutputStream stream) throws IOException;
static TsFileData deserialize(InputStream stream)
throws IOException, PageException, IllegalPathException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileSplitter.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
copy to server/src/main/java/org/apache/iotdb/db/engine/load/TsFileSplitter.java
index 03bd5f3bae..edaa692eac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileSplitter.java
@@ -17,32 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.load;
+package org.apache.iotdb.db.engine.load;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.load.AlignedChunkData;
-import org.apache.iotdb.db.engine.load.ChunkData;
-import org.apache.iotdb.db.engine.load.DeletionData;
-import org.apache.iotdb.db.engine.load.TsFileData;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
@@ -56,15 +41,16 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,147 +58,20 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.stream.Collectors;
+import java.util.function.Function;
-public class LoadSingleTsFileNode extends WritePlanNode {
- private static final Logger logger = LoggerFactory.getLogger(LoadSingleTsFileNode.class);
+public class TsFileSplitter {
+ private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class);
- private File tsFile;
- private boolean needDecodeTsFile;
+ private final File tsFile;
+ private final Function<TsFileData, Boolean> consumer;
- private Map<TRegionReplicaSet, List<LoadTsFilePieceNode>> replicaSet2Pieces;
-
- private TsFileResource resource;
- private TRegionReplicaSet localRegionReplicaSet;
-
- private boolean deleteAfterLoad;
-
- public LoadSingleTsFileNode(PlanNodeId id) {
- super(id);
- }
-
- public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean deleteAfterLoad) {
- super(id);
- this.tsFile = resource.getTsFile();
- this.resource = resource;
- this.deleteAfterLoad = deleteAfterLoad;
- }
-
- public void checkIfNeedDecodeTsFile(DataPartition dataPartition) throws IOException {
- Set<TRegionReplicaSet> allRegionReplicaSet = new HashSet<>();
- needDecodeTsFile = false;
- for (String device : resource.getDevices()) {
- if (!TimePartitionUtils.getTimePartitionForRouting(resource.getStartTime(device))
- .equals(TimePartitionUtils.getTimePartitionForRouting(resource.getEndTime(device)))) {
- needDecodeTsFile = true;
- return;
- }
- allRegionReplicaSet.addAll(dataPartition.getAllDataRegionReplicaSetForOneDevice(device));
- }
- needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
- if (!needDecodeTsFile && !resource.resourceFileExists()) {
- resource.serialize();
- }
- }
-
- private boolean isDispatchedToLocal(Set<TRegionReplicaSet> replicaSets) {
- if (replicaSets.size() > 1) {
- return false;
- }
- for (TRegionReplicaSet replicaSet : replicaSets) {
- List<TDataNodeLocation> dataNodeLocationList = replicaSet.getDataNodeLocations();
- if (dataNodeLocationList.size() > 1) {
- return false;
- }
- localRegionReplicaSet = replicaSet;
- return isDispatchedToLocal(dataNodeLocationList.get(0).getInternalEndPoint());
- }
- return true;
- }
-
- private boolean isDispatchedToLocal(TEndPoint endPoint) {
- return IoTDBDescriptor.getInstance().getConfig().getInternalAddress().equals(endPoint.getIp())
- && IoTDBDescriptor.getInstance().getConfig().getInternalPort() == endPoint.port;
- }
-
- public boolean needDecodeTsFile() {
- return needDecodeTsFile;
- }
-
- public boolean isDeleteAfterLoad() {
- return deleteAfterLoad;
- }
-
- /**
- * only used for load locally.
- *
- * @return local TRegionReplicaSet
- */
- public TRegionReplicaSet getLocalRegionReplicaSet() {
- return localRegionReplicaSet;
- }
-
- public TsFileResource getTsFileResource() {
- return resource;
- }
-
- public Map<TRegionReplicaSet, List<LoadTsFilePieceNode>> getReplicaSet2Pieces() {
- return replicaSet2Pieces;
- }
-
- @Override
- public TRegionReplicaSet getRegionReplicaSet() {
- return null;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return null;
- }
-
- @Override
- public void addChild(PlanNode child) {}
-
- @Override
- public PlanNode clone() {
- throw new NotImplementedException("clone of load single TsFile is not
implemented");
- }
-
- @Override
- public int allowedChildCount() {
- return NO_CHILD_ALLOWED;
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {}
-
- @Override
- protected void serializeAttributes(DataOutputStream stream) throws IOException {}
-
- @Override
- public List<WritePlanNode> splitByPartition(Analysis analysis) {
- throw new NotImplementedException("split load single TsFile is not
implemented");
+ public TsFileSplitter(File tsFile, Function<TsFileData, Boolean> consumer) {
+ this.tsFile = tsFile;
+ this.consumer = consumer;
}
- @Override
- public String toString() {
- return "LoadSingleTsFileNode{"
- + "tsFile="
- + tsFile
- + ", needDecodeTsFile="
- + needDecodeTsFile
- + '}';
- }
-
- public void splitTsFileByDataPartition(DataPartition dataPartition) throws IOException {
- replicaSet2Pieces = new HashMap<>();
- List<TsFileData> tsFileDataList = new ArrayList<>();
-
+ public void splitTsFileByDataPartition() throws IOException, IllegalStateException {
try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
TreeMap<Long, List<Deletion>> offset2Deletions = new TreeMap<>();
getAllModification(offset2Deletions);
@@ -225,7 +84,8 @@ public class LoadSingleTsFileNode extends WritePlanNode {
reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
String curDevice = null;
boolean isTimeChunkNeedDecode = true;
- Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = null;
+ Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = new HashMap<>();
+ Map<Integer, long[]> pageIndex2Times = null;
Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
getChunkMetadata(reader, offset2ChunkMetadata);
byte marker;
@@ -236,12 +96,16 @@ public class LoadSingleTsFileNode extends WritePlanNode {
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
long chunkOffset = reader.position();
- handleModification(offset2Deletions, tsFileDataList, chunkOffset);
+ consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
+ handleModification(offset2Deletions, chunkOffset);
ChunkHeader header = reader.readChunkHeader(marker);
+ String measurementId = header.getMeasurementID();
if (header.getDataSize() == 0) {
throw new TsFileRuntimeException(
- String.format("Chunk data error when parsing TsFile %s.",
tsFile.getPath()));
+ String.format(
+ "Empty Nonaligned Chunk or Time Chunk with offset %d in
TsFile %s.",
+ chunkOffset, tsFile.getPath()));
}
boolean isAligned =
@@ -251,26 +115,21 @@ public class LoadSingleTsFileNode extends WritePlanNode {
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime());
ChunkData chunkData =
- ChunkData.createChunkData(isAligned, reader.position(), curDevice, header);
- chunkData.setTimePartitionSlot(timePartitionSlot);
+ ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
+
if (!needDecodeChunk(chunkMetadata)) {
+ chunkData.setNotDecode();
+ chunkData.writeEntireChunk(reader.readChunk(-1, header.getDataSize()), chunkMetadata);
if (isAligned) {
isTimeChunkNeedDecode = false;
- pageIndex2ChunkData = new HashMap<>();
pageIndex2ChunkData
.computeIfAbsent(1, o -> new ArrayList<>())
.add((AlignedChunkData) chunkData);
+ } else {
+ consumeChunkData(measurementId, chunkOffset, chunkData);
}
- chunkData.setNotDecode(chunkMetadata);
- chunkData.addDataSize(header.getDataSize());
- tsFileDataList.add(chunkData);
- reader.position(reader.position() + header.getDataSize());
break;
}
- if (isAligned) {
- isTimeChunkNeedDecode = true;
- pageIndex2ChunkData = new HashMap<>();
- }
Decoder defaultTimeDecoder =
Decoder.getDecoderByType(
@@ -280,8 +139,12 @@ public class LoadSingleTsFileNode extends WritePlanNode {
Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
int dataSize = header.getDataSize();
int pageIndex = 0;
+ if (isAligned) {
+ isTimeChunkNeedDecode = true;
+ pageIndex2Times = new HashMap<>();
+ }
+
while (dataSize > 0) {
- long pageOffset = reader.position();
PageHeader pageHeader =
reader.readPageHeader(
header.getDataType(),
@@ -295,50 +158,56 @@ public class LoadSingleTsFileNode extends WritePlanNode {
TTimePartitionSlot pageTimePartitionSlot =
TimePartitionUtils.getTimePartitionForRouting(startTime);
if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
- tsFileDataList.add(chunkData);
+ if (!isAligned) {
+ consumeChunkData(measurementId, chunkOffset, chunkData);
+ }
timePartitionSlot = pageTimePartitionSlot;
- chunkData = ChunkData.createChunkData(isAligned, pageOffset, curDevice, header);
- chunkData.setTimePartitionSlot(timePartitionSlot);
+ chunkData =
+ ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
}
if (isAligned) {
pageIndex2ChunkData
.computeIfAbsent(pageIndex, o -> new ArrayList<>())
.add((AlignedChunkData) chunkData);
}
- chunkData.addDataSize(pageDataSize);
- reader.position(pageOffset + pageDataSize);
+ chunkData.writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader));
} else { // split page
ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
- long[] timeBatch =
+ Pair<long[], Object[]> tvArray =
decodePage(
isAligned, pageData, pageHeader, defaultTimeDecoder, valueDecoder, header);
- boolean isFirstData = true;
- for (long currentTime : timeBatch) {
- TTimePartitionSlot currentTimePartitionSlot =
- TimePartitionUtils.getTimePartitionForRouting(
- currentTime); // TODO: can speed up
- if (!timePartitionSlot.equals(currentTimePartitionSlot)) {
- if (!isFirstData) {
- chunkData.setTailPageNeedDecode(true); // close last chunk data
- chunkData.addDataSize(pageDataSize);
- if (isAligned) {
- pageIndex2ChunkData
- .computeIfAbsent(pageIndex, o -> new ArrayList<>())
- .add((AlignedChunkData) chunkData);
- }
+ long[] times = tvArray.left;
+ Object[] values = tvArray.right;
+ if (isAligned) {
+ pageIndex2Times.put(pageIndex, times);
+ }
+
+ int satisfiedLength = 0;
+ long endTime =
+ timePartitionSlot.getStartTime()
+ + TimePartitionUtils.getTimePartitionIntervalForRouting();
+ for (int i = 0; i < times.length; i++) {
+ if (times[i] >= endTime) {
+ chunkData.writeDecodePage(times, values, satisfiedLength);
+ if (isAligned) {
+ pageIndex2ChunkData
+ .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+ .add((AlignedChunkData) chunkData);
+ } else {
+ consumeChunkData(measurementId, chunkOffset, chunkData);
}
- tsFileDataList.add(chunkData);
+ timePartitionSlot = TimePartitionUtils.getTimePartitionForRouting(times[i]);
+ satisfiedLength = 0;
+ endTime =
+ timePartitionSlot.getStartTime()
+ + TimePartitionUtils.getTimePartitionIntervalForRouting();
chunkData =
- ChunkData.createChunkData(
- isAligned, pageOffset, curDevice, header); // open a new chunk data
- chunkData.setTimePartitionSlot(currentTimePartitionSlot);
- chunkData.setHeadPageNeedDecode(true);
- timePartitionSlot = currentTimePartitionSlot;
+ ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
}
- isFirstData = false;
+ satisfiedLength += 1;
}
- chunkData.addDataSize(pageDataSize);
+ chunkData.writeDecodePage(times, values, satisfiedLength);
if (isAligned) {
pageIndex2ChunkData
.computeIfAbsent(pageIndex, o -> new ArrayList<>())
@@ -350,7 +219,9 @@ public class LoadSingleTsFileNode extends WritePlanNode {
dataSize -= pageDataSize;
}
- tsFileDataList.add(chunkData);
+ if (!isAligned) {
+ consumeChunkData(measurementId, chunkOffset, chunkData);
+ }
break;
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
@@ -358,37 +229,48 @@ public class LoadSingleTsFileNode extends WritePlanNode {
chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
header = reader.readChunkHeader(marker);
if (header.getDataSize() == 0) {
- handleEmptyValueChunk(chunkOffset, header, chunkMetadata, pageIndex2ChunkData);
+ handleEmptyValueChunk(header, pageIndex2ChunkData);
break;
}
- Set<ChunkData> allChunkData = new HashSet<>();
if (!isTimeChunkNeedDecode) {
AlignedChunkData alignedChunkData = pageIndex2ChunkData.get(1).get(0);
- alignedChunkData.addValueChunk(chunkOffset, header, chunkMetadata);
- alignedChunkData.addValueChunkDataSize(header.getDataSize());
- reader.position(reader.position() + header.getDataSize());
+ alignedChunkData.addValueChunk(header);
+ alignedChunkData.writeEntireChunk(
+ reader.readChunk(-1, header.getDataSize()), chunkMetadata);
break;
}
+ Set<ChunkData> allChunkData = new HashSet<>();
dataSize = header.getDataSize();
pageIndex = 0;
+ valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
while (dataSize > 0) {
- long pageOffset = reader.position();
PageHeader pageHeader =
reader.readPageHeader(
header.getDataType(),
(header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
long pageDataSize = pageHeader.getSerializedPageSize();
- for (AlignedChunkData alignedChunkData : pageIndex2ChunkData.get(pageIndex)) {
+ List<AlignedChunkData> alignedChunkDataList = pageIndex2ChunkData.get(pageIndex);
+ for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
if (!allChunkData.contains(alignedChunkData)) {
- alignedChunkData.addValueChunk(pageOffset, header, chunkMetadata);
+ alignedChunkData.addValueChunk(header);
allChunkData.add(alignedChunkData);
}
- alignedChunkData.addValueChunkDataSize(pageDataSize);
}
- reader.position(pageOffset + pageDataSize);
+ if (alignedChunkDataList.size() == 1) { // write entire page
+ alignedChunkDataList
+ .get(0)
+ .writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader));
+ } else { // decode page
+ long[] times = pageIndex2Times.get(pageIndex);
+ TsPrimitiveType[] values =
+ decodeValuePage(reader, header, pageHeader, times, valueDecoder);
+ for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
+ alignedChunkData.writeDecodeValuePage(times, values, header.getDataType());
+ }
+ }
pageIndex += 1;
dataSize -= pageDataSize;
@@ -406,27 +288,9 @@ public class LoadSingleTsFileNode extends WritePlanNode {
}
}
- handleModification(offset2Deletions, tsFileDataList, Long.MAX_VALUE);
+ consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData);
+ handleModification(offset2Deletions, Long.MAX_VALUE);
}
-
- for (TsFileData tsFileData : tsFileDataList) {
- if (!tsFileData.isModification()) {
- ChunkData chunkData = (ChunkData) tsFileData;
- getPieceNode(chunkData.getDevice(), chunkData.getTimePartitionSlot(),
dataPartition)
- .addTsFileData(chunkData);
- } else {
- for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
- replicaSet2Pieces.entrySet()) {
- LoadTsFilePieceNode pieceNode =
entry.getValue().get(entry.getValue().size() - 1);
- pieceNode.addTsFileData(tsFileData);
- }
- }
- }
-
- logger.info(
- String.format(
- "Finish Parsing TsFile %s, split to %d pieces, send to %d
RegionReplicaSet.",
- tsFile.getPath(), tsFileDataList.size(),
replicaSet2Pieces.keySet().size()));
}
private void getAllModification(Map<Long, List<Deletion>> offset2Deletions)
throws IOException {
@@ -474,14 +338,42 @@ public class LoadSingleTsFileNode extends WritePlanNode {
}
private void handleModification(
- TreeMap<Long, List<Deletion>> offset2Deletions,
- List<TsFileData> tsFileDataList,
- long chunkOffset) {
+ TreeMap<Long, List<Deletion>> offset2Deletions, long chunkOffset) {
while (!offset2Deletions.isEmpty() &&
offset2Deletions.firstEntry().getKey() <= chunkOffset) {
- tsFileDataList.addAll(
- offset2Deletions.pollFirstEntry().getValue().stream()
- .map(DeletionData::new)
- .collect(Collectors.toList()));
+ offset2Deletions
+ .pollFirstEntry()
+ .getValue()
+ .forEach(o -> consumer.apply(new DeletionData(o)));
+ }
+ }
+
+ private void consumeAllAlignedChunkData(
+ long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) {
+ if (pageIndex2ChunkData.isEmpty()) {
+ return;
+ }
+
+ Set<ChunkData> allChunkData = new HashSet<>();
+ for (Map.Entry<Integer, List<AlignedChunkData>> entry :
pageIndex2ChunkData.entrySet()) {
+ allChunkData.addAll(entry.getValue());
+ }
+ for (ChunkData chunkData : allChunkData) {
+ if (!consumer.apply(chunkData)) {
+ throw new IllegalStateException(
+ String.format(
+                  "Error consuming aligned chunk data, next chunk offset: %d, chunkData: %s",
+ offset, chunkData));
+ }
+ }
+ pageIndex2ChunkData.clear();
+ }
+
+ private void consumeChunkData(String measurement, long offset, ChunkData
chunkData) {
+ if (!consumer.apply(chunkData)) {
+ throw new IllegalStateException(
+ String.format(
+              "Error consuming chunk data, chunk offset: %d, measurement: %s, chunkData: %s",
+ offset, measurement, chunkData));
}
}
@@ -499,7 +391,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
.equals(TimePartitionUtils.getTimePartitionForRouting(pageHeader.getEndTime()));
}
- private long[] decodePage(
+ private Pair<long[], Object[]> decodePage(
boolean isAligned,
ByteBuffer pageData,
PageHeader pageHeader,
@@ -509,57 +401,54 @@ public class LoadSingleTsFileNode extends WritePlanNode {
throws IOException {
if (isAligned) {
TimePageReader timePageReader = new TimePageReader(pageHeader, pageData,
timeDecoder);
- return timePageReader.getNextTimeBatch();
+ long[] times = timePageReader.getNextTimeBatch();
+ return new Pair<>(times, new Object[times.length]);
}
valueDecoder.reset();
PageReader pageReader =
new PageReader(pageData, chunkHeader.getDataType(), valueDecoder,
timeDecoder, null);
BatchData batchData = pageReader.getAllSatisfiedPageData();
- long[] timeBatch = new long[batchData.length()];
+ long[] times = new long[batchData.length()];
+ Object[] values = new Object[batchData.length()];
int index = 0;
while (batchData.hasCurrent()) {
- timeBatch[index++] = batchData.currentTime();
+ times[index] = batchData.currentTime();
+ values[index++] = batchData.currentValue();
batchData.next();
}
- return timeBatch;
+ return new Pair<>(times, values);
}
private void handleEmptyValueChunk(
- long chunkOffset,
- ChunkHeader header,
- IChunkMetadata chunkMetadata,
- Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) {
+ ChunkHeader header, Map<Integer, List<AlignedChunkData>>
pageIndex2ChunkData) {
Set<ChunkData> allChunkData = new HashSet<>();
for (Map.Entry<Integer, List<AlignedChunkData>> entry :
pageIndex2ChunkData.entrySet()) {
for (AlignedChunkData alignedChunkData : entry.getValue()) {
if (!allChunkData.contains(alignedChunkData)) {
- alignedChunkData.addValueChunk(chunkOffset, header, chunkMetadata);
+ alignedChunkData.addValueChunk(header);
allChunkData.add(alignedChunkData);
}
}
}
}
- private LoadTsFilePieceNode getPieceNode(
- String device, TTimePartitionSlot timePartitionSlot, DataPartition
dataPartition) {
- TRegionReplicaSet replicaSet =
- dataPartition.getDataRegionReplicaSetForWriting(device,
timePartitionSlot);
- List<LoadTsFilePieceNode> pieceNodes =
- replicaSet2Pieces.computeIfAbsent(replicaSet, o -> new ArrayList<>());
- if (pieceNodes.isEmpty() || pieceNodes.get(pieceNodes.size() -
1).exceedSize()) {
- pieceNodes.add(new LoadTsFilePieceNode(getPlanNodeId(), tsFile));
+ private TsPrimitiveType[] decodeValuePage(
+ TsFileSequenceReader reader,
+ ChunkHeader chunkHeader,
+ PageHeader pageHeader,
+ long[] times,
+ Decoder valueDecoder)
+ throws IOException {
+ if (pageHeader.getSerializedPageSize() == 0) {
+ return new TsPrimitiveType[times.length];
}
- return pieceNodes.get(pieceNodes.size() - 1);
- }
- public void clean() {
- try {
- if (deleteAfterLoad) {
- Files.deleteIfExists(tsFile.toPath());
- }
- } catch (IOException e) {
- logger.warn(String.format("Delete After Loading %s error.", tsFile), e);
- }
+ valueDecoder.reset();
+ ByteBuffer pageData = reader.readPage(pageHeader,
chunkHeader.getCompressionType());
+ ValuePageReader valuePageReader =
+ new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(),
valueDecoder);
+ return valuePageReader.nextValueBatch(
+         times); // must be the original time batch, so recording the satisfied length is necessary
}
}
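
Note on the split logic above: TsFileSplitter (the renamed splitter) no longer buffers a tsFileDataList; every ChunkData or DeletionData piece is handed to a caller-supplied consumer as soon as it is cut, and a false return is turned into an IllegalStateException. A minimal sketch of that contract, assuming the consumer parameter is a java.util.function.Function<TsFileData, Boolean> (the splitter only calls consumer.apply(...)) and using a hypothetical file name:

    import java.io.File;
    import java.util.function.Function;

    import org.apache.iotdb.db.engine.load.TsFileData;
    import org.apache.iotdb.db.engine.load.TsFileSplitter;

    public class TsFileSplitterSketch {
      public static void main(String[] args) throws Exception {
        // Called once per split piece; returning false makes the splitter raise
        // an IllegalStateException, as in consumeChunkData above.
        Function<TsFileData, Boolean> consumer =
            tsFileData -> {
              System.out.println("got piece: " + tsFileData);
              return true;
            };
        new TsFileSplitter(new File("example.tsfile"), consumer).splitTsFileByDataPartition();
      }
    }

The scheduler further below plugs in tsFileDataManager::addOrSendTsFileData as this consumer, so pieces are routed and dispatched while the file is still being parsed.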
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index cf6acaa2f9..b6e2c737a9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -110,8 +110,6 @@ import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -1640,14 +1638,14 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList)
{
TSDataType dataType = timeseriesMetadata.getTSDataType();
if (!dataType.equals(TSDataType.VECTOR)) {
- ChunkHeader chunkHeader =
- getChunkHeaderByTimeseriesMetadata(reader,
timeseriesMetadata);
+ Pair<CompressionType, TSEncoding> pair =
+
reader.readTimeseriesCompressionTypeAndEncoding(timeseriesMetadata);
MeasurementSchema measurementSchema =
new MeasurementSchema(
timeseriesMetadata.getMeasurementId(),
dataType,
- chunkHeader.getEncodingType(),
- chunkHeader.getCompressionType());
+ pair.getRight(),
+ pair.getLeft());
device2Schemas
.computeIfAbsent(device, o -> new HashMap<>())
.put(measurementSchema, tsFile);
@@ -1697,13 +1695,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- private ChunkHeader getChunkHeaderByTimeseriesMetadata(
- TsFileSequenceReader reader, TimeseriesMetadata timeseriesMetadata)
throws IOException {
- IChunkMetadata chunkMetadata =
timeseriesMetadata.getChunkMetadataList().get(0);
- reader.position(chunkMetadata.getOffsetOfChunkHeader());
- return reader.readChunkHeader(reader.readMarker());
- }
-
private void autoCreateSg(int sgLevel, Map<String, Map<MeasurementSchema,
File>> device2Schemas)
throws VerifyMetadataException, LoadFileException, IllegalPathException {
sgLevel += 1; // e.g. "root.sg" means sgLevel = 1, "root.sg.test" means
sgLevel=2
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 03bd5f3bae..04da130949 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -25,12 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.load.AlignedChunkData;
-import org.apache.iotdb.db.engine.load.ChunkData;
-import org.apache.iotdb.db.engine.load.DeletionData;
-import org.apache.iotdb.db.engine.load.TsFileData;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -38,24 +32,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.utils.TimePartitionUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,49 +42,55 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
public class LoadSingleTsFileNode extends WritePlanNode {
private static final Logger logger =
LoggerFactory.getLogger(LoadSingleTsFileNode.class);
private File tsFile;
+ private TsFileResource resource;
private boolean needDecodeTsFile;
+ private boolean deleteAfterLoad;
- private Map<TRegionReplicaSet, List<LoadTsFilePieceNode>> replicaSet2Pieces;
-
- private TsFileResource resource;
private TRegionReplicaSet localRegionReplicaSet;
-
- private boolean deleteAfterLoad;
+ private DataPartition dataPartition;
public LoadSingleTsFileNode(PlanNodeId id) {
super(id);
}
- public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean
deleteAfterLoad) {
+ public LoadSingleTsFileNode(
+ PlanNodeId id,
+ TsFileResource resource,
+ boolean deleteAfterLoad,
+ DataPartition dataPartition) {
super(id);
this.tsFile = resource.getTsFile();
this.resource = resource;
this.deleteAfterLoad = deleteAfterLoad;
+ this.dataPartition = dataPartition;
}
- public void checkIfNeedDecodeTsFile(DataPartition dataPartition) throws
IOException {
+ public void checkIfNeedDecodeTsFile() throws IOException {
Set<TRegionReplicaSet> allRegionReplicaSet = new HashSet<>();
+ TTimePartitionSlot timePartitionSlot = null;
needDecodeTsFile = false;
for (String device : resource.getDevices()) {
- if
(!TimePartitionUtils.getTimePartitionForRouting(resource.getStartTime(device))
-
.equals(TimePartitionUtils.getTimePartitionForRouting(resource.getEndTime(device))))
{
+ TTimePartitionSlot startSlot =
+
TimePartitionUtils.getTimePartitionForRouting(resource.getStartTime(device));
+ if (timePartitionSlot == null) {
+ timePartitionSlot = startSlot;
+ }
+ if (!startSlot.equals(timePartitionSlot)
+ ||
!TimePartitionUtils.getTimePartitionForRouting(resource.getEndTime(device))
+ .equals(timePartitionSlot)) {
needDecodeTsFile = true;
return;
}
-
allRegionReplicaSet.addAll(dataPartition.getAllDataRegionReplicaSetForOneDevice(device));
+ allRegionReplicaSet.add(
+ dataPartition.getDataRegionReplicaSetForWriting(device,
timePartitionSlot));
}
needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
if (!needDecodeTsFile && !resource.resourceFileExists()) {
@@ -152,12 +135,12 @@ public class LoadSingleTsFileNode extends WritePlanNode {
return localRegionReplicaSet;
}
- public TsFileResource getTsFileResource() {
- return resource;
+ public DataPartition getDataPartition() {
+ return dataPartition;
}
- public Map<TRegionReplicaSet, List<LoadTsFilePieceNode>>
getReplicaSet2Pieces() {
- return replicaSet2Pieces;
+ public TsFileResource getTsFileResource() {
+ return resource;
}
@Override
@@ -209,354 +192,14 @@ public class LoadSingleTsFileNode extends WritePlanNode {
+ '}';
}
- public void splitTsFileByDataPartition(DataPartition dataPartition) throws
IOException {
- replicaSet2Pieces = new HashMap<>();
- List<TsFileData> tsFileDataList = new ArrayList<>();
-
- try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
- TreeMap<Long, List<Deletion>> offset2Deletions = new TreeMap<>();
- getAllModification(offset2Deletions);
-
- if (!checkMagic(reader)) {
- throw new TsFileRuntimeException(
- String.format("Magic String check error when parsing TsFile %s.",
tsFile.getPath()));
- }
-
- reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
- String curDevice = null;
- boolean isTimeChunkNeedDecode = true;
- Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = null;
- Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
- getChunkMetadata(reader, offset2ChunkMetadata);
- byte marker;
- while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
- switch (marker) {
- case MetaMarker.CHUNK_HEADER:
- case MetaMarker.TIME_CHUNK_HEADER:
- case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
- case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
- long chunkOffset = reader.position();
- handleModification(offset2Deletions, tsFileDataList, chunkOffset);
-
- ChunkHeader header = reader.readChunkHeader(marker);
- if (header.getDataSize() == 0) {
- throw new TsFileRuntimeException(
- String.format("Chunk data error when parsing TsFile %s.",
tsFile.getPath()));
- }
-
- boolean isAligned =
- ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
- == TsFileConstant.TIME_COLUMN_MASK);
- IChunkMetadata chunkMetadata =
offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
- TTimePartitionSlot timePartitionSlot =
-
TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime());
- ChunkData chunkData =
- ChunkData.createChunkData(isAligned, reader.position(),
curDevice, header);
- chunkData.setTimePartitionSlot(timePartitionSlot);
- if (!needDecodeChunk(chunkMetadata)) {
- if (isAligned) {
- isTimeChunkNeedDecode = false;
- pageIndex2ChunkData = new HashMap<>();
- pageIndex2ChunkData
- .computeIfAbsent(1, o -> new ArrayList<>())
- .add((AlignedChunkData) chunkData);
- }
- chunkData.setNotDecode(chunkMetadata);
- chunkData.addDataSize(header.getDataSize());
- tsFileDataList.add(chunkData);
- reader.position(reader.position() + header.getDataSize());
- break;
- }
- if (isAligned) {
- isTimeChunkNeedDecode = true;
- pageIndex2ChunkData = new HashMap<>();
- }
-
- Decoder defaultTimeDecoder =
- Decoder.getDecoderByType(
-
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
- TSDataType.INT64);
- Decoder valueDecoder =
- Decoder.getDecoderByType(header.getEncodingType(),
header.getDataType());
- int dataSize = header.getDataSize();
- int pageIndex = 0;
- while (dataSize > 0) {
- long pageOffset = reader.position();
- PageHeader pageHeader =
- reader.readPageHeader(
- header.getDataType(),
- (header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
- long pageDataSize = pageHeader.getSerializedPageSize();
- if (!needDecodePage(pageHeader, chunkMetadata)) { // an entire
page
- long startTime =
- pageHeader.getStatistics() == null
- ? chunkMetadata.getStartTime()
- : pageHeader.getStartTime();
- TTimePartitionSlot pageTimePartitionSlot =
- TimePartitionUtils.getTimePartitionForRouting(startTime);
- if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
- tsFileDataList.add(chunkData);
- timePartitionSlot = pageTimePartitionSlot;
- chunkData = ChunkData.createChunkData(isAligned, pageOffset,
curDevice, header);
- chunkData.setTimePartitionSlot(timePartitionSlot);
- }
- if (isAligned) {
- pageIndex2ChunkData
- .computeIfAbsent(pageIndex, o -> new ArrayList<>())
- .add((AlignedChunkData) chunkData);
- }
- chunkData.addDataSize(pageDataSize);
- reader.position(pageOffset + pageDataSize);
- } else { // split page
- ByteBuffer pageData = reader.readPage(pageHeader,
header.getCompressionType());
- long[] timeBatch =
- decodePage(
- isAligned, pageData, pageHeader, defaultTimeDecoder,
valueDecoder, header);
- boolean isFirstData = true;
- for (long currentTime : timeBatch) {
- TTimePartitionSlot currentTimePartitionSlot =
- TimePartitionUtils.getTimePartitionForRouting(
- currentTime); // TODO: can speed up
- if (!timePartitionSlot.equals(currentTimePartitionSlot)) {
- if (!isFirstData) {
- chunkData.setTailPageNeedDecode(true); // close last
chunk data
- chunkData.addDataSize(pageDataSize);
- if (isAligned) {
- pageIndex2ChunkData
- .computeIfAbsent(pageIndex, o -> new ArrayList<>())
- .add((AlignedChunkData) chunkData);
- }
- }
- tsFileDataList.add(chunkData);
-
- chunkData =
- ChunkData.createChunkData(
- isAligned, pageOffset, curDevice, header); // open
a new chunk data
- chunkData.setTimePartitionSlot(currentTimePartitionSlot);
- chunkData.setHeadPageNeedDecode(true);
- timePartitionSlot = currentTimePartitionSlot;
- }
- isFirstData = false;
- }
- chunkData.addDataSize(pageDataSize);
- if (isAligned) {
- pageIndex2ChunkData
- .computeIfAbsent(pageIndex, o -> new ArrayList<>())
- .add((AlignedChunkData) chunkData);
- }
- }
-
- pageIndex += 1;
- dataSize -= pageDataSize;
- }
-
- tsFileDataList.add(chunkData);
- break;
- case MetaMarker.VALUE_CHUNK_HEADER:
- case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
- chunkOffset = reader.position();
- chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
- header = reader.readChunkHeader(marker);
- if (header.getDataSize() == 0) {
- handleEmptyValueChunk(chunkOffset, header, chunkMetadata,
pageIndex2ChunkData);
- break;
- }
-
- Set<ChunkData> allChunkData = new HashSet<>();
- if (!isTimeChunkNeedDecode) {
- AlignedChunkData alignedChunkData =
pageIndex2ChunkData.get(1).get(0);
- alignedChunkData.addValueChunk(chunkOffset, header,
chunkMetadata);
- alignedChunkData.addValueChunkDataSize(header.getDataSize());
- reader.position(reader.position() + header.getDataSize());
- break;
- }
-
- dataSize = header.getDataSize();
- pageIndex = 0;
-
- while (dataSize > 0) {
- long pageOffset = reader.position();
- PageHeader pageHeader =
- reader.readPageHeader(
- header.getDataType(),
- (header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
- long pageDataSize = pageHeader.getSerializedPageSize();
- for (AlignedChunkData alignedChunkData :
pageIndex2ChunkData.get(pageIndex)) {
- if (!allChunkData.contains(alignedChunkData)) {
- alignedChunkData.addValueChunk(pageOffset, header,
chunkMetadata);
- allChunkData.add(alignedChunkData);
- }
- alignedChunkData.addValueChunkDataSize(pageDataSize);
- }
- reader.position(pageOffset + pageDataSize);
-
- pageIndex += 1;
- dataSize -= pageDataSize;
- }
- break;
- case MetaMarker.CHUNK_GROUP_HEADER:
- ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
- curDevice = chunkGroupHeader.getDeviceID();
- break;
- case MetaMarker.OPERATION_INDEX_RANGE:
- reader.readPlanIndex();
- break;
- default:
- MetaMarker.handleUnexpectedMarker(marker);
- }
- }
-
- handleModification(offset2Deletions, tsFileDataList, Long.MAX_VALUE);
- }
-
- for (TsFileData tsFileData : tsFileDataList) {
- if (!tsFileData.isModification()) {
- ChunkData chunkData = (ChunkData) tsFileData;
- getPieceNode(chunkData.getDevice(), chunkData.getTimePartitionSlot(),
dataPartition)
- .addTsFileData(chunkData);
- } else {
- for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
- replicaSet2Pieces.entrySet()) {
- LoadTsFilePieceNode pieceNode =
entry.getValue().get(entry.getValue().size() - 1);
- pieceNode.addTsFileData(tsFileData);
- }
- }
- }
-
- logger.info(
- String.format(
- "Finish Parsing TsFile %s, split to %d pieces, send to %d
RegionReplicaSet.",
- tsFile.getPath(), tsFileDataList.size(),
replicaSet2Pieces.keySet().size()));
- }
-
- private void getAllModification(Map<Long, List<Deletion>> offset2Deletions)
throws IOException {
- try (ModificationFile modificationFile =
- new ModificationFile(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX)) {
- for (Modification modification : modificationFile.getModifications()) {
- offset2Deletions
- .computeIfAbsent(modification.getFileOffset(), o -> new
ArrayList<>())
- .add((Deletion) modification);
- }
- }
- }
-
- private boolean checkMagic(TsFileSequenceReader reader) throws IOException {
- String magic = reader.readHeadMagic();
- if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
- logger.error("the file's MAGIC STRING is incorrect, file path: {}",
reader.getFileName());
- return false;
- }
-
- byte versionNumber = reader.readVersionNumber();
- if (versionNumber != TSFileConfig.VERSION_NUMBER) {
- logger.error("the file's Version Number is incorrect, file path: {}",
reader.getFileName());
- return false;
- }
-
- if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
- logger.error("the file is not closed correctly, file path: {}",
reader.getFileName());
- return false;
- }
- return true;
- }
-
- private void getChunkMetadata(
- TsFileSequenceReader reader, Map<Long, IChunkMetadata>
offset2ChunkMetadata)
- throws IOException {
- Map<String, List<TimeseriesMetadata>> device2Metadata =
reader.getAllTimeseriesMetadata(true);
- for (Map.Entry<String, List<TimeseriesMetadata>> entry :
device2Metadata.entrySet()) {
- for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
- for (IChunkMetadata chunkMetadata :
timeseriesMetadata.getChunkMetadataList()) {
- offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(),
chunkMetadata);
- }
- }
- }
- }
-
- private void handleModification(
- TreeMap<Long, List<Deletion>> offset2Deletions,
- List<TsFileData> tsFileDataList,
- long chunkOffset) {
- while (!offset2Deletions.isEmpty() &&
offset2Deletions.firstEntry().getKey() <= chunkOffset) {
- tsFileDataList.addAll(
- offset2Deletions.pollFirstEntry().getValue().stream()
- .map(DeletionData::new)
- .collect(Collectors.toList()));
- }
- }
-
- private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
- return
!TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime())
-
.equals(TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getEndTime()));
- }
-
- private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata
chunkMetadata) {
- if (pageHeader.getStatistics() == null) {
- return
!TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime())
-
.equals(TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getEndTime()));
- }
- return
!TimePartitionUtils.getTimePartitionForRouting(pageHeader.getStartTime())
-
.equals(TimePartitionUtils.getTimePartitionForRouting(pageHeader.getEndTime()));
- }
-
- private long[] decodePage(
- boolean isAligned,
- ByteBuffer pageData,
- PageHeader pageHeader,
- Decoder timeDecoder,
- Decoder valueDecoder,
- ChunkHeader chunkHeader)
- throws IOException {
- if (isAligned) {
- TimePageReader timePageReader = new TimePageReader(pageHeader, pageData,
timeDecoder);
- return timePageReader.getNextTimeBatch();
- }
-
- valueDecoder.reset();
- PageReader pageReader =
- new PageReader(pageData, chunkHeader.getDataType(), valueDecoder,
timeDecoder, null);
- BatchData batchData = pageReader.getAllSatisfiedPageData();
- long[] timeBatch = new long[batchData.length()];
- int index = 0;
- while (batchData.hasCurrent()) {
- timeBatch[index++] = batchData.currentTime();
- batchData.next();
- }
- return timeBatch;
- }
-
- private void handleEmptyValueChunk(
- long chunkOffset,
- ChunkHeader header,
- IChunkMetadata chunkMetadata,
- Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) {
- Set<ChunkData> allChunkData = new HashSet<>();
- for (Map.Entry<Integer, List<AlignedChunkData>> entry :
pageIndex2ChunkData.entrySet()) {
- for (AlignedChunkData alignedChunkData : entry.getValue()) {
- if (!allChunkData.contains(alignedChunkData)) {
- alignedChunkData.addValueChunk(chunkOffset, header, chunkMetadata);
- allChunkData.add(alignedChunkData);
- }
- }
- }
- }
-
- private LoadTsFilePieceNode getPieceNode(
- String device, TTimePartitionSlot timePartitionSlot, DataPartition
dataPartition) {
- TRegionReplicaSet replicaSet =
- dataPartition.getDataRegionReplicaSetForWriting(device,
timePartitionSlot);
- List<LoadTsFilePieceNode> pieceNodes =
- replicaSet2Pieces.computeIfAbsent(replicaSet, o -> new ArrayList<>());
- if (pieceNodes.isEmpty() || pieceNodes.get(pieceNodes.size() -
1).exceedSize()) {
- pieceNodes.add(new LoadTsFilePieceNode(getPlanNodeId(), tsFile));
- }
- return pieceNodes.get(pieceNodes.size() - 1);
- }
-
public void clean() {
try {
if (deleteAfterLoad) {
Files.deleteIfExists(tsFile.toPath());
+ Files.deleteIfExists(
+ new File(tsFile.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX).toPath());
+ Files.deleteIfExists(
+ new File(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX).toPath());
}
} catch (IOException e) {
logger.warn(String.format("Delete After Loading %s error.", tsFile), e);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
index 0ecb865963..db2a1db1ab 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -92,17 +92,18 @@ public class LoadTsFileNode extends WritePlanNode {
for (TsFileResource resource : resources) {
try {
LoadSingleTsFileNode singleTsFileNode =
- new LoadSingleTsFileNode(getPlanNodeId(), resource,
statement.isDeleteAfterLoad());
-
singleTsFileNode.checkIfNeedDecodeTsFile(analysis.getDataPartitionInfo());
- if (singleTsFileNode.needDecodeTsFile()) {
-
singleTsFileNode.splitTsFileByDataPartition(analysis.getDataPartitionInfo());
- } else {
- logger.info(
- String.format("TsFile %s will be loaded to local.",
resource.getTsFile().getPath()));
- }
+ new LoadSingleTsFileNode(
+ getPlanNodeId(),
+ resource,
+ statement.isDeleteAfterLoad(),
+ analysis.getDataPartitionInfo());
+ singleTsFileNode.checkIfNeedDecodeTsFile();
res.add(singleTsFileNode);
} catch (Exception e) {
- logger.error(String.format("Parse TsFile %s error",
resource.getTsFile().getPath()), e);
+ logger.error(
+ String.format(
+                "Error checking whether TsFile %s needs decoding",
resource.getTsFile().getPath()),
+ e);
}
}
return res;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index b0021b6d46..73baf01838 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.load;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.load.TsFileData;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -48,7 +46,6 @@ import java.util.List;
public class LoadTsFilePieceNode extends WritePlanNode {
private static final Logger logger =
LoggerFactory.getLogger(LoadTsFilePieceNode.class);
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private File tsFile;
@@ -66,9 +63,8 @@ public class LoadTsFilePieceNode extends WritePlanNode {
this.tsFileDataList = new ArrayList<>();
}
- public boolean exceedSize() {
- return dataSize >= config.getThriftMaxFrameSize() / 2
- || dataSize >= config.getAllocateMemoryForFree() / 2;
+ public long getDataSize() {
+ return dataSize;
}
public void addTsFileData(TsFileData tsFileData) {
@@ -131,11 +127,12 @@ public class LoadTsFilePieceNode extends WritePlanNode {
ReadWriteIOUtils.write(tsFileDataList.size(), stream);
for (TsFileData tsFileData : tsFileDataList) {
try {
- tsFileData.serialize(stream, tsFile);
+ tsFileData.serialize(stream);
} catch (IOException e) {
logger.error(
String.format(
- "Parse page of TsFile %s error, skip chunk %s",
tsFile.getPath(), tsFileData));
+ "Serialize data of TsFile %s error, skip TsFileData %s",
+ tsFile.getPath(), tsFileData));
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index a84a914d6c..3471aa3563 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -112,7 +112,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
if (isDispatchedToLocal(endPoint)) {
dispatchLocally(instance);
} else {
- dispatchRemote(instance, endPoint); // TODO: can read only once
+ dispatchRemote(instance, endPoint);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 69be1f1614..2d6be8ad60 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -24,6 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.load.ChunkData;
+import org.apache.iotdb.db.engine.load.TsFileData;
+import org.apache.iotdb.db.engine.load.TsFileSplitter;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -46,6 +51,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -56,6 +63,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
/**
* {@link LoadTsFileScheduler} is used for scheduling {@link
LoadSingleTsFileNode} and {@link
@@ -65,15 +73,17 @@ import java.util.concurrent.TimeoutException;
*
href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe">...</a>;
*/
public class LoadTsFileScheduler implements IScheduler {
- public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // one day
-
private static final Logger logger =
LoggerFactory.getLogger(LoadTsFileScheduler.class);
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+  public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // 60 days
+ private static final long MAX_MEMORY_SIZE =
+ Math.min(config.getThriftMaxFrameSize() / 2,
config.getAllocateMemoryForStorageEngine() / 8);
private final MPPQueryContext queryContext;
private final QueryStateMachine stateMachine;
- private LoadTsFileDispatcherImpl dispatcher;
- private List<LoadSingleTsFileNode> tsFileNodeList;
- private PlanFragmentId fragmentId;
+ private final LoadTsFileDispatcherImpl dispatcher;
+ private final List<LoadSingleTsFileNode> tsFileNodeList;
+ private final PlanFragmentId fragmentId;
private Set<TRegionReplicaSet> allReplicaSets;
@@ -124,75 +134,82 @@ public class LoadTsFileScheduler implements IScheduler {
}
private boolean firstPhase(LoadSingleTsFileNode node) {
- if (!dispatchOneTsFile(node)) {
+ try {
+ TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node);
+ new TsFileSplitter(
+ node.getTsFileResource().getTsFile(),
tsFileDataManager::addOrSendTsFileData)
+ .splitTsFileByDataPartition();
+ if (!tsFileDataManager.sendAllTsFileData()) {
+ return false;
+ }
+ } catch (IllegalStateException e) {
+ logger.error(
+ String.format(
+ "Dispatch TsFileData error when parsing TsFile %s.",
+ node.getTsFileResource().getTsFile()),
+ e);
+ return false;
+ } catch (Exception e) {
+ stateMachine.transitionToFailed(e);
logger.error(
- String.format("Dispatch Single TsFile Node error,
LoadSingleTsFileNode %s.", node));
+ String.format("Parse TsFile %s error.",
node.getTsFileResource().getTsFile()), e);
return false;
}
return true;
}
- private boolean dispatchOneTsFile(LoadSingleTsFileNode node) {
- logger.info(
- String.format(
- "Start dispatching TsFile %s",
node.getTsFileResource().getTsFile().getPath()));
- for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
- node.getReplicaSet2Pieces().entrySet()) {
- allReplicaSets.add(entry.getKey());
- for (LoadTsFilePieceNode pieceNode : entry.getValue()) {
- FragmentInstance instance =
- new FragmentInstance(
- new PlanFragment(fragmentId, pieceNode),
- fragmentId.genFragmentInstanceId(),
- null,
- queryContext.getQueryType(),
- queryContext.getTimeOut(),
- queryContext.getSession());
- instance.setDataRegionAndHost(entry.getKey());
- Future<FragInstanceDispatchResult> dispatchResultFuture =
- dispatcher.dispatch(Collections.singletonList(instance));
-
- try {
- FragInstanceDispatchResult result =
- dispatchResultFuture.get(
- LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
TimeUnit.SECONDS);
- if (!result.isSuccessful()) {
- // TODO: retry.
+ private boolean dispatchOnePieceNode(
+ LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
+ allReplicaSets.add(replicaSet);
+ FragmentInstance instance =
+ new FragmentInstance(
+ new PlanFragment(fragmentId, pieceNode),
+ fragmentId.genFragmentInstanceId(),
+ null,
+ queryContext.getQueryType(),
+ queryContext.getTimeOut(),
+ queryContext.getSession());
+ instance.setDataRegionAndHost(replicaSet);
+ Future<FragInstanceDispatchResult> dispatchResultFuture =
+ dispatcher.dispatch(Collections.singletonList(instance));
+
+ try {
+ FragInstanceDispatchResult result =
+ dispatchResultFuture.get(
+ LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
TimeUnit.SECONDS);
+ if (!result.isSuccessful()) {
+ // TODO: retry.
+ logger.error(
+ String.format(
+ "Dispatch one piece to ReplicaSet %s error, result status code
%s.",
+ replicaSet,
TSStatusCode.representOf(result.getFailureStatus().getCode()).name()));
+ logger.error(
+ String.format("Result status message %s.",
result.getFailureStatus().getMessage()));
+ if (result.getFailureStatus().getSubStatus() != null) {
+ for (TSStatus status : result.getFailureStatus().getSubStatus()) {
logger.error(
String.format(
- "Dispatch one piece to ReplicaSet %s error, result status
code %s.",
- entry.getKey(),
-
TSStatusCode.representOf(result.getFailureStatus().getCode()).name()));
- logger.error(
- String.format("Result status message %s.",
result.getFailureStatus().getMessage()));
- if (result.getFailureStatus().getSubStatus() != null) {
- for (TSStatus status : result.getFailureStatus().getSubStatus())
{
- logger.error(
- String.format(
- "Sub status code %s.",
TSStatusCode.representOf(status.getCode()).name()));
- logger.error(String.format("Sub status message %s.",
status.getMessage()));
- }
- }
- logger.error(String.format("Dispatch piece node:%n%s", pieceNode));
- stateMachine.transitionToFailed(result.getFailureStatus()); //
TODO: record more status
- return false;
- }
- } catch (InterruptedException | ExecutionException |
CancellationException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ "Sub status code %s.",
TSStatusCode.representOf(status.getCode()).name()));
+ logger.error(String.format("Sub status message %s.",
status.getMessage()));
}
- logger.warn("Interrupt or Execution error.", e);
- stateMachine.transitionToFailed(e);
- return false;
- } catch (TimeoutException e) {
- dispatchResultFuture.cancel(true);
- logger.error(
- String.format("Wait for loading %s time out.",
LoadTsFilePieceNode.class.getName()),
- e);
- stateMachine.transitionToFailed(e);
- return false;
}
+ logger.error(String.format("Dispatch piece node error:%n%s",
pieceNode));
+ stateMachine.transitionToFailed(result.getFailureStatus()); // TODO:
record more status
+ return false;
}
+ } catch (InterruptedException | ExecutionException | CancellationException
e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ logger.warn("Interrupt or Execution error.", e);
+ stateMachine.transitionToFailed(e);
+ return false;
+ } catch (TimeoutException e) {
+ dispatchResultFuture.cancel(true);
+ logger.error(
+ String.format("Wait for loading %s time out.",
LoadTsFilePieceNode.class.getName()), e);
+ stateMachine.transitionToFailed(e);
+ return false;
}
return true;
}
@@ -276,4 +293,91 @@ public class LoadTsFileScheduler implements IScheduler {
EXECUTE,
ROLLBACK
}
+
+ private class TsFileDataManager {
+ private final LoadTsFileScheduler scheduler;
+ private final LoadSingleTsFileNode singleTsFileNode;
+
+ private long dataSize;
+ private Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+
+ public TsFileDataManager(LoadTsFileScheduler scheduler,
LoadSingleTsFileNode singleTsFileNode) {
+ this.scheduler = scheduler;
+ this.singleTsFileNode = singleTsFileNode;
+ this.dataSize = 0;
+ this.replicaSet2Piece = new HashMap<>();
+ }
+
+ private boolean addOrSendTsFileData(TsFileData tsFileData) {
+ return tsFileData.isModification()
+ ? addOrSendDeletionData(tsFileData)
+ : addOrSendChunkData((ChunkData) tsFileData);
+ }
+
+ private boolean addOrSendChunkData(ChunkData chunkData) {
+ dataSize += chunkData.getDataSize();
+ if (dataSize > MAX_MEMORY_SIZE) {
+ List<TRegionReplicaSet> sortedReplicaSets =
+ replicaSet2Piece.keySet().stream()
+ .sorted(
+ Comparator.comparingLong(o ->
replicaSet2Piece.get(o).getDataSize()).reversed())
+ .collect(Collectors.toList());
+
+ for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
+ LoadTsFilePieceNode pieceNode =
replicaSet2Piece.get(sortedReplicaSet);
+ if (pieceNode.getDataSize() == 0) { // total data size has been
reduced to 0
+ break;
+ }
+ if (!scheduler.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) {
+ return false;
+ }
+
+ dataSize -= pieceNode.getDataSize();
+ replicaSet2Piece.put(
+ sortedReplicaSet,
+ new LoadTsFilePieceNode(
+ singleTsFileNode.getPlanNodeId(),
+ singleTsFileNode.getTsFileResource().getTsFile()));
+ if (dataSize <= MAX_MEMORY_SIZE) {
+ break;
+ }
+ }
+ }
+ TRegionReplicaSet replicaSet =
+ singleTsFileNode
+ .getDataPartition()
+ .getDataRegionReplicaSetForWriting(
+ chunkData.getDevice(), chunkData.getTimePartitionSlot());
+ replicaSet2Piece
+ .computeIfAbsent(
+ replicaSet,
+ o ->
+ new LoadTsFilePieceNode(
+ singleTsFileNode.getPlanNodeId(),
+ singleTsFileNode.getTsFileResource().getTsFile()))
+ .addTsFileData(chunkData);
+ return true;
+ }
+
+ private boolean addOrSendDeletionData(TsFileData deletionData) {
+ for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
+ dataSize += deletionData.getDataSize();
+ entry.getValue().addTsFileData(deletionData);
+ }
+ return true;
+ }
+
+ private boolean sendAllTsFileData() {
+ for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
+ if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey()))
{
+ logger.error(
+ String.format(
+ "Dispatch piece node %s of TsFile %s error.",
+ entry.getValue(),
singleTsFileNode.getTsFileResource().getTsFile()));
+ return false;
+ }
+ }
+ return true;
+ }
+ }
}
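
The TsFileDataManager above caps the memory held by buffered pieces: once the accumulated size exceeds MAX_MEMORY_SIZE (the smaller of half the thrift frame size and an eighth of the storage-engine memory), it dispatches the largest buffered piece per replica set first until the total drops back under the limit. A toy, self-contained illustration of that flush order, using hypothetical replica-set ids and byte counts rather than the scheduler's real types:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class FlushLargestFirstSketch {
      private static final long MAX_MEMORY_SIZE = 1000L; // hypothetical threshold in bytes

      public static void main(String[] args) {
        // Buffered piece size per replica set, keyed by a hypothetical replica-set id.
        Map<String, Long> bufferedBytes = new HashMap<>();
        bufferedBytes.put("replicaSet-1", 700L);
        bufferedBytes.put("replicaSet-2", 500L);
        bufferedBytes.put("replicaSet-3", 100L);

        long total = bufferedBytes.values().stream().mapToLong(Long::longValue).sum();
        if (total > MAX_MEMORY_SIZE) {
          // Dispatch the largest buffers first and stop as soon as the total is back
          // under the threshold, mirroring addOrSendChunkData above.
          List<String> largestFirst =
              bufferedBytes.keySet().stream()
                  .sorted(Comparator.comparingLong((String o) -> bufferedBytes.get(o)).reversed())
                  .collect(Collectors.toList());
          for (String replicaSet : largestFirst) {
            long flushed = bufferedBytes.put(replicaSet, 0L);
            total -= flushed;
            System.out.println("dispatched " + flushed + " bytes for " + replicaSet);
            if (total <= MAX_MEMORY_SIZE) {
              break;
            }
          }
        }
      }
    }

Flushing the largest buffer first frees the most memory per dispatch, which keeps the number of RPCs low while the resident size stays bounded.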
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index ee3ec2a058..bfd6a0bf80 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -31,6 +31,8 @@ import
org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader;
import
org.apache.iotdb.db.query.reader.chunk.metadata.MemAlignedChunkMetadataLoader;
import org.apache.iotdb.db.query.reader.chunk.metadata.MemChunkMetadataLoader;
import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -39,6 +41,7 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
import java.util.ArrayList;
@@ -86,6 +89,27 @@ public class FileLoaderUtils {
}
}
+ /**
+   * Generate a {@link TsFileResource} from a closed {@link TsFileIOWriter}. The writer
+   * must already have called {@link TsFileIOWriter#endFile()}. Note that this method
+   * does not record the plan index of the writer.
+   *
+   * @param writer a closed {@link TsFileIOWriter}
+   * @return a new {@link TsFileResource} with the start and end time of each device filled in
+ */
+ public static TsFileResource generateTsFileResource(TsFileIOWriter writer) {
+ TsFileResource resource = new TsFileResource(writer.getFile());
+ for (ChunkGroupMetadata chunkGroupMetadata :
writer.getChunkGroupMetadataList()) {
+ String device = chunkGroupMetadata.getDevice();
+ for (ChunkMetadata chunkMetadata :
chunkGroupMetadata.getChunkMetadataList()) {
+ resource.updateStartTime(device, chunkMetadata.getStartTime());
+ resource.updateEndTime(device, chunkMetadata.getEndTime());
+ }
+ }
+ resource.setStatus(TsFileResourceStatus.CLOSED);
+ return resource;
+ }
+
/**
* @param resource TsFile
* @param seriesPath Timeseries path
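
FileLoaderUtils.generateTsFileResource builds a TsFileResource purely from the chunk-group metadata of a writer that has already ended its file. A minimal usage sketch, with a hypothetical file name and a single device/series; the write sequence mirrors TsFileIOWriterEndFileTest at the end of this patch:

    import java.io.File;

    import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
    import org.apache.iotdb.db.utils.FileLoaderUtils;
    import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
    import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
    import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
    import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

    public class GenerateResourceSketch {
      public static void main(String[] args) throws Exception {
        TsFileIOWriter writer = new TsFileIOWriter(new File("example.tsfile"));
        writer.startChunkGroup("root.sg.d1");
        ChunkWriterImpl chunkWriter =
            new ChunkWriterImpl(
                new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.GZIP));
        for (long time = 0; time < 10; ++time) {
          chunkWriter.write(time, 0);
        }
        chunkWriter.writeToFileWriter(writer);
        writer.endChunkGroup();
        writer.endFile(); // endFile() must run before generating the resource

        TsFileResource resource = FileLoaderUtils.generateTsFileResource(writer);
        System.out.println(
            "root.sg.d1: " + resource.getStartTime("root.sg.d1") + " .. " + resource.getEndTime("root.sg.d1"));
      }
    }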
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
index f39de5ba04..412e419c6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
@@ -34,6 +34,10 @@ public class TimePartitionUtils {
return timePartitionSlot;
}
+ public static long getTimePartitionIntervalForRouting() {
+ return timePartitionIntervalForRouting;
+ }
+
@TestOnly
public static void setTimePartitionIntervalForRouting(long
timePartitionIntervalForRouting) {
TimePartitionUtils.timePartitionIntervalForRouting =
timePartitionIntervalForRouting;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 24a4418702..151e170495 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.tsfile.file.header;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -191,6 +194,24 @@ public class ChunkHeader {
chunkType, measurementID, dataSize, chunkHeaderSize, dataType, type,
encoding);
}
+ /**
+   * Used by {@link
+   * TsFileSequenceReader#readTimeseriesCompressionTypeAndEncoding(TimeseriesMetadata)} to
+   * skip over the data size and data type and decode only the {@link CompressionType} and
+   * {@link TSEncoding}.
+   *
+   * @param inputStream input stream positioned right after the measurement id of a chunk header
+   * @return a pair of the chunk's {@link CompressionType} and {@link TSEncoding}
+   * @throws IOException if the stream cannot be read
+ */
+ public static Pair<CompressionType, TSEncoding>
deserializeCompressionTypeAndEncoding(
+ InputStream inputStream) throws IOException {
+ ReadWriteForEncodingUtils.readUnsignedVarInt(inputStream);
+ inputStream.skip(Byte.BYTES); // skip Data type
+ CompressionType type = ReadWriteIOUtils.readCompressionType(inputStream);
+ TSEncoding encoding = ReadWriteIOUtils.readEncoding(inputStream);
+ return new Pair<>(type, encoding);
+ }
+
public int getSerializedSize() {
return serializedSize;
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8543f1fc8d..e713f5c9cb 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -56,6 +56,7 @@ import
org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -1173,6 +1174,27 @@ public class TsFileSequenceReader implements
AutoCloseable {
header, buffer, chunkCacheKey.getDeleteIntervalList(),
chunkCacheKey.getStatistics());
}
+ /**
+   * Read the {@link CompressionType} and {@link TSEncoding} of a timeseries. This method
+   * skips the measurement id and data type, and it changes the position of this reader.
+   *
+   * @param timeseriesMetadata the timeseries' metadata
+   * @return a pair of the {@link CompressionType} and {@link TSEncoding} of the given timeseries
+   * @throws IOException if the first chunk header cannot be read
+ */
+ public Pair<CompressionType, TSEncoding>
readTimeseriesCompressionTypeAndEncoding(
+ TimeseriesMetadata timeseriesMetadata) throws IOException {
+
+ String measurementId = timeseriesMetadata.getMeasurementId();
+ int measurementIdLength =
measurementId.getBytes(TSFileConfig.STRING_CHARSET).length;
+ position(
+
timeseriesMetadata.getChunkMetadataList().get(0).getOffsetOfChunkHeader()
+ + Byte.BYTES // chunkType
+ + ReadWriteForEncodingUtils.varIntSize(measurementIdLength) //
measurementID length
+ + measurementIdLength); // measurementID
+ return
ChunkHeader.deserializeCompressionTypeAndEncoding(tsFileInput.wrapAsInputStream());
+ }
+
/** Get measurement schema by chunkMetadatas. */
public MeasurementSchema getMeasurementSchema(List<IChunkMetadata>
chunkMetadataList)
throws IOException {
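
readTimeseriesCompressionTypeAndEncoding lets callers such as AnalyzeVisitor recover a series' compression and encoding without deserializing a full ChunkHeader: it seeks just past the measurement id of the first chunk and delegates to ChunkHeader.deserializeCompressionTypeAndEncoding. A minimal sketch, with a hypothetical file name, skipping VECTOR entries the same way AnalyzeVisitor does:

    import java.util.List;
    import java.util.Map;

    import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
    import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
    import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
    import org.apache.iotdb.tsfile.utils.Pair;

    public class ReadSchemaSketch {
      public static void main(String[] args) throws Exception {
        try (TsFileSequenceReader reader = new TsFileSequenceReader("example.tsfile")) {
          Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true);
          for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
            for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
              if (TSDataType.VECTOR.equals(timeseriesMetadata.getTSDataType())) {
                continue; // skip VECTOR metadata, as AnalyzeVisitor does
              }
              Pair<CompressionType, TSEncoding> pair =
                  reader.readTimeseriesCompressionTypeAndEncoding(timeseriesMetadata);
              System.out.println(
                  entry.getKey() + "." + timeseriesMetadata.getMeasurementId()
                      + ": compression=" + pair.getLeft() + ", encoding=" + pair.getRight());
            }
          }
        }
      }
    }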
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 7cb1868e98..2be174a913 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -686,6 +686,10 @@ public class TsFileIOWriter implements AutoCloseable {
return currentChunkGroupDeviceId;
}
+ public List<ChunkGroupMetadata> getChunkGroupMetadataList() {
+ return chunkGroupMetadataList;
+ }
+
public void flush() throws IOException {
out.flush();
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterEndFileTest.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterEndFileTest.java
new file mode 100644
index 0000000000..1e8fd25081
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterEndFileTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+
+public class TsFileIOWriterEndFileTest {
+ public static void main(String[] args) throws Exception {
+ TsFileIOWriter writer = new TsFileIOWriter(new File("test.tsfile"));
+ for (int deviceIndex = 0; deviceIndex < 1000; deviceIndex++) {
+ writer.startChunkGroup("root.sg.d" + deviceIndex);
+ for (int seriesIndex = 0; seriesIndex < 1000; seriesIndex++) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(
+ new MeasurementSchema(
+ "s" + seriesIndex, TSDataType.INT32, TSEncoding.RLE,
CompressionType.GZIP));
+ for (long time = 0; time < 10; ++time) {
+ chunkWriter.write(time, 0);
+ }
+ chunkWriter.writeToFileWriter(writer);
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+}