This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/vectorMemTable by this push:
new 36ef81d implement flush interface
36ef81d is described below
commit 36ef81d3930f3ce468264d40759035d4560cff2a
Author: HTHou <[email protected]>
AuthorDate: Fri Mar 12 20:59:27 2021 +0800
implement flush interface
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 61 +++++++++++++++++++---
.../iotdb/db/utils/datastructure/VectorTVList.java | 14 ++++-
2 files changed, 68 insertions(+), 7 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 6074f38..ff55f91 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -36,7 +38,10 @@ import
org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
+
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -158,6 +163,12 @@ public class MemTableFlushTask {
new Runnable() {
private void writeOneSeries(
TVList tvPairs, IChunkWriter seriesWriterImpl, TSDataType
dataType) {
+
+ if (dataType == TSDataType.VECTOR) {
+ writeOneVectorSeries(tvPairs, seriesWriterImpl);
+ return;
+ }
+
for (int i = 0; i < tvPairs.size(); i++) {
long time = tvPairs.getTime(i);
@@ -190,12 +201,6 @@ public class MemTableFlushTask {
case TEXT:
seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
break;
- case VECTOR:
- // TODO:
- // for ( : tvPairs.getVector(i)) {
- // seriesWriterImpl.write(time,
tvPairs.getVector(i)[], get);
- // }
- break;
default:
LOGGER.error(
"Storage group {} does not support data type: {}",
storageGroup, dataType);
@@ -204,6 +209,50 @@ public class MemTableFlushTask {
}
}
+ /**
+  * Writes every column of a vector series to the given chunk writer.
+  *
+  * <p>Columns are processed one after another. For each column, rows are visited in list
+  * order; a row whose timestamp equals the timestamp of the row right after it is skipped,
+  * so only the last row of a run of equal timestamps is written.
+  *
+  * @param tvPairs the series to flush; it is cast to {@link VectorTVList}
+  * @param seriesWriterImpl the chunk writer that receives each (time, value) point
+  * @throws ArrayIndexOutOfBoundsException if a row's value index is not smaller than the
+  *     list size
+  */
+ private void writeOneVectorSeries(TVList tvPairs, IChunkWriter seriesWriterImpl) {
+   VectorTVList tvList = (VectorTVList) tvPairs;
+   List<TSDataType> dataTypes = tvList.getTsDataTypes();
+   List<List<Object>> values = tvList.getValues();
+   int rowCount = tvList.size();
+   for (int i = 0; i < dataTypes.size(); i++) {
+     TSDataType columnType = dataTypes.get(i);
+     List<Object> columnValues = values.get(i);
+     for (int j = 0; j < rowCount; j++) {
+       long time = tvList.getTime(j);
+       // skip duplicated data: compare with the NEXT ROW (index j), not with the column
+       // index i, otherwise every row is tested against one fixed, unrelated row
+       if (j + 1 < rowCount && time == tvList.getTime(j + 1)) {
+         continue;
+       }
+       int valueIndex = tvList.getValueIndex(j);
+       if (valueIndex >= rowCount) {
+         throw new ArrayIndexOutOfBoundsException(valueIndex);
+       }
+       // each column is stored as a list of fixed-size primitive arrays
+       int arrayIndex = valueIndex / ARRAY_SIZE;
+       int elementIndex = valueIndex % ARRAY_SIZE;
+       switch (columnType) {
+         case TEXT:
+           seriesWriterImpl.write(
+               time, ((Binary[]) columnValues.get(arrayIndex))[elementIndex], false);
+           break;
+         case FLOAT:
+           seriesWriterImpl.write(
+               time, ((float[]) columnValues.get(arrayIndex))[elementIndex], false);
+           break;
+         case INT32:
+           seriesWriterImpl.write(
+               time, ((int[]) columnValues.get(arrayIndex))[elementIndex], false);
+           break;
+         case INT64:
+           seriesWriterImpl.write(
+               time, ((long[]) columnValues.get(arrayIndex))[elementIndex], false);
+           break;
+         case DOUBLE:
+           seriesWriterImpl.write(
+               time, ((double[]) columnValues.get(arrayIndex))[elementIndex], false);
+           break;
+         case BOOLEAN:
+           seriesWriterImpl.write(
+               time, ((boolean[]) columnValues.get(arrayIndex))[elementIndex], false);
+           break;
+         default:
+           // same report as writeOneSeries instead of dropping the value silently
+           LOGGER.error(
+               "Storage group {} does not support data type: {}", storageGroup, columnType);
+           break;
+       }
+     }
+   }
+ }
+
@SuppressWarnings("squid:S135")
@Override
public void run() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
index 441e5fe..d54da4f 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -150,6 +150,18 @@ public class VectorTVList extends TVList {
return TsPrimitiveType.getByType(TSDataType.VECTOR, vector);
}
+ public List<List<Object>> getValues() { // returns the values field directly, without copying
+ return values;
+ }
+
+ public List<TSDataType> getTsDataTypes() { // returns the dataTypes field directly, without copying
+ return dataTypes;
+ }
+
+ public List<int[]> getIndices() { // returns the indices field directly, without copying
+ return indices;
+ }
+
protected void set(int index, long timestamp, int valueIndex) {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
@@ -299,7 +311,7 @@ public class VectorTVList extends TVList {
pivotIndex = getValueIndex(pos);
}
- private int getValueIndex(int index) {
+ public int getValueIndex(int index) {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index);
}