This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f1cbdde37 [IOTDB-4036] Encode aligned memory chunk in columnar format (#7179)
6f1cbdde37 is described below

commit 6f1cbdde37d6bf07d88efe05fb001b1fa6ec4343
Author: Mrquan <[email protected]>
AuthorDate: Tue Sep 13 16:15:53 2022 +0800

    [IOTDB-4036] Encode aligned memory chunk in columnar format (#7179)
---
 .../engine/memtable/AlignedWritableMemChunk.java   | 160 ++++++++++++++-------
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |  28 ++++
 2 files changed, 138 insertions(+), 50 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index dbcbc5c207..c9cbcbb3de 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -43,6 +44,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 public class AlignedWritableMemChunk implements IWritableMemChunk {
@@ -50,6 +52,10 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
   private final Map<String, Integer> measurementIndexMap;
   private final List<IMeasurementSchema> schemaList;
   private AlignedTVList list;
+
+  private static final int maxNumberOfPointsInPage =
+      TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(AlignedWritableMemChunk.class);
 
@@ -295,65 +301,119 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
   @Override
   public void encode(IChunkWriter chunkWriter) {
     AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
-    List<Integer> timeDuplicateAlignedRowIndexList = null;
+
+    boolean[] timeDuplicateInfo = null;
+    List<Integer> pageRange = new ArrayList<>();
+    int range = 0;
     for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); 
sortedRowIndex++) {
       long time = list.getTime(sortedRowIndex);
 
-      // skip duplicated data
-      if ((sortedRowIndex + 1 < list.rowCount() && (time == 
list.getTime(sortedRowIndex + 1)))) {
-        // record the time duplicated row index list for vector type
-        if (timeDuplicateAlignedRowIndexList == null) {
-          timeDuplicateAlignedRowIndexList = new ArrayList<>();
-          
timeDuplicateAlignedRowIndexList.add(list.getValueIndex(sortedRowIndex));
+      if (sortedRowIndex == list.rowCount() - 1 || time != 
list.getTime(sortedRowIndex + 1)) {
+        if (range == 0) {
+          pageRange.add(sortedRowIndex);
+        }
+        range++;
+        if (range == maxNumberOfPointsInPage) {
+          pageRange.add(sortedRowIndex);
+          range = 0;
         }
-        timeDuplicateAlignedRowIndexList.add(list.getValueIndex(sortedRowIndex 
+ 1));
-        continue;
+      } else {
+        if (Objects.isNull(timeDuplicateInfo)) {
+          timeDuplicateInfo = new boolean[list.rowCount()];
+        }
+        timeDuplicateInfo[sortedRowIndex] = true;
       }
-      List<TSDataType> dataTypes = list.getTsDataTypes();
-      int originRowIndex = list.getValueIndex(sortedRowIndex);
+    }
+
+    if (range != 0) {
+      pageRange.add(list.rowCount() - 1);
+    }
+
+    List<TSDataType> dataTypes = list.getTsDataTypes();
+    for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
       for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) 
{
-        // write the time duplicated rows
-        if (timeDuplicateAlignedRowIndexList != null
-            && !timeDuplicateAlignedRowIndexList.isEmpty()) {
-          originRowIndex =
-              list.getValidRowIndexForTimeDuplicatedRows(
-                  timeDuplicateAlignedRowIndexList, columnIndex);
+        // Pair of Time and Index
+        Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
+        if (Objects.nonNull(timeDuplicateInfo)) {
+          lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, 
null);
+        }
+        for (int sortedRowIndex = pageRange.get(pageNum * 2);
+            sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
+            sortedRowIndex++) {
+
+          // skip time duplicated rows
+          long time = list.getTime(sortedRowIndex);
+          if (Objects.nonNull(timeDuplicateInfo)) {
+            if (!list.isNullValue(list.getValueIndex(sortedRowIndex), 
columnIndex)) {
+              lastValidPointIndexForTimeDupCheck.left = time;
+              lastValidPointIndexForTimeDupCheck.right = 
list.getValueIndex(sortedRowIndex);
+            }
+            if (timeDuplicateInfo[sortedRowIndex]) {
+              continue;
+            }
+          }
+
+          // The part of code solves the following problem:
+          // Time: 1,2,2,3
+          // Value: 1,2,null,null
+          // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, 
write(T:1,V:1)
+          // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing 
value
+          // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, 
T:2!=air.left:2, write(T:2,V:2)
+          // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, 
T:3!=pair.left:2,
+          // write(T:3,V:null)
+
+          int originRowIndex;
+          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
+              && (time == lastValidPointIndexForTimeDupCheck.left)) {
+            originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+          } else {
+            originRowIndex = list.getValueIndex(sortedRowIndex);
+          }
+
+          boolean isNull = list.isNullValue(originRowIndex, columnIndex);
+          switch (dataTypes.get(columnIndex)) {
+            case BOOLEAN:
+              alignedChunkWriter.writeByColumn(
+                  time, list.getBooleanByValueIndex(originRowIndex, 
columnIndex), isNull);
+              break;
+            case INT32:
+              alignedChunkWriter.writeByColumn(
+                  time, list.getIntByValueIndex(originRowIndex, columnIndex), 
isNull);
+              break;
+            case INT64:
+              alignedChunkWriter.writeByColumn(
+                  time, list.getLongByValueIndex(originRowIndex, columnIndex), 
isNull);
+              break;
+            case FLOAT:
+              alignedChunkWriter.writeByColumn(
+                  time, list.getFloatByValueIndex(originRowIndex, 
columnIndex), isNull);
+              break;
+            case DOUBLE:
+              alignedChunkWriter.writeByColumn(
+                  time, list.getDoubleByValueIndex(originRowIndex, 
columnIndex), isNull);
+              break;
+            case TEXT:
+              alignedChunkWriter.writeByColumn(
+                  time, list.getBinaryByValueIndex(originRowIndex, 
columnIndex), isNull);
+              break;
+            default:
+              break;
+          }
         }
-        boolean isNull = list.isNullValue(originRowIndex, columnIndex);
-        switch (dataTypes.get(columnIndex)) {
-          case BOOLEAN:
-            alignedChunkWriter.write(
-                time, list.getBooleanByValueIndex(originRowIndex, 
columnIndex), isNull);
-            break;
-          case INT32:
-            alignedChunkWriter.write(
-                time, list.getIntByValueIndex(originRowIndex, columnIndex), 
isNull);
-            break;
-          case INT64:
-            alignedChunkWriter.write(
-                time, list.getLongByValueIndex(originRowIndex, columnIndex), 
isNull);
-            break;
-          case FLOAT:
-            alignedChunkWriter.write(
-                time, list.getFloatByValueIndex(originRowIndex, columnIndex), 
isNull);
-            break;
-          case DOUBLE:
-            alignedChunkWriter.write(
-                time, list.getDoubleByValueIndex(originRowIndex, columnIndex), 
isNull);
-            break;
-          case TEXT:
-            alignedChunkWriter.write(
-                time, list.getBinaryByValueIndex(originRowIndex, columnIndex), 
isNull);
-            break;
-          default:
-            LOGGER.error(
-                "AlignedWritableMemChunk does not support data type: {}",
-                dataTypes.get(columnIndex));
-            break;
+        alignedChunkWriter.nextColumn();
+      }
+
+      long[] times = new long[maxNumberOfPointsInPage];
+      int pointsInPage = 0;
+      for (int sortedRowIndex = pageRange.get(pageNum * 2);
+          sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
+          sortedRowIndex++) {
+        if (Objects.isNull(timeDuplicateInfo) || 
!timeDuplicateInfo[sortedRowIndex]) {
+          times[pointsInPage++] = list.getTime(sortedRowIndex);
         }
       }
-      alignedChunkWriter.write(time);
-      timeDuplicateAlignedRowIndexList = null;
+
+      alignedChunkWriter.write(times, pointsInPage, 0);
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 46d82d3a9f..a1fc973d60 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -219,6 +219,34 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
     remainingPointsNumber = 
timeChunkWriter.getRemainingPointNumberForCurrentPage();
   }
 
+  public void writeByColumn(long time, int value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, long value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, boolean value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, float value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, double value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, Binary value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void nextColumn() {
+    valueIndex++;
+  }
+
   /**
    * check occupied memory size, if it exceeds the PageSize threshold, 
construct a page and put it
    * to pageBuffer

Reply via email to