This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
new eda85f5ad2 temp
eda85f5ad2 is described below
commit eda85f5ad2cb6dbac82e920d8e445d22f60597cc
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Sep 6 15:22:26 2022 +0800
temp
---
.../write/writer/MemoryControlTsFileIOWriter.java | 224 ++++++++-------------
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 16 +-
.../writer/MemoryControlTsFileIOWriterTest.java | 12 +-
3 files changed, 104 insertions(+), 148 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index e84970ced1..371f4d2ae7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.tsfile.write.writer;
+import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
@@ -29,7 +31,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +40,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -51,33 +50,42 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
protected long currentChunkMetadataSize = 0L;
protected File chunkMetadataTempFile;
protected LocalTsFileOutput tempOutput;
- protected final boolean needSort;
- protected Queue<Long> sortedSegmentPosition = new ArrayDeque<>();
+ protected final boolean autoControl;
+ // it stores the start address of persisted chunk metadata for per series
+ protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
+ protected String currentSeries = null;
public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
- private static final String SORTING_TEMP_FILE = ".scmt";
private static final byte VECTOR_TYPE = 1;
private static final byte NORMAL_TYPE = 2;
- public MemoryControlTsFileIOWriter(File file, long maxMetadataSize, boolean
needSort)
+ public MemoryControlTsFileIOWriter(File file, long maxMetadataSize, boolean
autoControl)
throws IOException {
super(file);
this.maxMetadataSize = maxMetadataSize;
this.chunkMetadataTempFile = new File(file.getAbsoluteFile() +
CHUNK_METADATA_TEMP_FILE_PREFIX);
- this.needSort = needSort;
+ this.autoControl = autoControl;
}
@Override
public void endCurrentChunk() {
currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
super.endCurrentChunk();
+ if (this.autoControl) {
+ checkMetadataSizeAndMayFlush();
+ }
+ }
+
+ public boolean checkMetadataSizeAndMayFlush() {
if (currentChunkMetadataSize > maxMetadataSize) {
try {
sortAndFlushChunkMetadata();
+ return true;
} catch (IOException e) {
LOG.error("Meets exception when flushing metadata to temp files", e);
}
}
+ return false;
}
protected void sortAndFlushChunkMetadata() throws IOException {
@@ -86,10 +94,10 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
if (tempOutput == null) {
tempOutput = new LocalTsFileOutput(new
FileOutputStream(chunkMetadataTempFile));
}
- sortedSegmentPosition.add(tempOutput.getPosition());
// the file structure in temp file will be
// ChunkType | chunkSize | chunkBuffer
for (Map.Entry<Path, List<IChunkMetadata>> entry :
chunkMetadataListMap.entrySet()) {
+ segmentForPerSeries.add(tempOutput.getPosition());
Path seriesPath = entry.getKey();
List<IChunkMetadata> iChunkMetadataList = entry.getValue();
writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
@@ -103,27 +111,33 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
return;
}
if (iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR) {
- IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
- List<IChunkMetadata> currentValueChunk = new ArrayList<>();
- List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
- for (int i = 1; i < iChunkMetadataList.size(); ++i) {
- if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
- alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
- currentTimeChunk = iChunkMetadataList.get(i);
- currentValueChunk = new ArrayList<>();
- } else {
- currentValueChunk.add(iChunkMetadataList.get(i));
- }
- }
- if (currentValueChunk.size() > 0) {
- alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
- }
+ // pack the TimeChunkMetadata and List<ValueChunkMetadata> into
List<AlignedChunkMetadata>
+ List<IChunkMetadata> alignedChunkMetadata =
packAlignedChunkMetadata(iChunkMetadataList);
writeAlignedChunkMetadata(alignedChunkMetadata, seriesPath, output);
} else {
writeNormalChunkMetadata(iChunkMetadataList, seriesPath, output);
}
}
+ private List<IChunkMetadata> packAlignedChunkMetadata(List<IChunkMetadata>
iChunkMetadataList) {
+ IChunkMetadata currentTimeChunk = iChunkMetadataList.get(0);
+ List<IChunkMetadata> currentValueChunk = new ArrayList<>();
+ List<IChunkMetadata> alignedChunkMetadata = new ArrayList<>();
+ for (int i = 1; i < iChunkMetadataList.size(); ++i) {
+ if (iChunkMetadataList.get(i).getDataType() == TSDataType.VECTOR) {
+ alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
+ currentTimeChunk = iChunkMetadataList.get(i);
+ currentValueChunk = new ArrayList<>();
+ } else {
+ currentValueChunk.add(iChunkMetadataList.get(i));
+ }
+ }
+ if (currentValueChunk.size() > 0) {
+ alignedChunkMetadata.add(new AlignedChunkMetadata(currentTimeChunk,
currentValueChunk));
+ }
+ return alignedChunkMetadata;
+ }
+
private void writeAlignedChunkMetadata(
List<IChunkMetadata> iChunkMetadataList, Path seriesPath,
LocalTsFileOutput output)
throws IOException {
@@ -150,112 +164,60 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
@Override
public void endFile() throws IOException {
- if (this.sortedSegmentPosition.size() > 0) {
+ if (this.segmentForPerSeries.size() > 0) {
// there is some chunk metadata already been written to the disk
+ // first we should flush the remaining chunk metadata in memory to disk
+ // then read the persisted chunk metadata from disk
sortAndFlushChunkMetadata();
tempOutput.close();
} else {
- // sort the chunk metadata in memory, and just close the file
+ // sort the chunk metadata in memory, construct the index tree
+ // and just close the file
tempOutput.close();
super.endFile();
return;
}
- if (needSort) {
- externalSort();
- }
-
- // super.endFile();
+ // read in the chunk metadata, and construct the index tree
+ readChunkMetadataAndConstructIndexTree();
}
- protected void externalSort() throws IOException {
- ChunkMetadataComparator comparator = new ChunkMetadataComparator();
- int totalSegmentCount = this.sortedSegmentPosition.size();
- File currentInFile = this.chunkMetadataTempFile;
- File currentOutFile = new File(this.file.getAbsolutePath() +
SORTING_TEMP_FILE);
- LocalTsFileInput inputForWindow1 = null;
- LocalTsFileInput inputForWindow2 = null;
- LocalTsFileOutput output = null;
- while (totalSegmentCount > 1) {
- try {
- inputForWindow1 = new LocalTsFileInput(currentInFile.toPath());
- inputForWindow2 = new LocalTsFileInput(currentInFile.toPath());
- output = new LocalTsFileOutput(new FileOutputStream(currentOutFile));
- totalSegmentCount = 0;
- Queue<Long> newSortedSegmentPosition = new ArrayDeque<>();
- while (sortedSegmentPosition.size() > 0) {
- long startPositionForWindow1 = sortedSegmentPosition.poll();
- if (sortedSegmentPosition.size() == 0) {
- // Just leave it alone, and record the position
- newSortedSegmentPosition.add(startPositionForWindow1);
- continue;
- }
- long startPositionForWindow2 = sortedSegmentPosition.poll();
- ChunkMetadataExternalSortWindow firstWindow =
- new ChunkMetadataExternalSortWindow(
- startPositionForWindow1, startPositionForWindow2,
inputForWindow1);
- ChunkMetadataExternalSortWindow secondWindow =
- new ChunkMetadataExternalSortWindow(
- startPositionForWindow2,
- sortedSegmentPosition.size() > 0
- ? sortedSegmentPosition.element()
- : this.chunkMetadataTempFile.length(),
- inputForWindow2);
- firstWindow.getNextSeriesNameAndChunkMetadata();
- secondWindow.getNextSeriesNameAndChunkMetadata();
- newSortedSegmentPosition.add(output.getPosition());
- while (firstWindow.hasNextChunkMetadata() &&
secondWindow.hasNextChunkMetadata()) {
- Pair<String, IChunkMetadata> pairOfFirstWindow =
- firstWindow.getCurrentSeriesNameAndChunkMetadata();
- Pair<String, IChunkMetadata> pairOfSecondWindow =
- secondWindow.getCurrentSeriesNameAndChunkMetadata();
- Pair<String, IChunkMetadata> pairToWritten = null;
- if (comparator.compare(pairOfFirstWindow, pairOfSecondWindow) < 0)
{
- pairToWritten = pairOfFirstWindow;
- if (firstWindow.hasNextChunkMetadata()) {
- firstWindow.getNextSeriesNameAndChunkMetadata();
- }
- } else {
- pairToWritten = pairOfSecondWindow;
- if (secondWindow.hasNextChunkMetadata()) {
- secondWindow.getNextSeriesNameAndChunkMetadata();
- }
- }
- // serialize the chunk to the output
- if (pairToWritten.right instanceof AlignedChunkMetadata) {
- writeAlignedChunkMetadata(
- Collections.singletonList(pairToWritten.right),
- new Path(pairToWritten.left),
- output);
- } else {
- writeNormalChunkMetadata(
- Collections.singletonList(pairToWritten.right),
- new Path(pairToWritten.left),
- output);
- }
- }
- }
+ private void readChunkMetadataAndConstructIndexTree() throws IOException {
+ tempOutput.close();
+ long metaOffset = out.getPosition();
- output.close();
- inputForWindow1.close();
- inputForWindow2.close();
- FileUtils.delete(currentInFile);
- currentOutFile.renameTo(currentInFile);
- File tempFile = currentOutFile;
- currentOutFile = currentInFile;
- currentInFile = tempFile;
- } finally {
- if (inputForWindow1 != null) {
- inputForWindow1.close();
- }
- if (inputForWindow2 != null) {
- inputForWindow2.close();
- }
- if (output != null) {
- output.close();
- }
- }
+ // serialize the SEPARATOR of MetaData
+ ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+ ChunkMetadataReadIterator iterator =
+ new ChunkMetadataReadIterator(
+ 0,
+ chunkMetadataTempFile.length(),
+ new LocalTsFileInput(chunkMetadataTempFile.toPath()));
+ while (iterator.hasNextChunkMetadata()) {
+ // 1. read in all chunk metadata of one series
+ // 2. construct the timeseries metadata for this series
+ // 3. construct the index tree node for the series
+ // 4. serialize the timeseries metadata to file
+ TimeseriesMetadata timeseriesMetadata = readTimeseriesMetadata(iterator);
+ }
+ }
+
+ private TimeseriesMetadata readTimeseriesMetadata(ChunkMetadataReadIterator
iterator)
+ throws IOException {
+ Pair<String, IChunkMetadata> currentPair = iterator.getCurrentPair();
+ if (currentPair == null) {
+ currentPair = iterator.getNextSeriesNameAndChunkMetadata();
+ }
+ if (!currentPair.left.equals(currentSeries)) {
+ // come to a new series
+ currentSeries = currentPair.left;
+ }
+ List<IChunkMetadata> iChunkMetadataList = new ArrayList<>();
+ while (currentPair != null && currentPair.left.equals(currentSeries)) {
+ iChunkMetadataList.add(currentPair.right);
+ currentPair = iterator.getNextSeriesNameAndChunkMetadata();
}
+ return super.constructOneTimeseriesMetadata(new Path(currentSeries),
iChunkMetadataList, false);
}
@Override
@@ -266,32 +228,16 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
}
}
- protected static class ChunkMetadataComparator
- implements Comparator<Pair<String, IChunkMetadata>> {
-
- @Override
- public int compare(Pair<String, IChunkMetadata> o1, Pair<String,
IChunkMetadata> o2) {
- String seriesNameOfO1 = o1.left;
- String seriesNameOfO2 = o2.left;
- int lexicographicalOrder = seriesNameOfO1.compareTo(seriesNameOfO2);
- if (lexicographicalOrder != 0) {
- return lexicographicalOrder;
- } else {
- return Long.compare(o1.right.getStartTime(), o2.right.getStartTime());
- }
- }
- }
-
- protected class ChunkMetadataExternalSortWindow {
+ protected class ChunkMetadataReadIterator {
final LocalTsFileInput input;
final long startPosition;
final long endPosition;
final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
final ByteBuffer typeBuffer = ByteBuffer.allocate(1);
- Pair<String, IChunkMetadata> currentPair = null;
+ private Pair<String, IChunkMetadata> currentPair = null;
- ChunkMetadataExternalSortWindow(long startPosition, long endPosition,
LocalTsFileInput input)
+ ChunkMetadataReadIterator(long startPosition, long endPosition,
LocalTsFileInput input)
throws IOException {
this.startPosition = startPosition;
this.endPosition = endPosition;
@@ -304,6 +250,10 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
}
public Pair<String, IChunkMetadata> getNextSeriesNameAndChunkMetadata()
throws IOException {
+ if (input.position() >= endPosition) {
+ currentPair = null;
+ return null;
+ }
byte type = readNextChunkMetadataType();
int size = readNextChunkMetadataSize();
ByteBuffer chunkBuffer = ByteBuffer.allocate(size);
@@ -322,7 +272,7 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
return currentPair;
}
- public Pair<String, IChunkMetadata> getCurrentSeriesNameAndChunkMetadata()
{
+ public Pair<String, IChunkMetadata> getCurrentPair() {
return currentPair;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 59a8ec236d..c34ab1df63 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -327,7 +327,7 @@ public class TsFileIOWriter implements AutoCloseable {
// create device -> TimeseriesMetaDataList Map
for (Map.Entry<Path, List<IChunkMetadata>> entry :
chunkMetadataListMap.entrySet()) {
// for ordinary path
- flushOneChunkMetadata(entry.getKey(), entry.getValue());
+ constructOneTimeseriesMetadata(entry.getKey(), entry.getValue(), true);
}
// construct TsFileMetadata and return
@@ -339,8 +339,11 @@ public class TsFileIOWriter implements AutoCloseable {
*
* @param path Path of chunk
* @param chunkMetadataList List of chunkMetadata about path(previous param)
+ * @param needRecordInMap need to record the timeseries metadata in
deviceTimeseriesMetadataMap
+ * @return the constructed TimeseriesMetadata
*/
- private void flushOneChunkMetadata(Path path, List<IChunkMetadata>
chunkMetadataList)
+ protected TimeseriesMetadata constructOneTimeseriesMetadata(
+ Path path, List<IChunkMetadata> chunkMetadataList, boolean
needRecordInMap)
throws IOException {
// create TimeseriesMetaData
PublicBAOS publicBAOS = new PublicBAOS();
@@ -367,9 +370,12 @@ public class TsFileIOWriter implements AutoCloseable {
dataType,
seriesStatistics,
publicBAOS);
- deviceTimeseriesMetadataMap
- .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
- .add(timeseriesMetadata);
+ if (needRecordInMap) {
+ deviceTimeseriesMetadataMap
+ .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+ .add(timeseriesMetadata);
+ }
+ return timeseriesMetadata;
}
/**
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
index 25bd192974..392a699222 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
@@ -101,9 +101,9 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
writer.sortAndFlushChunkMetadata();
writer.tempOutput.flush();
- ChunkMetadataExternalSortWindow window =
+ ChunkMetadataReadIterator window =
writer
- .new ChunkMetadataExternalSortWindow(
+ .new ChunkMetadataReadIterator(
0,
writer.chunkMetadataTempFile.length(),
new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
@@ -157,9 +157,9 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
new AlignedChunkMetadata(currentTimeChunkMetadata,
currentValueChunkMetadata));
}
- ChunkMetadataExternalSortWindow window =
+ ChunkMetadataReadIterator window =
writer
- .new ChunkMetadataExternalSortWindow(
+ .new ChunkMetadataReadIterator(
0,
writer.chunkMetadataTempFile.length(),
new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
@@ -226,9 +226,9 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
writer.sortAndFlushChunkMetadata();
writer.tempOutput.flush();
- ChunkMetadataExternalSortWindow window =
+ ChunkMetadataReadIterator window =
writer
- .new ChunkMetadataExternalSortWindow(
+ .new ChunkMetadataReadIterator(
0,
writer.chunkMetadataTempFile.length(),
new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));