This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_vector by this push:
     new b98ad1f  implement VectorWritableMemChunk
b98ad1f is described below

commit b98ad1f3048362080e10e51b60fd6166f0f6e38d
Author: HTHou <[email protected]>
AuthorDate: Thu Oct 28 12:27:53 2021 +0800

    implement VectorWritableMemChunk
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 114 +--------------------
 .../db/engine/memtable/IWritableMemChunk.java      |   2 +-
 .../db/engine/memtable/PrimitiveMemTable.java      |   7 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  67 ++++++++++--
 .../apache/iotdb/db/rescon/TVListAllocator.java    |   3 +-
 .../iotdb/db/utils/datastructure/TVList.java       |   2 +-
 6 files changed, 64 insertions(+), 131 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 3c29135..30e878a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -162,118 +162,6 @@ public class MemTableFlushTask {
   /** encoding task (second task of pipeline) */
   private Runnable encodingTask =
       new Runnable() {
-        private void writeOneSeries(
-            TVList tvPairs, IChunkWriter seriesWriterImpl, TSDataType 
dataType) {
-          List<Integer> timeDuplicatedVectorRowIndexList = null;
-          for (int sortedRowIndex = 0; sortedRowIndex < tvPairs.size(); 
sortedRowIndex++) {
-            long time = tvPairs.getTime(sortedRowIndex);
-
-            // skip duplicated data
-            if ((sortedRowIndex + 1 < tvPairs.size()
-                && (time == tvPairs.getTime(sortedRowIndex + 1)))) {
-              // record the time duplicated row index list for vector type
-              if (dataType == TSDataType.VECTOR) {
-                if (timeDuplicatedVectorRowIndexList == null) {
-                  timeDuplicatedVectorRowIndexList = new ArrayList<>();
-                  
timeDuplicatedVectorRowIndexList.add(tvPairs.getValueIndex(sortedRowIndex));
-                }
-                
timeDuplicatedVectorRowIndexList.add(tvPairs.getValueIndex(sortedRowIndex + 1));
-              }
-              continue;
-            }
-
-            // store last point for SDT
-            if (dataType != TSDataType.VECTOR && sortedRowIndex + 1 == 
tvPairs.size()) {
-              ((ChunkWriterImpl) seriesWriterImpl).setLastPoint(true);
-            }
-
-            switch (dataType) {
-              case BOOLEAN:
-                seriesWriterImpl.write(time, 
tvPairs.getBoolean(sortedRowIndex), false);
-                break;
-              case INT32:
-                seriesWriterImpl.write(time, tvPairs.getInt(sortedRowIndex), 
false);
-                break;
-              case INT64:
-                seriesWriterImpl.write(time, tvPairs.getLong(sortedRowIndex), 
false);
-                break;
-              case FLOAT:
-                seriesWriterImpl.write(time, tvPairs.getFloat(sortedRowIndex), 
false);
-                break;
-              case DOUBLE:
-                seriesWriterImpl.write(time, 
tvPairs.getDouble(sortedRowIndex), false);
-                break;
-              case TEXT:
-                seriesWriterImpl.write(time, 
tvPairs.getBinary(sortedRowIndex), false);
-                break;
-              case VECTOR:
-                VectorTVList vectorTvPairs = (VectorTVList) tvPairs;
-                List<TSDataType> dataTypes = vectorTvPairs.getTsDataTypes();
-                int originRowIndex = 
vectorTvPairs.getValueIndex(sortedRowIndex);
-                for (int columnIndex = 0; columnIndex < dataTypes.size(); 
columnIndex++) {
-                  // write the time duplicated rows
-                  if (timeDuplicatedVectorRowIndexList != null
-                      && !timeDuplicatedVectorRowIndexList.isEmpty()) {
-                    originRowIndex =
-                        vectorTvPairs.getValidRowIndexForTimeDuplicatedRows(
-                            timeDuplicatedVectorRowIndexList, columnIndex);
-                  }
-                  boolean isNull = vectorTvPairs.isValueMarked(originRowIndex, 
columnIndex);
-                  switch (dataTypes.get(columnIndex)) {
-                    case BOOLEAN:
-                      seriesWriterImpl.write(
-                          time,
-                          vectorTvPairs.getBooleanByValueIndex(originRowIndex, 
columnIndex),
-                          isNull);
-                      break;
-                    case INT32:
-                      seriesWriterImpl.write(
-                          time,
-                          vectorTvPairs.getIntByValueIndex(originRowIndex, 
columnIndex),
-                          isNull);
-                      break;
-                    case INT64:
-                      seriesWriterImpl.write(
-                          time,
-                          vectorTvPairs.getLongByValueIndex(originRowIndex, 
columnIndex),
-                          isNull);
-                      break;
-                    case FLOAT:
-                      seriesWriterImpl.write(
-                          time,
-                          vectorTvPairs.getFloatByValueIndex(originRowIndex, 
columnIndex),
-                          isNull);
-                      break;
-                    case DOUBLE:
-                      seriesWriterImpl.write(
-                          time,
-                          vectorTvPairs.getDoubleByValueIndex(originRowIndex, 
columnIndex),
-                          isNull);
-                      break;
-                    case TEXT:
-                      seriesWriterImpl.write(
-                          time,
-                          vectorTvPairs.getBinaryByValueIndex(originRowIndex, 
columnIndex),
-                          isNull);
-                      break;
-                    default:
-                      LOGGER.error(
-                          "Storage group {} does not support data type: {}",
-                          storageGroup,
-                          dataTypes.get(columnIndex));
-                      break;
-                  }
-                }
-                seriesWriterImpl.write(time);
-                timeDuplicatedVectorRowIndexList = null;
-                break;
-              default:
-                LOGGER.error(
-                    "Storage group {} does not support data type: {}", 
storageGroup, dataType);
-                break;
-            }
-          }
-        }
 
         @SuppressWarnings("squid:S135")
         @Override
@@ -311,7 +199,7 @@ public class MemTableFlushTask {
             } else {
               long starTime = System.currentTimeMillis();
               IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
-              IChunkWriter seriesWriter = writableMemChunk.createIChunkWrite();
+              IChunkWriter seriesWriter = 
writableMemChunk.createIChunkWriter();
               writableMemChunk.encode(seriesWriter);
               seriesWriter.sealCurrentPage();
               seriesWriter.clearPageWriter();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index d6a3862..4e96d88 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -118,7 +118,7 @@ public interface IWritableMemChunk {
   // For delete one column in the vector
   int delete(long lowerBound, long upperBound, int columnIndex);
 
-  IChunkWriter createIChunkWrite();
+  IChunkWriter createIChunkWriter();
 
   void encode(IChunkWriter chunkWriter);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 922cb12..2b85280 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.engine.memtable;
 
-import org.apache.iotdb.db.rescon.TVListAllocator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -41,11 +40,9 @@ public class PrimitiveMemTable extends AbstractMemTable {
   @Override
   protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
     if (schema.getType() == TSDataType.VECTOR) {
-      return new WritableMemChunk(
-          schema,
-          
TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList()));
+      return new VectorWritableMemChunk(schema);
     }
-    return new WritableMemChunk(schema, 
TVListAllocator.getInstance().allocate(schema.getType()));
+    return new WritableMemChunk(schema);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index f7d359e..59e3d58 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -18,14 +18,17 @@
  */
 package org.apache.iotdb.db.engine.memtable;
 
+import org.apache.iotdb.db.rescon.TVListAllocator;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.List;
 
 public class WritableMemChunk implements IWritableMemChunk {
@@ -33,10 +36,11 @@ public class WritableMemChunk implements IWritableMemChunk {
   private IMeasurementSchema schema;
   private TVList list;
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WritableMemChunk.class);
 
-  public WritableMemChunk(IMeasurementSchema schema, TVList list) {
+  public WritableMemChunk(IMeasurementSchema schema) {
     this.schema = schema;
-    this.list = list;
+    this.list = TVListAllocator.getInstance().allocate(schema.getType());
   }
 
   @Override
@@ -60,9 +64,6 @@ public class WritableMemChunk implements IWritableMemChunk {
       case TEXT:
         putBinary(insertTime, (Binary) objectValue);
         break;
-      case VECTOR:
-        putVector(insertTime, (Object[]) objectValue);
-        break;
       default:
         throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
     }
@@ -137,7 +138,7 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   @Override
   public void putVector(long t, Object[] v) {
-    list.putVector(t, v);
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
   }
 
   @Override
@@ -172,7 +173,7 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   @Override
   public void putVectors(long[] t, Object[] v, BitMap[] bitMaps, int start, 
int end) {
-    list.putVectors(t, v, bitMaps, start, end);
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
   }
 
   @Override
@@ -254,8 +255,8 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public IChunkWriter createIChunkWrite() {
-    return null;
+  public IChunkWriter createIChunkWriter() {
+    return new ChunkWriterImpl(schema);
   }
 
   @Override
@@ -289,4 +290,50 @@ public class WritableMemChunk implements IWritableMemChunk {
     }
     return out.toString();
   }
+
+  @Override
+  public void encode(IChunkWriter chunkWriter) {
+
+    for (int sortedRowIndex = 0; sortedRowIndex < list.size(); 
sortedRowIndex++) {
+      long time = list.getTime(sortedRowIndex);
+
+      // skip duplicated data
+      if ((sortedRowIndex + 1 < list.size()
+          && (time == list.getTime(sortedRowIndex + 1)))) {
+        continue;
+      }
+
+      // store last point for SDT
+      if (sortedRowIndex + 1 == list.size()) {
+        ((ChunkWriterImpl) chunkWriter).setLastPoint(true);
+      }
+
+      switch (schema.getType()) {
+        case BOOLEAN:
+          chunkWriter.write(time, list.getBoolean(sortedRowIndex), false);
+          break;
+        case INT32:
+          chunkWriter.write(time, list.getInt(sortedRowIndex), false);
+          break;
+        case INT64:
+          chunkWriter.write(time, list.getLong(sortedRowIndex), false);
+          break;
+        case FLOAT:
+          chunkWriter.write(time, list.getFloat(sortedRowIndex), false);
+          break;
+        case DOUBLE:
+          chunkWriter.write(time, list.getDouble(sortedRowIndex), false);
+          break;
+        case TEXT:
+          chunkWriter.write(time, list.getBinary(sortedRowIndex), false);
+          break;
+        default:
+          LOGGER.error(
+              "WritableMemChunk does not support data type: {}", 
schema.getType());
+          break;
+      }
+    }
+  
+    
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
index f2e54ab..fab5fab 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayDeque;
@@ -56,7 +57,7 @@ public class TVListAllocator implements TVListAllocatorMBean, IService {
     return list != null ? list : TVList.newList(dataType);
   }
 
-  public synchronized TVList allocate(List<TSDataType> dataTypes) {
+  public synchronized VectorTVList allocate(List<TSDataType> dataTypes) {
     return TVList.newVectorList(dataTypes);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 3e78339..796b6e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -82,7 +82,7 @@ public abstract class TVList {
     return null;
   }
 
-  public static TVList newVectorList(List<TSDataType> datatypes) {
+  public static VectorTVList newVectorList(List<TSDataType> datatypes) {
     return new VectorTVList(datatypes);
   }
 

Reply via email to