This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch split_text_chunk
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/split_text_chunk by this push:
     new 3a994a3c054 dev aligned chunk split
3a994a3c054 is described below

commit 3a994a3c054cfa6227afa19f934c21e2c08f6574
Author: HTHou <[email protected]>
AuthorDate: Sun Sep 29 10:24:07 2024 +0800

    dev aligned chunk split
---
 .../memtable/AlignedWritableMemChunk.java          | 222 ++++++++++++---------
 1 file changed, 128 insertions(+), 94 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 33a7d09213a..040cc6c7a95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -46,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class AlignedWritableMemChunk implements IWritableMemChunk {
 
@@ -55,6 +57,8 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
 
   private static final int MAX_NUMBER_OF_POINTS_IN_PAGE =
       TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+  private static final long MAX_SERIES_POINT_NUMBER =
+      
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
 
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
 
@@ -327,22 +331,37 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
 
   @SuppressWarnings({"squid:S6541", "squid:S3776"})
   @Override
-  public void encode(IChunkWriter chunkWriter) {
-    AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
-
+  public void encode(LinkedBlockingQueue<Object> ioTaskQueue) {
     BitMap rowBitMap = list.getRowBitMap();
     boolean[] timeDuplicateInfo = null;
+
+    // E.g. ((0,9,10,12),(13,15)) means this TVList contains 2 chunks: chunk 1 has 2 pages
+    // (rows 0-9, 10-12), chunk 2 has 1 page (rows 13-15); each page is a (start, end) pair.
+    List<List<Integer>> chunkRange = new ArrayList<>();
     List<Integer> pageRange = new ArrayList<>();
-    int range = 0;
+
+    int pointNumInPage = 0;
+    int pointNumInChunk = 0;
+
     for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); 
sortedRowIndex++) {
       long time = list.getTime(sortedRowIndex);
-      if (range == 0) {
+      if (pointNumInPage == 0) {
         pageRange.add(sortedRowIndex);
       }
-      range++;
-      if (range == MAX_NUMBER_OF_POINTS_IN_PAGE) {
+      pointNumInPage++;
+      pointNumInChunk++;
+      if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE) {
         pageRange.add(sortedRowIndex);
-        range = 0;
+        pointNumInPage = 0;
+      }
+      if (pointNumInChunk == MAX_SERIES_POINT_NUMBER) {
+        if (pointNumInPage != 0) {
+          pageRange.add(sortedRowIndex);
+          pointNumInPage = 0;
+        }
+        chunkRange.add(pageRange);
+        pageRange = new ArrayList<>();
+        pointNumInChunk = 0;
       }
 
       int nextRowIndex = sortedRowIndex + 1;
@@ -360,21 +379,105 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
       sortedRowIndex = nextRowIndex - 1;
     }
 
-    if (range != 0) {
+    if (pointNumInPage != 0) {
       pageRange.add(list.rowCount() - 1);
     }
+    if (pointNumInChunk != 0) {
+      chunkRange.add(pageRange);
+    }
 
+    handleEncoding(ioTaskQueue, chunkRange, timeDuplicateInfo);
+  }
+
+  private void handleEncoding(
+      LinkedBlockingQueue<Object> ioTaskQueue,
+      List<List<Integer>> chunkRange,
+      boolean[] timeDuplicateInfo) {
+    BitMap rowBitMap = list.getRowBitMap();
     List<TSDataType> dataTypes = list.getTsDataTypes();
     Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new 
Pair[dataTypes.size()];
+    for (List<Integer> pageRange : chunkRange) {
+      AlignedChunkWriterImpl alignedChunkWriter = new 
AlignedChunkWriterImpl(schemaList);
+      for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
+        for (int columnIndex = 0; columnIndex < dataTypes.size(); 
columnIndex++) {
+          // Pair of Time and Index
+          if (Objects.nonNull(timeDuplicateInfo)
+              && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
+            lastValidPointIndexForTimeDupCheck[columnIndex] = new 
Pair<>(Long.MIN_VALUE, null);
+          }
+          TSDataType tsDataType = dataTypes.get(columnIndex);
+          for (int sortedRowIndex = pageRange.get(pageNum * 2);
+              sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
+              sortedRowIndex++) {
+            // skip empty row
+            if (rowBitMap != null && 
rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
+              continue;
+            }
+            // skip time duplicated rows
+            long time = list.getTime(sortedRowIndex);
+            if (Objects.nonNull(timeDuplicateInfo)) {
+              if (!list.isNullValue(list.getValueIndex(sortedRowIndex), 
columnIndex)) {
+                lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
+                lastValidPointIndexForTimeDupCheck[columnIndex].right =
+                    list.getValueIndex(sortedRowIndex);
+              }
+              if (timeDuplicateInfo[sortedRowIndex]) {
+                continue;
+              }
+            }
+
+            // The part of code solves the following problem:
+            // Time: 1,2,2,3
+            // Value: 1,2,null,null
+            // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, 
write(T:1,V:1)
+            // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip 
writing value
+            // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, 
T:2==pair.left:2, write(T:2,V:2)
+            // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, 
T:3!=pair.left:2,
+            // write(T:3,V:null)
+
+            int originRowIndex;
+            if 
(Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
+                && (time == 
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
+              originRowIndex = 
lastValidPointIndexForTimeDupCheck[columnIndex].right;
+            } else {
+              originRowIndex = list.getValueIndex(sortedRowIndex);
+            }
 
-    for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
-      for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) 
{
-        // Pair of Time and Index
-        if (Objects.nonNull(timeDuplicateInfo)
-            && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
-          lastValidPointIndexForTimeDupCheck[columnIndex] = new 
Pair<>(Long.MIN_VALUE, null);
+            boolean isNull = list.isNullValue(originRowIndex, columnIndex);
+            switch (tsDataType) {
+              case BOOLEAN:
+                alignedChunkWriter.writeByColumn(
+                    time, list.getBooleanByValueIndex(originRowIndex, 
columnIndex), isNull);
+                break;
+              case INT32:
+                alignedChunkWriter.writeByColumn(
+                    time, list.getIntByValueIndex(originRowIndex, 
columnIndex), isNull);
+                break;
+              case INT64:
+                alignedChunkWriter.writeByColumn(
+                    time, list.getLongByValueIndex(originRowIndex, 
columnIndex), isNull);
+                break;
+              case FLOAT:
+                alignedChunkWriter.writeByColumn(
+                    time, list.getFloatByValueIndex(originRowIndex, 
columnIndex), isNull);
+                break;
+              case DOUBLE:
+                alignedChunkWriter.writeByColumn(
+                    time, list.getDoubleByValueIndex(originRowIndex, 
columnIndex), isNull);
+                break;
+              case TEXT:
+                alignedChunkWriter.writeByColumn(
+                    time, list.getBinaryByValueIndex(originRowIndex, 
columnIndex), isNull);
+                break;
+              default:
+                break;
+            }
+          }
+          alignedChunkWriter.nextColumn();
         }
-        TSDataType tsDataType = dataTypes.get(columnIndex);
+
+        long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+        int pointsInPage = 0;
         for (int sortedRowIndex = pageRange.get(pageNum * 2);
             sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
             sortedRowIndex++) {
@@ -382,88 +485,19 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
           if (rowBitMap != null && 
rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
             continue;
           }
-          // skip time duplicated rows
-          long time = list.getTime(sortedRowIndex);
-          if (Objects.nonNull(timeDuplicateInfo)) {
-            if (!list.isNullValue(list.getValueIndex(sortedRowIndex), 
columnIndex)) {
-              lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
-              lastValidPointIndexForTimeDupCheck[columnIndex].right =
-                  list.getValueIndex(sortedRowIndex);
-            }
-            if (timeDuplicateInfo[sortedRowIndex]) {
-              continue;
-            }
-          }
-
-          // The part of code solves the following problem:
-          // Time: 1,2,2,3
-          // Value: 1,2,null,null
-          // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, 
write(T:1,V:1)
-          // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing 
value
-          // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, 
T:2!=air.left:2, write(T:2,V:2)
-          // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, 
T:3!=pair.left:2,
-          // write(T:3,V:null)
-
-          int originRowIndex;
-          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
-              && (time == 
lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
-            originRowIndex = 
lastValidPointIndexForTimeDupCheck[columnIndex].right;
-          } else {
-            originRowIndex = list.getValueIndex(sortedRowIndex);
-          }
-
-          boolean isNull = list.isNullValue(originRowIndex, columnIndex);
-          switch (tsDataType) {
-            case BOOLEAN:
-              alignedChunkWriter.writeByColumn(
-                  time, list.getBooleanByValueIndex(originRowIndex, 
columnIndex), isNull);
-              break;
-            case INT32:
-            case DATE:
-              alignedChunkWriter.writeByColumn(
-                  time, list.getIntByValueIndex(originRowIndex, columnIndex), 
isNull);
-              break;
-            case INT64:
-            case TIMESTAMP:
-              alignedChunkWriter.writeByColumn(
-                  time, list.getLongByValueIndex(originRowIndex, columnIndex), 
isNull);
-              break;
-            case FLOAT:
-              alignedChunkWriter.writeByColumn(
-                  time, list.getFloatByValueIndex(originRowIndex, 
columnIndex), isNull);
-              break;
-            case DOUBLE:
-              alignedChunkWriter.writeByColumn(
-                  time, list.getDoubleByValueIndex(originRowIndex, 
columnIndex), isNull);
-              break;
-            case TEXT:
-            case BLOB:
-            case STRING:
-              alignedChunkWriter.writeByColumn(
-                  time, list.getBinaryByValueIndex(originRowIndex, 
columnIndex), isNull);
-              break;
-            default:
-              break;
+          if (Objects.isNull(timeDuplicateInfo) || 
!timeDuplicateInfo[sortedRowIndex]) {
+            times[pointsInPage++] = list.getTime(sortedRowIndex);
           }
         }
-        alignedChunkWriter.nextColumn();
+        alignedChunkWriter.write(times, pointsInPage, 0);
+        alignedChunkWriter.sealCurrentPage();
+        alignedChunkWriter.clearPageWriter();
       }
-
-      long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
-      int pointsInPage = 0;
-      for (int sortedRowIndex = pageRange.get(pageNum * 2);
-          sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
-          sortedRowIndex++) {
-        // skip empty row
-        if (rowBitMap != null && 
rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
-          continue;
-        }
-        if (Objects.isNull(timeDuplicateInfo) || 
!timeDuplicateInfo[sortedRowIndex]) {
-          times[pointsInPage++] = list.getTime(sortedRowIndex);
-        }
+      try {
+        ioTaskQueue.put(alignedChunkWriter);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
       }
-
-      alignedChunkWriter.write(times, pointsInPage, 0);
     }
   }
 

Reply via email to