This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch aligned_flush in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4207e33a03d0fc146aa044bc12f4c016e1a53c68 Author: HTHou <[email protected]> AuthorDate: Tue Aug 9 17:04:15 2022 +0800 impor --- .../engine/memtable/AlignedWritableMemChunk.java | 18 ++--- .../db/utils/datastructure/AlignedTVList.java | 84 ++++++++++++++++++++++ .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 28 ++++++++ 3 files changed, 121 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java index 5796c87845..b1632c0c7e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java @@ -24,8 +24,6 @@ import org.apache.iotdb.db.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.wal.utils.WALWriteUtils; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.Pair; @@ -297,13 +295,15 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public void encode(IChunkWriter chunkWriter) { AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; - List<TSEncoding> encodingList = new ArrayList<>(); - for (TSDataType e : list.getTsDataTypes()) { - encodingList.add(TSEncoding.PLAIN); - } - TsBlock tsBlock = list.buildTsBlock(0, encodingList, null); - alignedChunkWriter.write( - tsBlock.getTimeColumn(), tsBlock.getValueColumns(), tsBlock.getPositionCount()); + // List<TSEncoding> encodingList = new ArrayList<>(); + // for (TSDataType e : list.getTsDataTypes()) { + // 
encodingList.add(TSEncoding.PLAIN); + // } + // TsBlock tsBlock = list.buildTsBlock(0, encodingList, null); + // alignedChunkWriter.write( + // tsBlock.getTimeColumn(), tsBlock.getValueColumns(), tsBlock.getPositionCount()); + list.writeAlignedChunk(alignedChunkWriter); + // List<Integer> timeDuplicateAlignedRowIndexList = null; // for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { // long time = list.getTime(sortedRowIndex); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 27c1bf8510..0c0cd78bc7 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -36,6 +36,7 @@ import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import java.io.DataInputStream; import java.io.IOException; @@ -987,6 +988,89 @@ public class AlignedTVList extends TVList { return builder.build(); } + public void writeAlignedChunk(AlignedChunkWriterImpl chunkWriter) { + // Time column + boolean[] timeDuplicateInfo = null; + // time column + for (int sortedRowIndex = 0; sortedRowIndex < rowCount; sortedRowIndex++) { + long time = getTime(sortedRowIndex); + if (sortedRowIndex == rowCount - 1 || time != getTime(sortedRowIndex + 1)) { + chunkWriter.write(time); + } else { + if (Objects.isNull(timeDuplicateInfo)) { + timeDuplicateInfo = new boolean[rowCount]; + } + timeDuplicateInfo[sortedRowIndex] = true; + } + } + + // value columns + for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) { + // Pair of Time and Index + Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null; + if 
(Objects.nonNull(timeDuplicateInfo)) { + lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null); + } + for (int sortedRowIndex = 0; sortedRowIndex < rowCount; sortedRowIndex++) { + // skip time duplicated rows + if (Objects.nonNull(timeDuplicateInfo)) { + if (!isNullValue(getValueIndex(sortedRowIndex), columnIndex)) { + lastValidPointIndexForTimeDupCheck.left = getTime(sortedRowIndex); + lastValidPointIndexForTimeDupCheck.right = getValueIndex(sortedRowIndex); + } + if (timeDuplicateInfo[sortedRowIndex]) { + continue; + } + } + // The part of code solves the following problem: + // Time: 1,2,2,3 + // Value: 1,2,null,null + // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1) + // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value + // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2==pair.left:2, write(T:2,V:2) + // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2, write(T:3,V:null) + int originRowIndex; + long time = getTime(sortedRowIndex); + if (Objects.nonNull(lastValidPointIndexForTimeDupCheck) + && (time == lastValidPointIndexForTimeDupCheck.left)) { + originRowIndex = lastValidPointIndexForTimeDupCheck.right; + } else { + originRowIndex = getValueIndex(sortedRowIndex); + } + boolean isNull = isNullValue(originRowIndex, columnIndex); + switch (dataTypes.get(columnIndex)) { + case BOOLEAN: + chunkWriter.writeByColumn( + time, getBooleanByValueIndex(originRowIndex, columnIndex), isNull); + break; + case INT32: + chunkWriter.writeByColumn( + time, getIntByValueIndex(originRowIndex, columnIndex), isNull); + break; + case INT64: + chunkWriter.writeByColumn( + time, getLongByValueIndex(originRowIndex, columnIndex), isNull); + break; + case FLOAT: + chunkWriter.writeByColumn( + time, getFloatByValueIndex(originRowIndex, columnIndex), isNull); + break; + case DOUBLE: + chunkWriter.writeByColumn( + time, getDoubleByValueIndex(originRowIndex, columnIndex), isNull); + break; + case 
TEXT: + chunkWriter.writeByColumn( + time, getBinaryByValueIndex(originRowIndex, columnIndex), isNull); + break; + default: + break; + } + } + chunkWriter.nextColumn(); + } + } + protected void writeValidValuesIntoTsBlock( TsBlockBuilder builder, int floatPrecision, diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java index 46d82d3a9f..60df2cc5a6 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java @@ -164,6 +164,34 @@ public class AlignedChunkWriterImpl implements IChunkWriter { } } + public void writeByColumn(long time, int value, boolean isNull) { + valueChunkWriterList.get(valueIndex).write(time, value, isNull); + } + + public void writeByColumn(long time, long value, boolean isNull) { + valueChunkWriterList.get(valueIndex).write(time, value, isNull); + } + + public void writeByColumn(long time, boolean value, boolean isNull) { + valueChunkWriterList.get(valueIndex).write(time, value, isNull); + } + + public void writeByColumn(long time, float value, boolean isNull) { + valueChunkWriterList.get(valueIndex).write(time, value, isNull); + } + + public void writeByColumn(long time, double value, boolean isNull) { + valueChunkWriterList.get(valueIndex).write(time, value, isNull); + } + + public void writeByColumn(long time, Binary value, boolean isNull) { + valueChunkWriterList.get(valueIndex).write(time, value, isNull); + } + + public void nextColumn() { + valueIndex++; + } + public void write(TimeColumn timeColumn, Column[] valueColumns, int batchSize) { if (remainingPointsNumber < batchSize) { int pointsHasWritten = (int) remainingPointsNumber;
