This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_vector by this push:
     new bb0865a  refactor memtable write interface
     new 1347328  Merge branch 'new_vector' of https://github.com/apache/iotdb into new_vector
bb0865a is described below

commit bb0865adafc1ab200f107f337ed8c9b88dde51ff
Author: HTHou <[email protected]>
AuthorDate: Thu Oct 28 16:49:07 2021 +0800

    refactor memtable write interface
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 181 ++++++++------
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   9 +
 .../db/engine/memtable/IWritableMemChunk.java      |   5 +-
 .../db/engine/memtable/VectorWritableMemChunk.java | 267 +++++++++++++++++++++
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  35 ++-
 .../db/engine/storagegroup/TsFileProcessor.java    |  12 +-
 .../db/engine/memtable/PrimitiveMemTableTest.java  |   4 +-
 7 files changed, 414 insertions(+), 99 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index cbbf421..686e085 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -29,9 +29,10 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.rescon.TVListAllocator;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
@@ -97,13 +98,13 @@ public abstract class AbstractMemTable implements IMemTable 
{
   }
 
   /**
-   * create this memtable if it's not exist
+   * create this MemChunk if it's not exist
    *
    * @param deviceId device id
    * @param schema measurement schema
-   * @return this memtable
+   * @return this MemChunk
    */
-  private IWritableMemChunk createIfNotExistAndGet(String deviceId, 
IMeasurementSchema schema) {
+  private IWritableMemChunk createMemChunkIfNotExistAndGet(String deviceId, 
IMeasurementSchema schema) {
     Map<String, IWritableMemChunk> memSeries =
         memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
 
@@ -125,47 +126,55 @@ public abstract class AbstractMemTable implements 
IMemTable {
     Object[] values = insertRowPlan.getValues();
 
     IMeasurementMNode[] measurementMNodes = 
insertRowPlan.getMeasurementMNodes();
-    int columnIndex = 0;
-    if (insertRowPlan.isAligned()) {
-      IMeasurementMNode measurementMNode = measurementMNodes[0];
-      if (measurementMNode != null) {
-        // write vector
-        Object[] vectorValue =
-            new 
Object[measurementMNode.getSchema().getSubMeasurementsTSDataTypeList().size()];
-        for (int j = 0; j < vectorValue.length; j++) {
-          vectorValue[j] = values[columnIndex];
-          columnIndex++;
-        }
-        memSize +=
-            MemUtils.getVectorRecordSize(
-                
measurementMNode.getSchema().getSubMeasurementsTSDataTypeList(),
-                vectorValue,
-                disableMemControl);
-        write(
-            insertRowPlan.getPrefixPath().getFullPath(),
-            measurementMNode.getSchema(),
-            insertRowPlan.getTime(),
-            vectorValue);
-      }
-    } else {
-      for (IMeasurementMNode measurementMNode : measurementMNodes) {
-        if (values[columnIndex] == null) {
-          columnIndex++;
-          continue;
-        }
-        memSize +=
-            MemUtils.getRecordSize(
-                measurementMNode.getSchema().getType(), values[columnIndex], 
disableMemControl);
-
-        write(
-            insertRowPlan.getPrefixPath().getFullPath(),
-            measurementMNode.getSchema(),
-            insertRowPlan.getTime(),
-            values[columnIndex]);
-        columnIndex++;
+    for (int i = 0; i < measurementMNodes.length; i++) {
+      if (values[i] == null) {
+        continue;
       }
+      memSize +=
+          MemUtils.getRecordSize(
+              measurementMNodes[i].getSchema().getType(), values[i], 
disableMemControl);
+
+      write(
+          insertRowPlan.getPrefixPath().getFullPath(),
+          measurementMNodes[i].getSchema(),
+          insertRowPlan.getTime(),
+          values[i]);
     }
+    totalPointsNum +=
+        insertRowPlan.getMeasurements().length - 
insertRowPlan.getFailedMeasurementNumber();
+  }
 
+  @Override
+  public void insertAlignedRow(InsertRowPlan insertRowPlan) {
+    updatePlanIndexes(insertRowPlan.getIndex());
+      // write vector
+    List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    CompressionType compressionType = null;
+    for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
+      if (insertRowPlan.getMeasurements()[i] == null) {
+        continue;
+      }
+      IMeasurementSchema schema = 
insertRowPlan.getMeasurementMNodes()[i].getSchema();
+      measurements.add(schema.getMeasurementId());
+      types.add(schema.getType());
+      encodings.add(schema.getEncodingType());
+      compressionType = schema.getCompressor();
+    }
+    VectorMeasurementSchema vectorSchema =
+        new VectorMeasurementSchema(null, measurements.toArray(new 
String[measurements.size()]),
+            types.toArray(new TSDataType[measurements.size()]), 
encodings.toArray(new TSEncoding[measurements.size()]), compressionType);
+    memSize +=
+        MemUtils.getVectorRecordSize(
+            types,
+            insertRowPlan.getValues(),
+            disableMemControl);
+    writeAlignedRow(
+        insertRowPlan.getPrefixPath().getFullPath(),
+        vectorSchema,
+        insertRowPlan.getTime(),
+        insertRowPlan.getValues());
     totalPointsNum +=
         insertRowPlan.getMeasurements().length - 
insertRowPlan.getFailedMeasurementNumber();
   }
@@ -186,54 +195,80 @@ public abstract class AbstractMemTable implements 
IMemTable {
   }
 
   @Override
+  public void insertAlignedTablet(InsertTabletPlan insertTabletPlan, int 
start, int end)
+      throws WriteProcessException {
+    updatePlanIndexes(insertTabletPlan.getIndex());
+    try {
+      writeAlignedTablet(insertTabletPlan, start, end);
+      memSize += MemUtils.getRecordSize(insertTabletPlan, start, end, 
disableMemControl);
+      totalPointsNum +=
+          (insertTabletPlan.getDataTypes().length - 
insertTabletPlan.getFailedMeasurementNumber())
+              * (end - start);
+    } catch (RuntimeException e) {
+      throw new WriteProcessException(e);
+    }
+  }
+
+  @Override
   public void write(
       String deviceId, IMeasurementSchema schema, long insertTime, Object 
objectValue) {
-    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
+    IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId, 
schema);
+    memSeries.write(insertTime, objectValue);
+  }
+
+  @Override
+  public void writeAlignedRow(String deviceId, IMeasurementSchema schema, long 
insertTime, Object objectValue) {
+    IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId, 
schema);
     memSeries.write(insertTime, objectValue);
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
   @Override
   public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
-    int columnIndex = 0;
     updatePlanIndexes(insertTabletPlan.getIndex());
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
-      if (insertTabletPlan.getColumns()[columnIndex] == null) {
-        columnIndex++;
+      if (insertTabletPlan.getColumns()[i] == null) {
         continue;
       }
       IWritableMemChunk memSeries =
-          createIfNotExistAndGet(
+          createMemChunkIfNotExistAndGet(
               insertTabletPlan.getPrefixPath().getFullPath(),
               insertTabletPlan.getMeasurementMNodes()[i].getSchema());
-      if (insertTabletPlan.isAligned()) {
-        VectorMeasurementSchema vectorSchema =
-            (VectorMeasurementSchema) 
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
-        Object[] columns = new 
Object[vectorSchema.getSubMeasurementsList().size()];
-        BitMap[] bitMaps = new 
BitMap[vectorSchema.getSubMeasurementsList().size()];
-        for (int j = 0; j < vectorSchema.getSubMeasurementsList().size(); j++) 
{
-          columns[j] = insertTabletPlan.getColumns()[columnIndex];
-          if (insertTabletPlan.getBitMaps() != null) {
-            bitMaps[j] = insertTabletPlan.getBitMaps()[columnIndex];
-          }
-          columnIndex++;
-        }
-        memSeries.write(
-            insertTabletPlan.getTimes(), columns, bitMaps, TSDataType.VECTOR, 
start, end);
-        break;
-      } else {
-        memSeries.write(
-            insertTabletPlan.getTimes(),
-            insertTabletPlan.getColumns()[columnIndex],
-            insertTabletPlan.getBitMaps() != null
-                ? insertTabletPlan.getBitMaps()[columnIndex]
-                : null,
-            insertTabletPlan.getDataTypes()[columnIndex],
-            start,
-            end);
-        columnIndex++;
+      memSeries.write(
+          insertTabletPlan.getTimes(),
+          insertTabletPlan.getColumns()[i],
+          insertTabletPlan.getBitMaps() != null
+              ? insertTabletPlan.getBitMaps()[i]
+              : null,
+          insertTabletPlan.getDataTypes()[i],
+          start,
+          end);
+    }
+  }
+
+  public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, 
int end) {
+    updatePlanIndexes(insertTabletPlan.getIndex());
+    List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    CompressionType compressionType = null;
+    for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
+      if (insertTabletPlan.getColumns()[i] == null) {
+        continue;
       }
+      IMeasurementSchema schema = 
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
+      measurements.add(schema.getMeasurementId());
+      types.add(schema.getType());
+      encodings.add(schema.getEncodingType());
+      compressionType = schema.getCompressor();
     }
+    VectorMeasurementSchema vectorSchema =
+        new VectorMeasurementSchema(null, measurements.toArray(new 
String[measurements.size()]),
+            types.toArray(new TSDataType[measurements.size()]), 
encodings.toArray(new TSEncoding[measurements.size()]), compressionType);
+    IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(
+        insertTabletPlan.getPrefixPath().getFullPath(), vectorSchema);
+    memSeries.writeVector(insertTabletPlan.getTimes(), 
insertTabletPlan.getMeasurements(),
+        insertTabletPlan.getColumns(), insertTabletPlan.getBitMaps(), start, 
end);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index e86fc96..646f67c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -46,12 +46,15 @@ public interface IMemTable {
 
   void write(String deviceId, IMeasurementSchema schema, long insertTime, 
Object objectValue);
 
+  void writeAlignedRow(String deviceId, IMeasurementSchema schema, long 
insertTime, Object objectValue);
   /**
    * write data in the range [start, end). Null value in each column values 
will be replaced by the
    * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, 
null, 5}
    */
   void write(InsertTabletPlan insertTabletPlan, int start, int end);
 
+  void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int 
end);
+
   /** @return the number of points */
   long size();
 
@@ -85,6 +88,8 @@ public interface IMemTable {
    */
   void insert(InsertRowPlan insertRowPlan);
 
+  void insertAlignedRow(InsertRowPlan insertRowPlan);
+
   /**
    * insert tablet into this memtable. The rows to be inserted are in the 
range [start, end). Null
    * value in each column values will be replaced by the subsequent non-null 
value, e.g., {1, null,
@@ -97,6 +102,9 @@ public interface IMemTable {
   void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
       throws WriteProcessException;
 
+  void insertAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int 
end)
+      throws WriteProcessException;
+
   ReadOnlyMemChunk query(
       String deviceId,
       String measurement,
@@ -154,4 +162,5 @@ public interface IMemTable {
   long getMinPlanIndex();
 
   long getCreatedTime();
+
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 4e96d88..cf276ad 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -64,7 +64,9 @@ public interface IWritableMemChunk {
    * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, 
null, 5}
    */
   void write(
-      long[] times, Object valueList, Object bitMaps, TSDataType dataType, int 
start, int end);
+      long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int 
start, int end);
+
+  void writeVector(long[] times, String[] measurements, Object[] valueList, 
BitMap[] bitMaps, int start, int end);
 
   long count();
 
@@ -100,7 +102,6 @@ public interface IWritableMemChunk {
    * served for flush requests. The logic is just same as 
getSortedTVListForQuery, but without add
    * reference count
    *
-   * @return sorted tv list
    */
   void sortTvListForFlush();
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
new file mode 100644
index 0000000..5b2e8aa
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
@@ -0,0 +1,267 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorWritableMemChunk implements IWritableMemChunk {
+  
+  private IMeasurementSchema schema;
+  private VectorTVList list;
+  private Map<String, Integer> VectorIdIndexMap;
+  private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(VectorWritableMemChunk.class);
+
+  public VectorWritableMemChunk(IMeasurementSchema schema) {
+    this.schema = schema;
+    VectorIdIndexMap = new HashMap<>();
+    for (int i = 0; i < schema.getSubMeasurementsCount(); i++) {
+      VectorIdIndexMap.put(schema.getSubMeasurementsList().get(i), i);
+    }
+    this.list = 
TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList());
+  }
+
+  @Override
+  public void putLong(long t, long v) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putInt(long t, int v) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putFloat(long t, float v) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putDouble(long t, double v) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putBinary(long t, Binary v) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putBoolean(long t, boolean v) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putVector(long t, Object[] v) {
+    list.putVector(t, v);
+  }
+
+  @Override
+  public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int 
end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int 
end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int 
end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int 
end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void putVectors(long[] t, Object[] v, BitMap[] bitMaps, int start, 
int end) {
+    list.putVectors(t, v, bitMaps, start, end);
+  }
+
+  @Override
+  public void write(long insertTime, Object objectValue) {
+    putVector(insertTime, (Object[]) objectValue);
+  }
+
+  @Override
+  public void write(long[] times, Object valueList, BitMap bitMap, TSDataType 
dataType, int start,
+      int end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+  }
+
+  @Override
+  public void writeVector(long[] times, String[] measurementIds, Object[] 
valueList, BitMap[] bitMaps, int start,
+      int end) {
+    checkColumnOrder(measurementIds);
+    putVectors(times, valueList, bitMaps, start, end);
+  }
+
+  private void checkColumnOrder(String[] measurementIds) {
+    // TODO HTHou
+  }
+
+  @Override
+  public long count() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public IMeasurementSchema getSchema() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public TVList getSortedTvListForQuery() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public TVList getSortedTvListForQuery(List<Integer> columnIndexList) {
+    sortTVList();
+    // increase reference count
+    list.increaseReferenceCount();
+    return list.getTvListByColumnIndex(columnIndexList);
+  }
+
+  private void sortTVList() {
+    // check reference count
+    if ((list.getReferenceCount() > 0 && !list.isSorted())) {
+      list = list.clone();
+    }
+
+    if (!list.isSorted()) {
+      list.sort();
+    }
+  }
+
+  @Override
+  public void sortTvListForFlush() {
+    sortTVList();
+  }
+
+  @Override
+  public int delete(long lowerBound, long upperBound) {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public int delete(long lowerBound, long upperBound, int columnIndex) {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public IChunkWriter createIChunkWriter() {
+    return new VectorChunkWriterImpl(schema);
+  }
+
+  @Override
+  public void encode(IChunkWriter chunkWriter) {
+
+    List<Integer> timeDuplicatedVectorRowIndexList = null;
+    for (int sortedRowIndex = 0; sortedRowIndex < list.size(); 
sortedRowIndex++) {
+      long time = list.getTime(sortedRowIndex);
+
+      // skip duplicated data
+      if ((sortedRowIndex + 1 < list.size()
+          && (time == list.getTime(sortedRowIndex + 1)))) {
+        // record the time duplicated row index list for vector type
+        if (timeDuplicatedVectorRowIndexList == null) {
+          timeDuplicatedVectorRowIndexList = new ArrayList<>();
+          
timeDuplicatedVectorRowIndexList.add(list.getValueIndex(sortedRowIndex));
+        }
+        timeDuplicatedVectorRowIndexList.add(list.getValueIndex(sortedRowIndex 
+ 1));
+        continue;
+      }
+      List<TSDataType> dataTypes = list.getTsDataTypes();
+      int originRowIndex = list.getValueIndex(sortedRowIndex);
+      for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) 
{
+        // write the time duplicated rows
+        if (timeDuplicatedVectorRowIndexList != null
+            && !timeDuplicatedVectorRowIndexList.isEmpty()) {
+          originRowIndex =
+              list.getValidRowIndexForTimeDuplicatedRows(
+                  timeDuplicatedVectorRowIndexList, columnIndex);
+        }
+        boolean isNull = list.isValueMarked(originRowIndex, columnIndex);
+        switch (dataTypes.get(columnIndex)) {
+          case BOOLEAN:
+            chunkWriter.write(
+                time,
+                list.getBooleanByValueIndex(originRowIndex, columnIndex),
+                isNull);
+            break;
+          case INT32:
+            chunkWriter.write(
+                time,
+                list.getIntByValueIndex(originRowIndex, columnIndex),
+                isNull);
+            break;
+          case INT64:
+            chunkWriter.write(
+                time,
+                list.getLongByValueIndex(originRowIndex, columnIndex),
+                isNull);
+            break;
+          case FLOAT:
+            chunkWriter.write(
+                time,
+                list.getFloatByValueIndex(originRowIndex, columnIndex),
+                isNull);
+            break;
+          case DOUBLE:
+            chunkWriter.write(
+                time,
+                list.getDoubleByValueIndex(originRowIndex, columnIndex),
+                isNull);
+            break;
+          case TEXT:
+            chunkWriter.write(
+                time,
+                list.getBinaryByValueIndex(originRowIndex, columnIndex),
+                isNull);
+            break;
+          default:
+            LOGGER.error(
+                "VectorWritableMemChunk does not support data type: {}",
+                dataTypes.get(columnIndex));
+            break;
+        }
+      }
+      chunkWriter.write(time);
+      timeDuplicatedVectorRowIndexList = null;
+    }
+  
+  }
+
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 59e3d58..b6da0c0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -71,41 +71,44 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   @Override
   public void write(
-      long[] times, Object valueList, Object bitMap, TSDataType dataType, int 
start, int end) {
+      long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int 
start, int end) {
     switch (dataType) {
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) valueList;
-        putBooleans(times, boolValues, (BitMap) bitMap, start, end);
+        putBooleans(times, boolValues, bitMap, start, end);
         break;
       case INT32:
         int[] intValues = (int[]) valueList;
-        putInts(times, intValues, (BitMap) bitMap, start, end);
+        putInts(times, intValues, bitMap, start, end);
         break;
       case INT64:
         long[] longValues = (long[]) valueList;
-        putLongs(times, longValues, (BitMap) bitMap, start, end);
+        putLongs(times, longValues, bitMap, start, end);
         break;
       case FLOAT:
         float[] floatValues = (float[]) valueList;
-        putFloats(times, floatValues, (BitMap) bitMap, start, end);
+        putFloats(times, floatValues, bitMap, start, end);
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) valueList;
-        putDoubles(times, doubleValues, (BitMap) bitMap, start, end);
+        putDoubles(times, doubleValues, bitMap, start, end);
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) valueList;
-        putBinaries(times, binaryValues, (BitMap) bitMap, start, end);
-        break;
-      case VECTOR:
-        Object[] vectorValues = (Object[]) valueList;
-        putVectors(times, vectorValues, (BitMap[]) bitMap, start, end);
+        putBinaries(times, binaryValues, bitMap, start, end);
         break;
       default:
         throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType);
     }
   }
 
+
+  @Override
+  public void writeVector(long[] times, String[] measurements, Object[] 
valueList, BitMap[] bitMaps,
+      int start, int end) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
list.getDataType());
+  }
+
   @Override
   public void putLong(long t, long v) {
     list.putLong(t, v);
@@ -186,13 +189,7 @@ public class WritableMemChunk implements IWritableMemChunk 
{
 
   @Override
   public synchronized TVList getSortedTvListForQuery(List<Integer> 
columnIndexList) {
-    if (list.getDataType() != TSDataType.VECTOR) {
-      throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
list.getDataType());
-    }
-    sortTVList();
-    // increase reference count
-    list.increaseReferenceCount();
-    return list.getTvListByColumnIndex(columnIndexList);
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
list.getDataType());
   }
 
   private void sortTVList() {
@@ -333,7 +330,5 @@ public class WritableMemChunk implements IWritableMemChunk {
           break;
       }
     }
-  
-    
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4318a5b..27bc389 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -232,7 +232,11 @@ public class TsFileProcessor {
       }
     }
 
-    workMemTable.insert(insertRowPlan);
+    if (insertRowPlan.isAligned()) {
+      workMemTable.insertAlignedRow(insertRowPlan);
+    } else {
+      workMemTable.insert(insertRowPlan);
+    }
 
     // update start time of this memtable
     tsFileResource.updateStartTime(
@@ -298,7 +302,11 @@ public class TsFileProcessor {
     }
 
     try {
-      workMemTable.insertTablet(insertTabletPlan, start, end);
+      if (insertTabletPlan.isAligned()) {
+        workMemTable.insertAlignedTablet(insertTabletPlan, start, end);
+      } else {
+        workMemTable.insertTablet(insertTabletPlan, start, end);
+      }
     } catch (WriteProcessException e) {
       for (int i = start; i < end; i++) {
         results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index ee87de5..f2aa26e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -66,7 +66,7 @@ public class PrimitiveMemTableTest {
     TSDataType dataType = TSDataType.INT32;
     WritableMemChunk series =
         new WritableMemChunk(
-            new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN), 
TVList.newList(dataType));
+            new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN));
     int count = 1000;
     for (int i = 0; i < count; i++) {
       series.write(i, i);
@@ -85,7 +85,7 @@ public class PrimitiveMemTableTest {
     TSDataType dataType = TSDataType.INT32;
     WritableMemChunk series =
         new WritableMemChunk(
-            new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN), 
TVList.newList(dataType));
+            new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN));
     int count = 100;
     for (int i = 0; i < count; i++) {
       series.write(i, i);

Reply via email to