This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch split_text_chunk
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/split_text_chunk by this push:
new 3a994a3c054 dev aligned chunk split
3a994a3c054 is described below
commit 3a994a3c054cfa6227afa19f934c21e2c08f6574
Author: HTHou <[email protected]>
AuthorDate: Sun Sep 29 10:24:07 2024 +0800
dev aligned chunk split
---
.../memtable/AlignedWritableMemChunk.java | 222 ++++++++++++---------
1 file changed, 128 insertions(+), 94 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 33a7d09213a..040cc6c7a95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -46,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
public class AlignedWritableMemChunk implements IWritableMemChunk {
@@ -55,6 +57,8 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
private static final int MAX_NUMBER_OF_POINTS_IN_PAGE =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ private static final long MAX_SERIES_POINT_NUMBER =
+
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
@@ -327,22 +331,37 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
@SuppressWarnings({"squid:S6541", "squid:S3776"})
@Override
- public void encode(IChunkWriter chunkWriter) {
- AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriter;
-
+ public void encode(LinkedBlockingQueue<Object> ioTaskQueue) {
BitMap rowBitMap = list.getRowBitMap();
boolean[] timeDuplicateInfo = null;
+
+ // Eg.((0,9,10,12),(13,15)) means this TVList contains 2 chunks,
+ // chunk 1 contains 2 pages ([0,9] and [10,12]), chunk 2 contains 1 page ([13,15]).
+ List<List<Integer>> chunkRange = new ArrayList<>();
List<Integer> pageRange = new ArrayList<>();
- int range = 0;
+
+ int pointNumInPage = 0;
+ int pointNumInChunk = 0;
+
for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount();
sortedRowIndex++) {
long time = list.getTime(sortedRowIndex);
- if (range == 0) {
+ if (pointNumInPage == 0) {
pageRange.add(sortedRowIndex);
}
- range++;
- if (range == MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ pointNumInPage++;
+ pointNumInChunk++;
+ if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE) {
pageRange.add(sortedRowIndex);
- range = 0;
+ pointNumInPage = 0;
+ }
+ if (pointNumInChunk == MAX_SERIES_POINT_NUMBER) {
+ if (pointNumInPage != 0) {
+ pageRange.add(sortedRowIndex);
+ pointNumInPage = 0;
+ }
+ chunkRange.add(pageRange);
+ pageRange = new ArrayList<>();
+ pointNumInChunk = 0;
}
int nextRowIndex = sortedRowIndex + 1;
@@ -360,21 +379,105 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
sortedRowIndex = nextRowIndex - 1;
}
- if (range != 0) {
+ if (pointNumInPage != 0) {
pageRange.add(list.rowCount() - 1);
}
+ if (pointNumInChunk != 0) {
+ chunkRange.add(pageRange);
+ }
+ handleEncoding(ioTaskQueue, chunkRange, timeDuplicateInfo);
+ }
+
+ private void handleEncoding(
+ LinkedBlockingQueue<Object> ioTaskQueue,
+ List<List<Integer>> chunkRange,
+ boolean[] timeDuplicateInfo) {
+ BitMap rowBitMap = list.getRowBitMap();
List<TSDataType> dataTypes = list.getTsDataTypes();
Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new
Pair[dataTypes.size()];
+ for (List<Integer> pageRange : chunkRange) {
+ AlignedChunkWriterImpl alignedChunkWriter = new
AlignedChunkWriterImpl(schemaList);
+ for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
+ for (int columnIndex = 0; columnIndex < dataTypes.size();
columnIndex++) {
+ // Pair of Time and Index
+ if (Objects.nonNull(timeDuplicateInfo)
+ && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
+ lastValidPointIndexForTimeDupCheck[columnIndex] = new
Pair<>(Long.MIN_VALUE, null);
+ }
+ TSDataType tsDataType = dataTypes.get(columnIndex);
+ for (int sortedRowIndex = pageRange.get(pageNum * 2);
+ sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
+ sortedRowIndex++) {
+ // skip empty row
+ if (rowBitMap != null &&
rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
+ continue;
+ }
+ // skip time duplicated rows
+ long time = list.getTime(sortedRowIndex);
+ if (Objects.nonNull(timeDuplicateInfo)) {
+ if (!list.isNullValue(list.getValueIndex(sortedRowIndex),
columnIndex)) {
+ lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
+ lastValidPointIndexForTimeDupCheck[columnIndex].right =
+ list.getValueIndex(sortedRowIndex);
+ }
+ if (timeDuplicateInfo[sortedRowIndex]) {
+ continue;
+ }
+ }
+
+ // The part of code solves the following problem:
+ // Time: 1,2,2,3
+ // Value: 1,2,null,null
+ // When rowIndex:1, pair(min,null), timeDuplicateInfo:false,
write(T:1,V:1)
+ // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip
writing value
+ // When rowIndex:3, pair(2,2), timeDuplicateInfo:false,
T:2==pair.left:2, write(T:2,V:2)
+ // When rowIndex:4, pair(2,2), timeDuplicateInfo:false,
T:3!=pair.left:2,
+ // write(T:3,V:null)
+
+ int originRowIndex;
+ if
(Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
+ && (time ==
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
+ originRowIndex =
lastValidPointIndexForTimeDupCheck[columnIndex].right;
+ } else {
+ originRowIndex = list.getValueIndex(sortedRowIndex);
+ }
- for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
- for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++)
{
- // Pair of Time and Index
- if (Objects.nonNull(timeDuplicateInfo)
- && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
- lastValidPointIndexForTimeDupCheck[columnIndex] = new
Pair<>(Long.MIN_VALUE, null);
+ boolean isNull = list.isNullValue(originRowIndex, columnIndex);
+ switch (tsDataType) {
+ case BOOLEAN:
+ alignedChunkWriter.writeByColumn(
+ time, list.getBooleanByValueIndex(originRowIndex,
columnIndex), isNull);
+ break;
+ case INT32:
+ alignedChunkWriter.writeByColumn(
+ time, list.getIntByValueIndex(originRowIndex,
columnIndex), isNull);
+ break;
+ case INT64:
+ alignedChunkWriter.writeByColumn(
+ time, list.getLongByValueIndex(originRowIndex,
columnIndex), isNull);
+ break;
+ case FLOAT:
+ alignedChunkWriter.writeByColumn(
+ time, list.getFloatByValueIndex(originRowIndex,
columnIndex), isNull);
+ break;
+ case DOUBLE:
+ alignedChunkWriter.writeByColumn(
+ time, list.getDoubleByValueIndex(originRowIndex,
columnIndex), isNull);
+ break;
+ case TEXT:
+ alignedChunkWriter.writeByColumn(
+ time, list.getBinaryByValueIndex(originRowIndex,
columnIndex), isNull);
+ break;
+ default:
+ break;
+ }
+ }
+ alignedChunkWriter.nextColumn();
}
- TSDataType tsDataType = dataTypes.get(columnIndex);
+
+ long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+ int pointsInPage = 0;
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
@@ -382,88 +485,19 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
if (rowBitMap != null &&
rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
continue;
}
- // skip time duplicated rows
- long time = list.getTime(sortedRowIndex);
- if (Objects.nonNull(timeDuplicateInfo)) {
- if (!list.isNullValue(list.getValueIndex(sortedRowIndex),
columnIndex)) {
- lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
- lastValidPointIndexForTimeDupCheck[columnIndex].right =
- list.getValueIndex(sortedRowIndex);
- }
- if (timeDuplicateInfo[sortedRowIndex]) {
- continue;
- }
- }
-
- // The part of code solves the following problem:
- // Time: 1,2,2,3
- // Value: 1,2,null,null
- // When rowIndex:1, pair(min,null), timeDuplicateInfo:false,
write(T:1,V:1)
- // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing
value
- // When rowIndex:3, pair(2,2), timeDuplicateInfo:false,
T:2!=air.left:2, write(T:2,V:2)
- // When rowIndex:4, pair(2,2), timeDuplicateInfo:false,
T:3!=pair.left:2,
- // write(T:3,V:null)
-
- int originRowIndex;
- if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
- && (time ==
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
- originRowIndex =
lastValidPointIndexForTimeDupCheck[columnIndex].right;
- } else {
- originRowIndex = list.getValueIndex(sortedRowIndex);
- }
-
- boolean isNull = list.isNullValue(originRowIndex, columnIndex);
- switch (tsDataType) {
- case BOOLEAN:
- alignedChunkWriter.writeByColumn(
- time, list.getBooleanByValueIndex(originRowIndex,
columnIndex), isNull);
- break;
- case INT32:
- case DATE:
- alignedChunkWriter.writeByColumn(
- time, list.getIntByValueIndex(originRowIndex, columnIndex),
isNull);
- break;
- case INT64:
- case TIMESTAMP:
- alignedChunkWriter.writeByColumn(
- time, list.getLongByValueIndex(originRowIndex, columnIndex),
isNull);
- break;
- case FLOAT:
- alignedChunkWriter.writeByColumn(
- time, list.getFloatByValueIndex(originRowIndex,
columnIndex), isNull);
- break;
- case DOUBLE:
- alignedChunkWriter.writeByColumn(
- time, list.getDoubleByValueIndex(originRowIndex,
columnIndex), isNull);
- break;
- case TEXT:
- case BLOB:
- case STRING:
- alignedChunkWriter.writeByColumn(
- time, list.getBinaryByValueIndex(originRowIndex,
columnIndex), isNull);
- break;
- default:
- break;
+ if (Objects.isNull(timeDuplicateInfo) ||
!timeDuplicateInfo[sortedRowIndex]) {
+ times[pointsInPage++] = list.getTime(sortedRowIndex);
}
}
- alignedChunkWriter.nextColumn();
+ alignedChunkWriter.write(times, pointsInPage, 0);
+ alignedChunkWriter.sealCurrentPage();
+ alignedChunkWriter.clearPageWriter();
}
-
- long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
- int pointsInPage = 0;
- for (int sortedRowIndex = pageRange.get(pageNum * 2);
- sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
- sortedRowIndex++) {
- // skip empty row
- if (rowBitMap != null &&
rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
- continue;
- }
- if (Objects.isNull(timeDuplicateInfo) ||
!timeDuplicateInfo[sortedRowIndex]) {
- times[pointsInPage++] = list.getTime(sortedRowIndex);
- }
+ try {
+ ioTaskQueue.put(alignedChunkWriter);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
-
- alignedChunkWriter.write(times, pointsInPage, 0);
}
}