This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/vectorMemTable by this push:
     new 36ef81d  implament flush interface
36ef81d is described below

commit 36ef81d3930f3ce468264d40759035d4560cff2a
Author: HTHou <[email protected]>
AuthorDate: Fri Mar 12 20:59:27 2021 +0800

    implament flush interface
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 61 +++++++++++++++++++---
 .../iotdb/db/utils/datastructure/VectorTVList.java | 14 ++++-
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 6074f38..ff55f91 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -36,7 +38,10 @@ import 
org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
+
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -158,6 +163,12 @@ public class MemTableFlushTask {
       new Runnable() {
         private void writeOneSeries(
             TVList tvPairs, IChunkWriter seriesWriterImpl, TSDataType 
dataType) {
+
+          if (dataType == TSDataType.VECTOR) {
+            writeOneVectorSeries(tvPairs, seriesWriterImpl);
+            return;
+          }
+
           for (int i = 0; i < tvPairs.size(); i++) {
             long time = tvPairs.getTime(i);
 
@@ -190,12 +201,6 @@ public class MemTableFlushTask {
               case TEXT:
                 seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
                 break;
-              case VECTOR:
-                // TODO:
-                //                for ( : tvPairs.getVector(i)) {
-                //                  seriesWriterImpl.write(time, 
tvPairs.getVector(i)[], get);
-                //                }
-                break;
               default:
                 LOGGER.error(
                     "Storage group {} does not support data type: {}", 
storageGroup, dataType);
@@ -204,6 +209,50 @@ public class MemTableFlushTask {
           }
         }
 
+        private void writeOneVectorSeries(TVList tvPairs, IChunkWriter 
seriesWriterImpl) {
+          VectorTVList tvList = (VectorTVList) tvPairs;
+          List<TSDataType> dataTypes = tvList.getTsDataTypes();
+          List<List<Object>> values = tvList.getValues();
+          for (int i = 0; i < dataTypes.size(); i++) {
+            List<Object> columnValues = values.get(i);
+            for (int j = 0; j < tvList.size(); j++) {
+              long time = tvList.getTime(j);
+              // skip duplicated data
+              if ((i + 1 < tvList.size() && (time == tvPairs.getTime(i + 1)))) 
{
+                continue;
+              }
+              int valueIndex = tvList.getValueIndex(j);
+              if (valueIndex >= tvList.size()) {
+                throw new ArrayIndexOutOfBoundsException(valueIndex);
+              }
+              int arrayIndex = valueIndex / ARRAY_SIZE;
+              int elementIndex = valueIndex % ARRAY_SIZE;
+              switch (dataTypes.get(i)) {
+                case TEXT:
+                  seriesWriterImpl.write(time, ((Binary[]) 
columnValues.get(arrayIndex))[elementIndex], false);
+                  break;
+                case FLOAT:
+                  seriesWriterImpl.write(time, ((float[]) 
columnValues.get(arrayIndex))[elementIndex], false);
+                  break;
+                case INT32:
+                  seriesWriterImpl.write(time, ((int[]) 
columnValues.get(arrayIndex))[elementIndex], false);
+                  break;
+                case INT64:
+                  seriesWriterImpl.write(time, ((long[]) 
columnValues.get(arrayIndex))[elementIndex], false);
+                  break;
+                case DOUBLE:
+                  seriesWriterImpl.write(time, ((double[]) 
columnValues.get(arrayIndex))[elementIndex], false);
+                  break;
+                case BOOLEAN:
+                  seriesWriterImpl.write(time, ((boolean[]) 
columnValues.get(arrayIndex))[elementIndex], false);
+                  break;
+                default:
+                  break;
+              }
+            }
+          }
+        }
+
         @SuppressWarnings("squid:S135")
         @Override
         public void run() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
index 441e5fe..d54da4f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -150,6 +150,18 @@ public class VectorTVList extends TVList {
     return TsPrimitiveType.getByType(TSDataType.VECTOR, vector);
   }
 
+  public List<List<Object>> getValues() {
+    return values;
+  }
+
+  public List<TSDataType> getTsDataTypes() {
+    return dataTypes;
+  }
+
+  public List<int[]> getIndices() {
+    return indices;
+  }
+
   protected void set(int index, long timestamp, int valueIndex) {
     if (index >= size) {
       throw new ArrayIndexOutOfBoundsException(index);
@@ -299,7 +311,7 @@ public class VectorTVList extends TVList {
     pivotIndex = getValueIndex(pos);
   }
 
-  private int getValueIndex(int index) {
+  public int getValueIndex(int index) {
     if (index >= size) {
       throw new ArrayIndexOutOfBoundsException(index);
     }

Reply via email to