This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new bb0865a refactor memtable write interface
new 1347328 Merge branch 'new_vector' of https://github.com/apache/iotdb into new_vector
bb0865a is described below
commit bb0865adafc1ab200f107f337ed8c9b88dde51ff
Author: HTHou <[email protected]>
AuthorDate: Thu Oct 28 16:49:07 2021 +0800
refactor memtable write interface
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 181 ++++++++------
.../apache/iotdb/db/engine/memtable/IMemTable.java | 9 +
.../db/engine/memtable/IWritableMemChunk.java | 5 +-
.../db/engine/memtable/VectorWritableMemChunk.java | 267 +++++++++++++++++++++
.../iotdb/db/engine/memtable/WritableMemChunk.java | 35 ++-
.../db/engine/storagegroup/TsFileProcessor.java | 12 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 4 +-
7 files changed, 414 insertions(+), 99 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index cbbf421..686e085 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -29,9 +29,10 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
@@ -97,13 +98,13 @@ public abstract class AbstractMemTable implements IMemTable
{
}
/**
- * create this memtable if it's not exist
+ * create this MemChunk if it's not exist
*
* @param deviceId device id
* @param schema measurement schema
- * @return this memtable
+ * @return this MemChunk
*/
- private IWritableMemChunk createIfNotExistAndGet(String deviceId,
IMeasurementSchema schema) {
+ private IWritableMemChunk createMemChunkIfNotExistAndGet(String deviceId,
IMeasurementSchema schema) {
Map<String, IWritableMemChunk> memSeries =
memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
@@ -125,47 +126,55 @@ public abstract class AbstractMemTable implements
IMemTable {
Object[] values = insertRowPlan.getValues();
IMeasurementMNode[] measurementMNodes =
insertRowPlan.getMeasurementMNodes();
- int columnIndex = 0;
- if (insertRowPlan.isAligned()) {
- IMeasurementMNode measurementMNode = measurementMNodes[0];
- if (measurementMNode != null) {
- // write vector
- Object[] vectorValue =
- new
Object[measurementMNode.getSchema().getSubMeasurementsTSDataTypeList().size()];
- for (int j = 0; j < vectorValue.length; j++) {
- vectorValue[j] = values[columnIndex];
- columnIndex++;
- }
- memSize +=
- MemUtils.getVectorRecordSize(
-
measurementMNode.getSchema().getSubMeasurementsTSDataTypeList(),
- vectorValue,
- disableMemControl);
- write(
- insertRowPlan.getPrefixPath().getFullPath(),
- measurementMNode.getSchema(),
- insertRowPlan.getTime(),
- vectorValue);
- }
- } else {
- for (IMeasurementMNode measurementMNode : measurementMNodes) {
- if (values[columnIndex] == null) {
- columnIndex++;
- continue;
- }
- memSize +=
- MemUtils.getRecordSize(
- measurementMNode.getSchema().getType(), values[columnIndex],
disableMemControl);
-
- write(
- insertRowPlan.getPrefixPath().getFullPath(),
- measurementMNode.getSchema(),
- insertRowPlan.getTime(),
- values[columnIndex]);
- columnIndex++;
+ for (int i = 0; i < measurementMNodes.length; i++) {
+ if (values[i] == null) {
+ continue;
}
+ memSize +=
+ MemUtils.getRecordSize(
+ measurementMNodes[i].getSchema().getType(), values[i],
disableMemControl);
+
+ write(
+ insertRowPlan.getPrefixPath().getFullPath(),
+ measurementMNodes[i].getSchema(),
+ insertRowPlan.getTime(),
+ values[i]);
}
+ totalPointsNum +=
+ insertRowPlan.getMeasurements().length -
insertRowPlan.getFailedMeasurementNumber();
+ }
+ @Override
+ public void insertAlignedRow(InsertRowPlan insertRowPlan) {
+ updatePlanIndexes(insertRowPlan.getIndex());
+ // write vector
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ List<TSEncoding> encodings = new ArrayList<>();
+ CompressionType compressionType = null;
+ for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
+ if (insertRowPlan.getMeasurements()[i] == null) {
+ continue;
+ }
+ IMeasurementSchema schema =
insertRowPlan.getMeasurementMNodes()[i].getSchema();
+ measurements.add(schema.getMeasurementId());
+ types.add(schema.getType());
+ encodings.add(schema.getEncodingType());
+ compressionType = schema.getCompressor();
+ }
+ VectorMeasurementSchema vectorSchema =
+ new VectorMeasurementSchema(null, measurements.toArray(new
String[measurements.size()]),
+ types.toArray(new TSDataType[measurements.size()]),
encodings.toArray(new TSEncoding[measurements.size()]), compressionType);
+ memSize +=
+ MemUtils.getVectorRecordSize(
+ types,
+ insertRowPlan.getValues(),
+ disableMemControl);
+ writeAlignedRow(
+ insertRowPlan.getPrefixPath().getFullPath(),
+ vectorSchema,
+ insertRowPlan.getTime(),
+ insertRowPlan.getValues());
totalPointsNum +=
insertRowPlan.getMeasurements().length -
insertRowPlan.getFailedMeasurementNumber();
}
@@ -186,54 +195,80 @@ public abstract class AbstractMemTable implements
IMemTable {
}
@Override
+ public void insertAlignedTablet(InsertTabletPlan insertTabletPlan, int
start, int end)
+ throws WriteProcessException {
+ updatePlanIndexes(insertTabletPlan.getIndex());
+ try {
+ writeAlignedTablet(insertTabletPlan, start, end);
+ memSize += MemUtils.getRecordSize(insertTabletPlan, start, end,
disableMemControl);
+ totalPointsNum +=
+ (insertTabletPlan.getDataTypes().length -
insertTabletPlan.getFailedMeasurementNumber())
+ * (end - start);
+ } catch (RuntimeException e) {
+ throw new WriteProcessException(e);
+ }
+ }
+
+ @Override
public void write(
String deviceId, IMeasurementSchema schema, long insertTime, Object
objectValue) {
- IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
+ IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId,
schema);
+ memSeries.write(insertTime, objectValue);
+ }
+
+ @Override
+ public void writeAlignedRow(String deviceId, IMeasurementSchema schema, long
insertTime, Object objectValue) {
+ IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId,
schema);
memSeries.write(insertTime, objectValue);
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
- int columnIndex = 0;
updatePlanIndexes(insertTabletPlan.getIndex());
for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
- if (insertTabletPlan.getColumns()[columnIndex] == null) {
- columnIndex++;
+ if (insertTabletPlan.getColumns()[i] == null) {
continue;
}
IWritableMemChunk memSeries =
- createIfNotExistAndGet(
+ createMemChunkIfNotExistAndGet(
insertTabletPlan.getPrefixPath().getFullPath(),
insertTabletPlan.getMeasurementMNodes()[i].getSchema());
- if (insertTabletPlan.isAligned()) {
- VectorMeasurementSchema vectorSchema =
- (VectorMeasurementSchema)
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
- Object[] columns = new
Object[vectorSchema.getSubMeasurementsList().size()];
- BitMap[] bitMaps = new
BitMap[vectorSchema.getSubMeasurementsList().size()];
- for (int j = 0; j < vectorSchema.getSubMeasurementsList().size(); j++)
{
- columns[j] = insertTabletPlan.getColumns()[columnIndex];
- if (insertTabletPlan.getBitMaps() != null) {
- bitMaps[j] = insertTabletPlan.getBitMaps()[columnIndex];
- }
- columnIndex++;
- }
- memSeries.write(
- insertTabletPlan.getTimes(), columns, bitMaps, TSDataType.VECTOR,
start, end);
- break;
- } else {
- memSeries.write(
- insertTabletPlan.getTimes(),
- insertTabletPlan.getColumns()[columnIndex],
- insertTabletPlan.getBitMaps() != null
- ? insertTabletPlan.getBitMaps()[columnIndex]
- : null,
- insertTabletPlan.getDataTypes()[columnIndex],
- start,
- end);
- columnIndex++;
+ memSeries.write(
+ insertTabletPlan.getTimes(),
+ insertTabletPlan.getColumns()[i],
+ insertTabletPlan.getBitMaps() != null
+ ? insertTabletPlan.getBitMaps()[i]
+ : null,
+ insertTabletPlan.getDataTypes()[i],
+ start,
+ end);
+ }
+ }
+
+ public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start,
int end) {
+ updatePlanIndexes(insertTabletPlan.getIndex());
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ List<TSEncoding> encodings = new ArrayList<>();
+ CompressionType compressionType = null;
+ for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
+ if (insertTabletPlan.getColumns()[i] == null) {
+ continue;
}
+ IMeasurementSchema schema =
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
+ measurements.add(schema.getMeasurementId());
+ types.add(schema.getType());
+ encodings.add(schema.getEncodingType());
+ compressionType = schema.getCompressor();
}
+ VectorMeasurementSchema vectorSchema =
+ new VectorMeasurementSchema(null, measurements.toArray(new
String[measurements.size()]),
+ types.toArray(new TSDataType[measurements.size()]),
encodings.toArray(new TSEncoding[measurements.size()]), compressionType);
+ IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(
+ insertTabletPlan.getPrefixPath().getFullPath(), vectorSchema);
+ memSeries.writeVector(insertTabletPlan.getTimes(),
insertTabletPlan.getMeasurements(),
+ insertTabletPlan.getColumns(), insertTabletPlan.getBitMaps(), start,
end);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index e86fc96..646f67c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -46,12 +46,15 @@ public interface IMemTable {
void write(String deviceId, IMeasurementSchema schema, long insertTime,
Object objectValue);
+ void writeAlignedRow(String deviceId, IMeasurementSchema schema, long
insertTime, Object objectValue);
/**
* write data in the range [start, end). Null value in each column values
will be replaced by the
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5,
null, 5}
*/
void write(InsertTabletPlan insertTabletPlan, int start, int end);
+ void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int
end);
+
/** @return the number of points */
long size();
@@ -85,6 +88,8 @@ public interface IMemTable {
*/
void insert(InsertRowPlan insertRowPlan);
+ void insertAlignedRow(InsertRowPlan insertRowPlan);
+
/**
* insert tablet into this memtable. The rows to be inserted are in the
range [start, end). Null
* value in each column values will be replaced by the subsequent non-null
value, e.g., {1, null,
@@ -97,6 +102,9 @@ public interface IMemTable {
void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
throws WriteProcessException;
+ void insertAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int
end)
+ throws WriteProcessException;
+
ReadOnlyMemChunk query(
String deviceId,
String measurement,
@@ -154,4 +162,5 @@ public interface IMemTable {
long getMinPlanIndex();
long getCreatedTime();
+
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 4e96d88..cf276ad 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -64,7 +64,9 @@ public interface IWritableMemChunk {
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5,
null, 5}
*/
void write(
- long[] times, Object valueList, Object bitMaps, TSDataType dataType, int
start, int end);
+ long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int
start, int end);
+
+ void writeVector(long[] times, String[] measurements, Object[] valueList,
BitMap[] bitMaps, int start, int end);
long count();
@@ -100,7 +102,6 @@ public interface IWritableMemChunk {
* served for flush requests. The logic is just same as
getSortedTVListForQuery, but without add
* reference count
*
- * @return sorted tv list
*/
void sortTvListForFlush();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
new file mode 100644
index 0000000..5b2e8aa
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
@@ -0,0 +1,267 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorWritableMemChunk implements IWritableMemChunk {
+
+ private IMeasurementSchema schema;
+ private VectorTVList list;
+ private Map<String, Integer> VectorIdIndexMap;
+ private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(VectorWritableMemChunk.class);
+
+ public VectorWritableMemChunk(IMeasurementSchema schema) {
+ this.schema = schema;
+ VectorIdIndexMap = new HashMap<>();
+ for (int i = 0; i < schema.getSubMeasurementsCount(); i++) {
+ VectorIdIndexMap.put(schema.getSubMeasurementsList().get(i), i);
+ }
+ this.list =
TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList());
+ }
+
+ @Override
+ public void putLong(long t, long v) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putInt(long t, int v) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putFloat(long t, float v) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putDouble(long t, double v) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putBinary(long t, Binary v) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putBoolean(long t, boolean v) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putVector(long t, Object[] v) {
+ list.putVector(t, v);
+ }
+
+ @Override
+ public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int
end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int
end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int
end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int
end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void putVectors(long[] t, Object[] v, BitMap[] bitMaps, int start,
int end) {
+ list.putVectors(t, v, bitMaps, start, end);
+ }
+
+ @Override
+ public void write(long insertTime, Object objectValue) {
+ putVector(insertTime, (Object[]) objectValue);
+ }
+
+ @Override
+ public void write(long[] times, Object valueList, BitMap bitMap, TSDataType
dataType, int start,
+ int end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
+ }
+
+ @Override
+ public void writeVector(long[] times, String[] measurementIds, Object[]
valueList, BitMap[] bitMaps, int start,
+ int end) {
+ checkColumnOrder(measurementIds);
+ putVectors(times, valueList, bitMaps, start, end);
+ }
+
+ private void checkColumnOrder(String[] measurementIds) {
+ // TODO HTHou
+ }
+
+ @Override
+ public long count() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public IMeasurementSchema getSchema() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TVList getSortedTvListForQuery() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TVList getSortedTvListForQuery(List<Integer> columnIndexList) {
+ sortTVList();
+ // increase reference count
+ list.increaseReferenceCount();
+ return list.getTvListByColumnIndex(columnIndexList);
+ }
+
+ private void sortTVList() {
+ // check reference count
+ if ((list.getReferenceCount() > 0 && !list.isSorted())) {
+ list = list.clone();
+ }
+
+ if (!list.isSorted()) {
+ list.sort();
+ }
+ }
+
+ @Override
+ public void sortTvListForFlush() {
+ sortTVList();
+ }
+
+ @Override
+ public int delete(long lowerBound, long upperBound) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public int delete(long lowerBound, long upperBound, int columnIndex) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public IChunkWriter createIChunkWriter() {
+ return new VectorChunkWriterImpl(schema);
+ }
+
+ @Override
+ public void encode(IChunkWriter chunkWriter) {
+
+ List<Integer> timeDuplicatedVectorRowIndexList = null;
+ for (int sortedRowIndex = 0; sortedRowIndex < list.size();
sortedRowIndex++) {
+ long time = list.getTime(sortedRowIndex);
+
+ // skip duplicated data
+ if ((sortedRowIndex + 1 < list.size()
+ && (time == list.getTime(sortedRowIndex + 1)))) {
+ // record the time duplicated row index list for vector type
+ if (timeDuplicatedVectorRowIndexList == null) {
+ timeDuplicatedVectorRowIndexList = new ArrayList<>();
+
timeDuplicatedVectorRowIndexList.add(list.getValueIndex(sortedRowIndex));
+ }
+ timeDuplicatedVectorRowIndexList.add(list.getValueIndex(sortedRowIndex
+ 1));
+ continue;
+ }
+ List<TSDataType> dataTypes = list.getTsDataTypes();
+ int originRowIndex = list.getValueIndex(sortedRowIndex);
+ for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++)
{
+ // write the time duplicated rows
+ if (timeDuplicatedVectorRowIndexList != null
+ && !timeDuplicatedVectorRowIndexList.isEmpty()) {
+ originRowIndex =
+ list.getValidRowIndexForTimeDuplicatedRows(
+ timeDuplicatedVectorRowIndexList, columnIndex);
+ }
+ boolean isNull = list.isValueMarked(originRowIndex, columnIndex);
+ switch (dataTypes.get(columnIndex)) {
+ case BOOLEAN:
+ chunkWriter.write(
+ time,
+ list.getBooleanByValueIndex(originRowIndex, columnIndex),
+ isNull);
+ break;
+ case INT32:
+ chunkWriter.write(
+ time,
+ list.getIntByValueIndex(originRowIndex, columnIndex),
+ isNull);
+ break;
+ case INT64:
+ chunkWriter.write(
+ time,
+ list.getLongByValueIndex(originRowIndex, columnIndex),
+ isNull);
+ break;
+ case FLOAT:
+ chunkWriter.write(
+ time,
+ list.getFloatByValueIndex(originRowIndex, columnIndex),
+ isNull);
+ break;
+ case DOUBLE:
+ chunkWriter.write(
+ time,
+ list.getDoubleByValueIndex(originRowIndex, columnIndex),
+ isNull);
+ break;
+ case TEXT:
+ chunkWriter.write(
+ time,
+ list.getBinaryByValueIndex(originRowIndex, columnIndex),
+ isNull);
+ break;
+ default:
+ LOGGER.error(
+ "VectorWritableMemChunk does not support data type: {}",
+ dataTypes.get(columnIndex));
+ break;
+ }
+ }
+ chunkWriter.write(time);
+ timeDuplicatedVectorRowIndexList = null;
+ }
+
+ }
+
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 59e3d58..b6da0c0 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -71,41 +71,44 @@ public class WritableMemChunk implements IWritableMemChunk {
@Override
public void write(
- long[] times, Object valueList, Object bitMap, TSDataType dataType, int
start, int end) {
+ long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int
start, int end) {
switch (dataType) {
case BOOLEAN:
boolean[] boolValues = (boolean[]) valueList;
- putBooleans(times, boolValues, (BitMap) bitMap, start, end);
+ putBooleans(times, boolValues, bitMap, start, end);
break;
case INT32:
int[] intValues = (int[]) valueList;
- putInts(times, intValues, (BitMap) bitMap, start, end);
+ putInts(times, intValues, bitMap, start, end);
break;
case INT64:
long[] longValues = (long[]) valueList;
- putLongs(times, longValues, (BitMap) bitMap, start, end);
+ putLongs(times, longValues, bitMap, start, end);
break;
case FLOAT:
float[] floatValues = (float[]) valueList;
- putFloats(times, floatValues, (BitMap) bitMap, start, end);
+ putFloats(times, floatValues, bitMap, start, end);
break;
case DOUBLE:
double[] doubleValues = (double[]) valueList;
- putDoubles(times, doubleValues, (BitMap) bitMap, start, end);
+ putDoubles(times, doubleValues, bitMap, start, end);
break;
case TEXT:
Binary[] binaryValues = (Binary[]) valueList;
- putBinaries(times, binaryValues, (BitMap) bitMap, start, end);
- break;
- case VECTOR:
- Object[] vectorValues = (Object[]) valueList;
- putVectors(times, vectorValues, (BitMap[]) bitMap, start, end);
+ putBinaries(times, binaryValues, bitMap, start, end);
break;
default:
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType);
}
}
+
+ @Override
+ public void writeVector(long[] times, String[] measurements, Object[]
valueList, BitMap[] bitMaps,
+ int start, int end) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
list.getDataType());
+ }
+
@Override
public void putLong(long t, long v) {
list.putLong(t, v);
@@ -186,13 +189,7 @@ public class WritableMemChunk implements IWritableMemChunk
{
@Override
public synchronized TVList getSortedTvListForQuery(List<Integer>
columnIndexList) {
- if (list.getDataType() != TSDataType.VECTOR) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
list.getDataType());
- }
- sortTVList();
- // increase reference count
- list.increaseReferenceCount();
- return list.getTvListByColumnIndex(columnIndexList);
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
list.getDataType());
}
private void sortTVList() {
@@ -333,7 +330,5 @@ public class WritableMemChunk implements IWritableMemChunk {
break;
}
}
-
-
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4318a5b..27bc389 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -232,7 +232,11 @@ public class TsFileProcessor {
}
}
- workMemTable.insert(insertRowPlan);
+ if (insertRowPlan.isAligned()) {
+ workMemTable.insertAlignedRow(insertRowPlan);
+ } else {
+ workMemTable.insert(insertRowPlan);
+ }
// update start time of this memtable
tsFileResource.updateStartTime(
@@ -298,7 +302,11 @@ public class TsFileProcessor {
}
try {
- workMemTable.insertTablet(insertTabletPlan, start, end);
+ if (insertTabletPlan.isAligned()) {
+ workMemTable.insertAlignedTablet(insertTabletPlan, start, end);
+ } else {
+ workMemTable.insertTablet(insertTabletPlan, start, end);
+ }
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index ee87de5..f2aa26e 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -66,7 +66,7 @@ public class PrimitiveMemTableTest {
TSDataType dataType = TSDataType.INT32;
WritableMemChunk series =
new WritableMemChunk(
- new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN),
TVList.newList(dataType));
+ new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN));
int count = 1000;
for (int i = 0; i < count; i++) {
series.write(i, i);
@@ -85,7 +85,7 @@ public class PrimitiveMemTableTest {
TSDataType dataType = TSDataType.INT32;
WritableMemChunk series =
new WritableMemChunk(
- new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN),
TVList.newList(dataType));
+ new UnaryMeasurementSchema("s1", dataType, TSEncoding.PLAIN));
int count = 100;
for (int i = 0; i < count; i++) {
series.write(i, i);