This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
new a25aad0ab4 refactor some codes
a25aad0ab4 is described below
commit a25aad0ab4d858432f2a1dae004c7e30a60ee5a3
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri Sep 9 10:15:29 2022 +0800
refactor some codes
---
.../write/writer/MemoryControlTsFileIOWriter.java | 113 +++++++--------------
.../tsfile/write/TsFileIntegrityCheckingTool.java | 30 ++++--
2 files changed, 63 insertions(+), 80 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index 0c171c4419..2cfa93e11b 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.tsfile.write.writer;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
@@ -62,8 +61,6 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
protected long currentChunkMetadataSize = 0L;
protected File chunkMetadataTempFile;
protected LocalTsFileOutput tempOutput;
- // it stores the start address of persisted chunk metadata for per series
- // protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
protected volatile boolean hasChunkMetadataInDisk = false;
protected String currentSeries = null;
// record the total num of path in order to make bloom filter
@@ -71,7 +68,6 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
Path lastSerializePath = null;
public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
- private static final byte VECTOR_TYPE = 1;
private static final byte NORMAL_TYPE = 2;
public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws
IOException {
@@ -86,6 +82,14 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
super.endCurrentChunk();
}
+ /**
+ * Checks whether the size of chunk metadata in memory is greater than the given
threshold. If so, the
+ * chunk metadata will be written to a temp file. <b>Notice! If you are
writing an aligned device,
+ * you should make sure all data of the device currently being written has been written
before this method is
+ * called.</b> For non-aligned series, there is no such limitation.
+ *
+ * @throws IOException
+ */
public void checkMetadataSizeAndMayFlush() throws IOException {
// This function should be called after all data of an aligned device has
been written
if (currentChunkMetadataSize > maxMetadataSize) {
@@ -98,6 +102,12 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
}
}
+ /**
+ * Sorts the chunk metadata in lexicographical order and by the start time
of the chunk, then
+ * flushes them to a temp file.
+ *
+ * @throws IOException
+ */
protected void sortAndFlushChunkMetadata() throws IOException {
// group by series
Map<Path, List<IChunkMetadata>> chunkMetadataListMap =
groupChunkMetadataListBySeries();
@@ -106,10 +116,11 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
}
hasChunkMetadataInDisk = true;
// the file structure in temp file will be
- // ChunkType | chunkSize | chunkBuffer
+ // chunkSize | chunkBuffer
for (Map.Entry<Path, List<IChunkMetadata>> entry :
chunkMetadataListMap.entrySet()) {
Path seriesPath = entry.getKey();
if (!seriesPath.equals(lastSerializePath)) {
+ // record the count of path to construct bloom filter later
pathCount++;
}
List<IChunkMetadata> iChunkMetadataList = entry.getValue();
@@ -126,48 +137,7 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
private void writeChunkMetadata(
List<IChunkMetadata> iChunkMetadataList, Path seriesPath,
LocalTsFileOutput output)
throws IOException {
- if (iChunkMetadataList.size() == 0) {
- return;
- }
- writeNormalChunkMetadata(iChunkMetadataList, seriesPath, output);
- }
-
- private List<IChunkMetadata> packAlignedChunkMetadata(List<IChunkMetadata>
iChunkMetadataList) {
- IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
- List<IChunkMetadata> currentValueChunk = new ArrayList<>();
- List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
- for (int i = 1; i < iChunkMetadataList.size(); ++i) {
- if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
- alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
- currentTimeChunk = iChunkMetadataList.get(i);
- currentValueChunk = new ArrayList<>();
- } else {
- currentValueChunk.add(iChunkMetadataList.get(i));
- }
- }
- if (currentValueChunk.size() > 0) {
- alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
- }
- return alignedChunkMetadata;
- }
-
- private void writeAlignedChunkMetadata(
- List<IChunkMetadata> iChunkMetadataList, Path seriesPath,
LocalTsFileOutput output)
- throws IOException {
- for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
- ReadWriteIOUtils.write(VECTOR_TYPE, output);
- PublicBAOS buffer = new PublicBAOS();
- int size = chunkMetadata.serializeWithFullInfo(buffer,
seriesPath.getDevice());
- ReadWriteIOUtils.write(size, output);
- buffer.writeTo(output);
- }
- }
-
- private void writeNormalChunkMetadata(
- List<IChunkMetadata> iChunkMetadataList, Path seriesPath,
LocalTsFileOutput output)
- throws IOException {
for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
- ReadWriteIOUtils.write(NORMAL_TYPE, output);
PublicBAOS buffer = new PublicBAOS();
int size = chunkMetadata.serializeWithFullInfo(buffer,
seriesPath.getFullPath());
ReadWriteIOUtils.write(size, output);
@@ -177,19 +147,20 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
@Override
public void endFile() throws IOException {
- if (hasChunkMetadataInDisk) {
- // there is some chunk metadata already been written to the disk
- // first we should flush the remaining chunk metadata in memory to disk
- // then read the persisted chunk metadata from disk
- sortAndFlushChunkMetadata();
- tempOutput.close();
- } else {
- // sort the chunk metadata in memory, construct the index tree
+ if (!hasChunkMetadataInDisk) {
+ // all the chunk metadata is stored in memory
+ // sort the chunk metadata, construct the index tree
// and just close the file
super.endFile();
return;
}
+ // some chunk metadata has already been written to the disk
+ // first we should flush the remaining chunk metadata in memory to disk
+ // then read the persisted chunk metadata from disk
+ sortAndFlushChunkMetadata();
+ tempOutput.close();
+
// read in the chunk metadata, and construct the index tree
readChunkMetadataAndConstructIndexTree();
@@ -237,7 +208,9 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
// construct the index tree node for the series
Path currentPath = null;
if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
- // remove the last . in the series id
+ // this series is the time column of the aligned device
+ // the full series path will be like "root.sg.d."
+ // we remove the last . in the series id here
currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
} else {
currentPath = new Path(currentSeries, true);
@@ -351,31 +324,30 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
return currentPair != null || this.input.position() < endPosition;
}
- public Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata()
throws IOException {
+ /**
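+ * Reads the next serialized chunk metadata and returns the full series path together with it.
+ *
+ * @return the (series full path, chunk metadata) pair, or null if the input has reached its end position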
+ * @throws IOException
+ */
+ protected Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata()
throws IOException {
if (input.position() >= endPosition) {
currentPair = null;
return null;
}
- byte type = readNextChunkMetadataType();
int size = readNextChunkMetadataSize();
ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
chunkBuffer.flip();
- if (type == NORMAL_TYPE) {
- ChunkMetadata chunkMetadata = new ChunkMetadata();
- String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer,
chunkMetadata);
- currentPair = new Pair<>(seriesPath, chunkMetadata);
- } else {
- AlignedChunkMetadata chunkMetadata = new AlignedChunkMetadata();
- String devicePath =
- AlignedChunkMetadata.deserializeWithFullInfo(chunkBuffer,
chunkMetadata);
- currentPair = new Pair<>(devicePath, chunkMetadata);
- }
+ ChunkMetadata chunkMetadata = new ChunkMetadata();
+ String seriesPath = ChunkMetadata.deserializeWithFullInfo(chunkBuffer,
chunkMetadata);
+ currentPair = new Pair<>(seriesPath, chunkMetadata);
return currentPair;
}
public String getAllChunkMetadataForNextSeries(List<IChunkMetadata>
iChunkMetadataList)
throws IOException {
+ // TODO: read all the chunk metadata of a single series at once instead of
reading it iteratively
if (currentPair == null) {
if (!hasNextChunkMetadata()) {
return null;
@@ -400,13 +372,6 @@ public class MemoryControlTsFileIOWriter extends
TsFileIOWriter {
return currentPair;
}
- private byte readNextChunkMetadataType() throws IOException {
- typeBuffer.clear();
- ReadWriteIOUtils.readAsPossible(input, typeBuffer);
- typeBuffer.flip();
- return ReadWriteIOUtils.readByte(typeBuffer);
- }
-
private int readNextChunkMetadataSize() throws IOException {
sizeBuffer.clear();
ReadWriteIOUtils.readAsPossible(input, sizeBuffer);
diff --git
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
index 43333a6e02..c97a9a0774 100644
---
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
+++
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -63,6 +63,12 @@ import java.util.Map;
public class TsFileIntegrityCheckingTool {
private static Logger LOG =
LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
+ /**
+ * This method checks the integrity of the file by reading it from the start to
the end. It mainly
+ * checks the integrity of the chunks.
+ *
+ * @param filename path of the TsFile to check
+ */
public static void checkIntegrityBySequenceRead(String filename) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
String headMagicString = reader.readHeadMagic();
@@ -141,6 +147,16 @@ public class TsFileIntegrityCheckingTool {
}
}
+ /**
+ * This method checks the integrity of the file by mimicking the query process,
which first reads
+ * the metadata index tree to get the timeseries metadata list and the
chunk metadata list.
+ * After that, this method reads each chunk according to its chunk
metadata, deserializes
+ * the chunk, and verifies the correctness of the data.
+ *
+ * @param filename File to be checked
+ * @param originData The original data in a map format, {@code Device -> SeriesId ->
List<List<Time, Val>>},
+ * where each inner list stands for a chunk.
+ */
public static void checkIntegrityByQuery(
String filename,
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>>
originData) {
@@ -148,6 +164,7 @@ public class TsFileIntegrityCheckingTool {
Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
reader.getAllTimeseriesMetadata(true);
Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+ // check each series
for (Map.Entry<String, List<TimeseriesMetadata>> entry :
allTimeseriesMetadata.entrySet()) {
String deviceId = entry.getKey();
List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
@@ -163,6 +180,7 @@ public class TsFileIntegrityCheckingTool {
if (!vectorMode) {
// check integrity of not aligned series
for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList)
{
+ // get its chunk metadata list, and read the chunk
String measurementId = timeseriesMetadata.getMeasurementId();
List<List<Pair<Long, TsPrimitiveType>>> originChunks =
originData.get(deviceId).get(measurementId);
@@ -173,29 +191,27 @@ public class TsFileIntegrityCheckingTool {
Chunk chunk = reader.readMemChunk((ChunkMetadata)
chunkMetadataList.get(i));
ChunkReader chunkReader = new ChunkReader(chunk, null);
List<Pair<Long, TsPrimitiveType>> originValue =
originChunks.get(i);
+ // deserialize the chunk and verify it with origin data
for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
IPointReader pointReader =
chunkReader.nextPageData().getBatchDataIterator();
while (pointReader.hasNextTimeValuePair()) {
TimeValuePair pair = pointReader.nextTimeValuePair();
Assert.assertEquals(
originValue.get(valIdx).left.longValue(),
pair.getTimestamp());
- try {
- Assert.assertEquals(originValue.get(valIdx++).right,
pair.getValue());
- } catch (Throwable e) {
- System.out.println();
- }
+ Assert.assertEquals(originValue.get(valIdx++).right,
pair.getValue());
}
}
}
}
} else {
// check integrity of vector type
- // 1. check the time column
+ // get the timeseries metadata of the time column
TimeseriesMetadata timeColumnMetadata =
timeseriesMetadataList.get(0);
List<IChunkMetadata> timeChunkMetadataList =
timeColumnMetadata.getChunkMetadataList();
timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
for (int i = 1; i < timeseriesMetadataList.size(); ++i) {
+ // traverse each value column
List<IChunkMetadata> valueChunkMetadataList =
timeseriesMetadataList.get(i).getChunkMetadataList();
Assert.assertEquals(timeChunkMetadataList.size(),
valueChunkMetadataList.size());
@@ -206,8 +222,10 @@ public class TsFileIntegrityCheckingTool {
reader.readMemChunk((ChunkMetadata)
timeChunkMetadataList.get(chunkIdx));
Chunk valueChunk =
reader.readMemChunk((ChunkMetadata)
valueChunkMetadataList.get(chunkIdx));
+ // construct an aligned chunk reader using time chunk and value
chunk
IChunkReader chunkReader =
new AlignedChunkReader(timeChunk,
Collections.singletonList(valueChunk), null);
+ // verify the values
List<Pair<Long, TsPrimitiveType>> originValue =
originDataChunks.get(chunkIdx);
for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
IBatchDataIterator pointReader =
chunkReader.nextPageData().getBatchDataIterator();