This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch aligned_flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4207e33a03d0fc146aa044bc12f4c016e1a53c68
Author: HTHou <[email protected]>
AuthorDate: Tue Aug 9 17:04:15 2022 +0800

    impor
---
 .../engine/memtable/AlignedWritableMemChunk.java   | 18 ++---
 .../db/utils/datastructure/AlignedTVList.java      | 84 ++++++++++++++++++++++
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 28 ++++++++
 3 files changed, 121 insertions(+), 9 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index 5796c87845..b1632c0c7e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -297,13 +295,15 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
   @Override
   public void encode(IChunkWriter chunkWriter) {
     AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
-    List<TSEncoding> encodingList = new ArrayList<>();
-    for (TSDataType e : list.getTsDataTypes()) {
-      encodingList.add(TSEncoding.PLAIN);
-    }
-    TsBlock tsBlock = list.buildTsBlock(0, encodingList, null);
-    alignedChunkWriter.write(
-        tsBlock.getTimeColumn(), tsBlock.getValueColumns(), 
tsBlock.getPositionCount());
+    //    List<TSEncoding> encodingList = new ArrayList<>();
+    //    for (TSDataType e : list.getTsDataTypes()) {
+    //      encodingList.add(TSEncoding.PLAIN);
+    //    }
+    //    TsBlock tsBlock = list.buildTsBlock(0, encodingList, null);
+    //    alignedChunkWriter.write(
+    //        tsBlock.getTimeColumn(), tsBlock.getValueColumns(), 
tsBlock.getPositionCount());
+    list.writeAlignedChunk(alignedChunkWriter);
+
     //    List<Integer> timeDuplicateAlignedRowIndexList = null;
     //    for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); 
sortedRowIndex++) {
     //      long time = list.getTime(sortedRowIndex);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 27c1bf8510..0c0cd78bc7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -987,6 +988,89 @@ public class AlignedTVList extends TVList {
     return builder.build();
   }
 
+  public void writeAlignedChunk(AlignedChunkWriterImpl chunkWriter) {
+    // Time column
+    boolean[] timeDuplicateInfo = null;
+    // time column
+    for (int sortedRowIndex = 0; sortedRowIndex < rowCount; sortedRowIndex++) {
+      long time = getTime(sortedRowIndex);
+      if (sortedRowIndex == rowCount - 1 || time != getTime(sortedRowIndex + 
1)) {
+        chunkWriter.write(time);
+      } else {
+        if (Objects.isNull(timeDuplicateInfo)) {
+          timeDuplicateInfo = new boolean[rowCount];
+        }
+        timeDuplicateInfo[sortedRowIndex] = true;
+      }
+    }
+
+    // value columns
+    for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
+      // Pair of Time and Index
+      Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
+      if (Objects.nonNull(timeDuplicateInfo)) {
+        lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null);
+      }
+      for (int sortedRowIndex = 0; sortedRowIndex < rowCount; 
sortedRowIndex++) {
+        // skip time duplicated rows
+        if (Objects.nonNull(timeDuplicateInfo)) {
+          if (!isNullValue(getValueIndex(sortedRowIndex), columnIndex)) {
+            lastValidPointIndexForTimeDupCheck.left = getTime(sortedRowIndex);
+            lastValidPointIndexForTimeDupCheck.right = 
getValueIndex(sortedRowIndex);
+          }
+          if (timeDuplicateInfo[sortedRowIndex]) {
+            continue;
+          }
+        }
+        // The part of code solves the following problem:
+        // Time: 1,2,2,3
+        // Value: 1,2,null,null
+        // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, 
write(T:1,V:1)
+        // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing 
value
+        // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, 
T:2!=air.left:2, write(T:2,V:2)
+        // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, 
T:3!=pair.left:2, write(T:3,V:null)
+        int originRowIndex;
+        long time = getTime(sortedRowIndex);
+        if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
+            && (time == lastValidPointIndexForTimeDupCheck.left)) {
+          originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+        } else {
+          originRowIndex = getValueIndex(sortedRowIndex);
+        }
+        boolean isNull = isNullValue(originRowIndex, columnIndex);
+        switch (dataTypes.get(columnIndex)) {
+          case BOOLEAN:
+            chunkWriter.writeByColumn(
+                time, getBooleanByValueIndex(originRowIndex, columnIndex), 
isNull);
+            break;
+          case INT32:
+            chunkWriter.writeByColumn(
+                time, getIntByValueIndex(originRowIndex, columnIndex), isNull);
+            break;
+          case INT64:
+            chunkWriter.writeByColumn(
+                time, getLongByValueIndex(originRowIndex, columnIndex), 
isNull);
+            break;
+          case FLOAT:
+            chunkWriter.writeByColumn(
+                time, getFloatByValueIndex(originRowIndex, columnIndex), 
isNull);
+            break;
+          case DOUBLE:
+            chunkWriter.writeByColumn(
+                time, getDoubleByValueIndex(originRowIndex, columnIndex), 
isNull);
+            break;
+          case TEXT:
+            chunkWriter.writeByColumn(
+                time, getBinaryByValueIndex(originRowIndex, columnIndex), 
isNull);
+            break;
+          default:
+            break;
+        }
+      }
+      chunkWriter.nextColumn();
+    }
+  }
+
   protected void writeValidValuesIntoTsBlock(
       TsBlockBuilder builder,
       int floatPrecision,
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 46d82d3a9f..60df2cc5a6 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -164,6 +164,34 @@ public class AlignedChunkWriterImpl implements 
IChunkWriter {
     }
   }
 
+  public void writeByColumn(long time, int value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, long value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, boolean value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, float value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, double value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void writeByColumn(long time, Binary value, boolean isNull) {
+    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+  }
+
+  public void nextColumn() {
+    valueIndex++;
+  }
+
   public void write(TimeColumn timeColumn, Column[] valueColumns, int 
batchSize) {
     if (remainingPointsNumber < batchSize) {
       int pointsHasWritten = (int) remainingPointsNumber;

Reply via email to