This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new b98ad1f implement VectorWritableMemChunk
b98ad1f is described below
commit b98ad1f3048362080e10e51b60fd6166f0f6e38d
Author: HTHou <[email protected]>
AuthorDate: Thu Oct 28 12:27:53 2021 +0800
implement VectorWritableMemChunk
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 114 +--------------------
.../db/engine/memtable/IWritableMemChunk.java | 2 +-
.../db/engine/memtable/PrimitiveMemTable.java | 7 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 67 ++++++++++--
.../apache/iotdb/db/rescon/TVListAllocator.java | 3 +-
.../iotdb/db/utils/datastructure/TVList.java | 2 +-
6 files changed, 64 insertions(+), 131 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 3c29135..30e878a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -162,118 +162,6 @@ public class MemTableFlushTask {
/** encoding task (second task of pipeline) */
private Runnable encodingTask =
new Runnable() {
- private void writeOneSeries(
- TVList tvPairs, IChunkWriter seriesWriterImpl, TSDataType
dataType) {
- List<Integer> timeDuplicatedVectorRowIndexList = null;
- for (int sortedRowIndex = 0; sortedRowIndex < tvPairs.size();
sortedRowIndex++) {
- long time = tvPairs.getTime(sortedRowIndex);
-
- // skip duplicated data
- if ((sortedRowIndex + 1 < tvPairs.size()
- && (time == tvPairs.getTime(sortedRowIndex + 1)))) {
- // record the time duplicated row index list for vector type
- if (dataType == TSDataType.VECTOR) {
- if (timeDuplicatedVectorRowIndexList == null) {
- timeDuplicatedVectorRowIndexList = new ArrayList<>();
-
timeDuplicatedVectorRowIndexList.add(tvPairs.getValueIndex(sortedRowIndex));
- }
-
timeDuplicatedVectorRowIndexList.add(tvPairs.getValueIndex(sortedRowIndex + 1));
- }
- continue;
- }
-
- // store last point for SDT
- if (dataType != TSDataType.VECTOR && sortedRowIndex + 1 ==
tvPairs.size()) {
- ((ChunkWriterImpl) seriesWriterImpl).setLastPoint(true);
- }
-
- switch (dataType) {
- case BOOLEAN:
- seriesWriterImpl.write(time,
tvPairs.getBoolean(sortedRowIndex), false);
- break;
- case INT32:
- seriesWriterImpl.write(time, tvPairs.getInt(sortedRowIndex),
false);
- break;
- case INT64:
- seriesWriterImpl.write(time, tvPairs.getLong(sortedRowIndex),
false);
- break;
- case FLOAT:
- seriesWriterImpl.write(time, tvPairs.getFloat(sortedRowIndex),
false);
- break;
- case DOUBLE:
- seriesWriterImpl.write(time,
tvPairs.getDouble(sortedRowIndex), false);
- break;
- case TEXT:
- seriesWriterImpl.write(time,
tvPairs.getBinary(sortedRowIndex), false);
- break;
- case VECTOR:
- VectorTVList vectorTvPairs = (VectorTVList) tvPairs;
- List<TSDataType> dataTypes = vectorTvPairs.getTsDataTypes();
- int originRowIndex =
vectorTvPairs.getValueIndex(sortedRowIndex);
- for (int columnIndex = 0; columnIndex < dataTypes.size();
columnIndex++) {
- // write the time duplicated rows
- if (timeDuplicatedVectorRowIndexList != null
- && !timeDuplicatedVectorRowIndexList.isEmpty()) {
- originRowIndex =
- vectorTvPairs.getValidRowIndexForTimeDuplicatedRows(
- timeDuplicatedVectorRowIndexList, columnIndex);
- }
- boolean isNull = vectorTvPairs.isValueMarked(originRowIndex,
columnIndex);
- switch (dataTypes.get(columnIndex)) {
- case BOOLEAN:
- seriesWriterImpl.write(
- time,
- vectorTvPairs.getBooleanByValueIndex(originRowIndex,
columnIndex),
- isNull);
- break;
- case INT32:
- seriesWriterImpl.write(
- time,
- vectorTvPairs.getIntByValueIndex(originRowIndex,
columnIndex),
- isNull);
- break;
- case INT64:
- seriesWriterImpl.write(
- time,
- vectorTvPairs.getLongByValueIndex(originRowIndex,
columnIndex),
- isNull);
- break;
- case FLOAT:
- seriesWriterImpl.write(
- time,
- vectorTvPairs.getFloatByValueIndex(originRowIndex,
columnIndex),
- isNull);
- break;
- case DOUBLE:
- seriesWriterImpl.write(
- time,
- vectorTvPairs.getDoubleByValueIndex(originRowIndex,
columnIndex),
- isNull);
- break;
- case TEXT:
- seriesWriterImpl.write(
- time,
- vectorTvPairs.getBinaryByValueIndex(originRowIndex,
columnIndex),
- isNull);
- break;
- default:
- LOGGER.error(
- "Storage group {} does not support data type: {}",
- storageGroup,
- dataTypes.get(columnIndex));
- break;
- }
- }
- seriesWriterImpl.write(time);
- timeDuplicatedVectorRowIndexList = null;
- break;
- default:
- LOGGER.error(
- "Storage group {} does not support data type: {}",
storageGroup, dataType);
- break;
- }
- }
- }
@SuppressWarnings("squid:S135")
@Override
@@ -311,7 +199,7 @@ public class MemTableFlushTask {
} else {
long starTime = System.currentTimeMillis();
IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
- IChunkWriter seriesWriter = writableMemChunk.createIChunkWrite();
+ IChunkWriter seriesWriter = writableMemChunk.createIChunkWriter();
writableMemChunk.encode(seriesWriter);
seriesWriter.sealCurrentPage();
seriesWriter.clearPageWriter();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index d6a3862..4e96d88 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -118,7 +118,7 @@ public interface IWritableMemChunk {
// For delete one column in the vector
int delete(long lowerBound, long upperBound, int columnIndex);
- IChunkWriter createIChunkWrite();
+ IChunkWriter createIChunkWriter();
void encode(IChunkWriter chunkWriter);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 922cb12..2b85280 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.engine.memtable;
-import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -41,11 +40,9 @@ public class PrimitiveMemTable extends AbstractMemTable {
@Override
protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
if (schema.getType() == TSDataType.VECTOR) {
- return new WritableMemChunk(
- schema,
- TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList()));
+ return new VectorWritableMemChunk(schema);
}
- return new WritableMemChunk(schema, TVListAllocator.getInstance().allocate(schema.getType()));
+ return new WritableMemChunk(schema);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index f7d359e..59e3d58 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -18,14 +18,17 @@
*/
package org.apache.iotdb.db.engine.memtable;
+import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
public class WritableMemChunk implements IWritableMemChunk {
@@ -33,10 +36,11 @@ public class WritableMemChunk implements IWritableMemChunk {
private IMeasurementSchema schema;
private TVList list;
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
+ private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunk.class);
- public WritableMemChunk(IMeasurementSchema schema, TVList list) {
+ public WritableMemChunk(IMeasurementSchema schema) {
this.schema = schema;
- this.list = list;
+ this.list = TVListAllocator.getInstance().allocate(schema.getType());
}
@Override
@@ -60,9 +64,6 @@ public class WritableMemChunk implements IWritableMemChunk {
case TEXT:
putBinary(insertTime, (Binary) objectValue);
break;
- case VECTOR:
- putVector(insertTime, (Object[]) objectValue);
- break;
default:
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
}
@@ -137,7 +138,7 @@ public class WritableMemChunk implements IWritableMemChunk {
@Override
public void putVector(long t, Object[] v) {
- list.putVector(t, v);
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
}
@Override
@@ -172,7 +173,7 @@ public class WritableMemChunk implements IWritableMemChunk {
@Override
public void putVectors(long[] t, Object[] v, BitMap[] bitMaps, int start, int end) {
- list.putVectors(t, v, bitMaps, start, end);
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
}
@Override
@@ -254,8 +255,8 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public IChunkWriter createIChunkWrite() {
- return null;
+ public IChunkWriter createIChunkWriter() {
+ return new ChunkWriterImpl(schema);
}
@Override
@@ -289,4 +290,50 @@ public class WritableMemChunk implements IWritableMemChunk {
}
return out.toString();
}
+
+ @Override
+ public void encode(IChunkWriter chunkWriter) {
+
+ for (int sortedRowIndex = 0; sortedRowIndex < list.size(); sortedRowIndex++) {
+ long time = list.getTime(sortedRowIndex);
+
+ // skip duplicated data
+ if ((sortedRowIndex + 1 < list.size()
+ && (time == list.getTime(sortedRowIndex + 1)))) {
+ continue;
+ }
+
+ // store last point for SDT
+ if (sortedRowIndex + 1 == list.size()) {
+ ((ChunkWriterImpl) chunkWriter).setLastPoint(true);
+ }
+
+ switch (schema.getType()) {
+ case BOOLEAN:
+ chunkWriter.write(time, list.getBoolean(sortedRowIndex), false);
+ break;
+ case INT32:
+ chunkWriter.write(time, list.getInt(sortedRowIndex), false);
+ break;
+ case INT64:
+ chunkWriter.write(time, list.getLong(sortedRowIndex), false);
+ break;
+ case FLOAT:
+ chunkWriter.write(time, list.getFloat(sortedRowIndex), false);
+ break;
+ case DOUBLE:
+ chunkWriter.write(time, list.getDouble(sortedRowIndex), false);
+ break;
+ case TEXT:
+ chunkWriter.write(time, list.getBinary(sortedRowIndex), false);
+ break;
+ default:
+ LOGGER.error(
+ "WritableMemChunk does not support data type: {}", schema.getType());
+ break;
+ }
+ }
+
+
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
index f2e54ab..fab5fab 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayDeque;
@@ -56,7 +57,7 @@ public class TVListAllocator implements TVListAllocatorMBean, IService {
return list != null ? list : TVList.newList(dataType);
}
- public synchronized TVList allocate(List<TSDataType> dataTypes) {
+ public synchronized VectorTVList allocate(List<TSDataType> dataTypes) {
return TVList.newVectorList(dataTypes);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 3e78339..796b6e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -82,7 +82,7 @@ public abstract class TVList {
return null;
}
- public static TVList newVectorList(List<TSDataType> datatypes) {
+ public static VectorTVList newVectorList(List<TSDataType> datatypes) {
return new VectorTVList(datatypes);
}