This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f1cbdde37 [IOTDB-4036] Encode aligned memory chunk in columnar format (#7179)
6f1cbdde37 is described below
commit 6f1cbdde37d6bf07d88efe05fb001b1fa6ec4343
Author: Mrquan <[email protected]>
AuthorDate: Tue Sep 13 16:15:53 2022 +0800
[IOTDB-4036] Encode aligned memory chunk in columnar format (#7179)
---
.../engine/memtable/AlignedWritableMemChunk.java | 160 ++++++++++++++-------
.../tsfile/write/chunk/AlignedChunkWriterImpl.java | 28 ++++
2 files changed, 138 insertions(+), 50 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index dbcbc5c207..c9cbcbb3de 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -43,6 +44,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
public class AlignedWritableMemChunk implements IWritableMemChunk {
@@ -50,6 +52,10 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
private final Map<String, Integer> measurementIndexMap;
private final List<IMeasurementSchema> schemaList;
private AlignedTVList list;
+ // Max distinct timestamps per page in encode(); read once from the TsFile config at class load.
+ private static final int maxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
private static final Logger LOGGER =
LoggerFactory.getLogger(AlignedWritableMemChunk.class);
@@ -295,65 +301,119 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public void encode(IChunkWriter chunkWriter) {
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
- List<Integer> timeDuplicateAlignedRowIndexList = null;
+
+ boolean[] timeDuplicateInfo = null;
+ List<Integer> pageRange = new ArrayList<>();
+ int range = 0;
for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) {
long time = list.getTime(sortedRowIndex);
- // skip duplicated data
- if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) {
- // record the time duplicated row index list for vector type
- if (timeDuplicateAlignedRowIndexList == null) {
- timeDuplicateAlignedRowIndexList = new ArrayList<>();
- timeDuplicateAlignedRowIndexList.add(list.getValueIndex(sortedRowIndex));
+ if (sortedRowIndex == list.rowCount() - 1 || time != list.getTime(sortedRowIndex + 1)) {
+ if (range == 0) {
+ // start right after the previous page (or at row 0) so leading duplicated rows are kept
+ pageRange.add(pageRange.isEmpty() ? 0 : pageRange.get(pageRange.size() - 1) + 1);
+ }
+ range++;
+ if (range == maxNumberOfPointsInPage) {
+ pageRange.add(sortedRowIndex);
+ range = 0;
}
- timeDuplicateAlignedRowIndexList.add(list.getValueIndex(sortedRowIndex + 1));
- continue;
+ } else {
+ if (Objects.isNull(timeDuplicateInfo)) {
+ timeDuplicateInfo = new boolean[list.rowCount()];
+ }
+ timeDuplicateInfo[sortedRowIndex] = true;
}
- List<TSDataType> dataTypes = list.getTsDataTypes();
- int originRowIndex = list.getValueIndex(sortedRowIndex);
+ }
+
+ if (range != 0) {
+ pageRange.add(list.rowCount() - 1);
+ }
+
+ List<TSDataType> dataTypes = list.getTsDataTypes();
+ for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
- // write the time duplicated rows
- if (timeDuplicateAlignedRowIndexList != null
- && !timeDuplicateAlignedRowIndexList.isEmpty()) {
- originRowIndex =
- list.getValidRowIndexForTimeDuplicatedRows(
- timeDuplicateAlignedRowIndexList, columnIndex);
+ // (time, value index) of the last non-null value seen in this column
+ Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
+ if (Objects.nonNull(timeDuplicateInfo)) {
+ lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null);
+ }
+ for (int sortedRowIndex = pageRange.get(pageNum * 2);
+ sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
+ sortedRowIndex++) {
+ // track the latest non-null value, then skip rows whose time repeats in the next row
+ long time = list.getTime(sortedRowIndex);
+ if (Objects.nonNull(timeDuplicateInfo)) {
+ if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) {
+ lastValidPointIndexForTimeDupCheck.left = time;
+ lastValidPointIndexForTimeDupCheck.right = list.getValueIndex(sortedRowIndex);
+ }
+ if (timeDuplicateInfo[sortedRowIndex]) {
+ continue;
+ }
+ }
+
+ // Duplicated timestamps collapse into one point per column; the last non-null value (in
+ // sorted order) wins. Example, with pair = (time, value index) and value index == row:
+ //   Time: 1,2,2,3  Value: 1,2,null,null
+ //   row 0: pair -> (1,0); not duplicated, T:1 == pair.left, write(T:1,V:1)
+ //   row 1: pair -> (2,1); duplicated, skip writing
+ //   row 2: null, pair stays (2,1); T:2 == pair.left, write(T:2,V:2)
+ //   row 3: null, pair stays (2,1); T:3 != pair.left, write(T:3,V:null)
+
+ int originRowIndex;
+ if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
+ && (time == lastValidPointIndexForTimeDupCheck.left)) {
+ originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+ } else {
+ originRowIndex = list.getValueIndex(sortedRowIndex);
+ }
+
+ boolean isNull = list.isNullValue(originRowIndex, columnIndex);
+ switch (dataTypes.get(columnIndex)) {
+ case BOOLEAN:
+ alignedChunkWriter.writeByColumn(
+ time, list.getBooleanByValueIndex(originRowIndex, columnIndex), isNull);
+ break;
+ case INT32:
+ alignedChunkWriter.writeByColumn(
+ time, list.getIntByValueIndex(originRowIndex, columnIndex), isNull);
+ break;
+ case INT64:
+ alignedChunkWriter.writeByColumn(
+ time, list.getLongByValueIndex(originRowIndex, columnIndex), isNull);
+ break;
+ case FLOAT:
+ alignedChunkWriter.writeByColumn(
+ time, list.getFloatByValueIndex(originRowIndex, columnIndex), isNull);
+ break;
+ case DOUBLE:
+ alignedChunkWriter.writeByColumn(
+ time, list.getDoubleByValueIndex(originRowIndex, columnIndex), isNull);
+ break;
+ case TEXT:
+ alignedChunkWriter.writeByColumn(
+ time, list.getBinaryByValueIndex(originRowIndex, columnIndex), isNull);
+ break;
+ default:
+ LOGGER.error(UNSUPPORTED_TYPE + " {}", dataTypes.get(columnIndex));
+ break;
+ }
}
- boolean isNull = list.isNullValue(originRowIndex, columnIndex);
- switch (dataTypes.get(columnIndex)) {
- case BOOLEAN:
- alignedChunkWriter.write(
- time, list.getBooleanByValueIndex(originRowIndex, columnIndex), isNull);
- break;
- case INT32:
- alignedChunkWriter.write(
- time, list.getIntByValueIndex(originRowIndex, columnIndex), isNull);
- break;
- case INT64:
- alignedChunkWriter.write(
- time, list.getLongByValueIndex(originRowIndex, columnIndex), isNull);
- break;
- case FLOAT:
- alignedChunkWriter.write(
- time, list.getFloatByValueIndex(originRowIndex, columnIndex), isNull);
- break;
- case DOUBLE:
- alignedChunkWriter.write(
- time, list.getDoubleByValueIndex(originRowIndex, columnIndex), isNull);
- break;
- case TEXT:
- alignedChunkWriter.write(
- time, list.getBinaryByValueIndex(originRowIndex, columnIndex), isNull);
- break;
- default:
- LOGGER.error(
- "AlignedWritableMemChunk does not support data type: {}",
- dataTypes.get(columnIndex));
- break;
+ alignedChunkWriter.nextColumn();
+ }
+
+ long[] times = new long[maxNumberOfPointsInPage];
+ int pointsInPage = 0;
+ for (int sortedRowIndex = pageRange.get(pageNum * 2);
+ sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
+ sortedRowIndex++) {
+ if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) {
+ times[pointsInPage++] = list.getTime(sortedRowIndex);
}
}
- alignedChunkWriter.write(time);
- timeDuplicateAlignedRowIndexList = null;
+
+ alignedChunkWriter.write(times, pointsInPage, 0);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 46d82d3a9f..a1fc973d60 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -219,6 +219,34 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
remainingPointsNumber =
timeChunkWriter.getRemainingPointNumberForCurrentPage();
}
+ /**
+ * Writes one point into the value column selected by {@code valueIndex}. Only the value chunk
+ * writer is touched; in {@code AlignedWritableMemChunk.encode} the page's timestamps are written
+ * afterwards with {@code write(long[], int, int)}.
+ *
+ * @param time timestamp of the point
+ * @param value value for the current column
+ * @param isNull whether the current column has no value at {@code time}
+ */
+ public void writeByColumn(long time, int value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ /** Same as {@link #writeByColumn(long, int, boolean)}, for a {@code long} value. */
+ public void writeByColumn(long time, long value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ /** Same as {@link #writeByColumn(long, int, boolean)}, for a {@code boolean} value. */
+ public void writeByColumn(long time, boolean value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ /** Same as {@link #writeByColumn(long, int, boolean)}, for a {@code float} value. */
+ public void writeByColumn(long time, float value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ /** Same as {@link #writeByColumn(long, int, boolean)}, for a {@code double} value. */
+ public void writeByColumn(long time, double value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ /** Same as {@link #writeByColumn(long, int, boolean)}, for a {@link Binary} value. */
+ public void writeByColumn(long time, Binary value, boolean isNull) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ /**
+ * Advances {@code valueIndex} so subsequent {@code writeByColumn} calls target the next value
+ * column. No bounds check is done and the index is not reset here. NOTE(review): presumably
+ * {@code write(long[], int, int)} or page sealing resets it to 0 before the next page; confirm.
+ */
+ public void nextColumn() {
+ valueIndex++;
+ }
+
/**
* check occupied memory size, if it exceeds the PageSize threshold,
construct a page and put it
* to pageBuffer