This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch new_vector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1c79368db9993780c126ff606cc03a659774a6aa Author: HTHou <[email protected]> AuthorDate: Fri Nov 5 14:21:01 2021 +0800 Support extend column --- .../engine/memtable/AlignedWritableMemChunk.java | 47 ++++++++++--------- .../db/engine/querycontext/ReadOnlyMemChunk.java | 2 +- .../db/engine/storagegroup/TsFileProcessor.java | 53 +++++++++++++++++----- .../apache/iotdb/db/metadata/path/AlignedPath.java | 6 +-- .../db/utils/datastructure/AlignedTVList.java | 48 +++++++++++--------- .../iotdb/db/utils/datastructure/TVList.java | 2 +- .../write/schema/VectorMeasurementSchema.java | 16 +++++++ 7 files changed, 116 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java index 1315a4c..90875a3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java @@ -5,6 +5,7 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; @@ -16,29 +17,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class AlignedWritableMemChunk implements IWritableMemChunk { - private IMeasurementSchema schema; + private VectorMeasurementSchema schema; private AlignedTVList list; - private Map<String, Integer> alignedMeasurementIndexMap; private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; private static final Logger LOGGER = LoggerFactory.getLogger(AlignedWritableMemChunk.class); public AlignedWritableMemChunk(VectorMeasurementSchema schema) { this.schema = schema; - alignedMeasurementIndexMap = new HashMap<>(); - for (int i = 0; i < schema.getSubMeasurementsCount(); i++) { - alignedMeasurementIndexMap.put(schema.getSubMeasurementsList().get(i), i); - } this.list = TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList()); } public boolean containsMeasurement(String measurementId) { - return alignedMeasurementIndexMap.containsKey(measurementId); + return schema.containsSubMeasurement(measurementId); } @Override @@ -72,8 +66,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } @Override - public void putAlignedValue(long t, Object[] v, int[] columnOrder) { - list.putAlignedValue(t, v, columnOrder); + public void putAlignedValue(long t, Object[] v, int[] columnIndexArray) { + list.putAlignedValue(t, v, columnIndexArray); } @Override @@ -108,8 +102,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public void putAlignedValues( - long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndex, int start, int end) { - list.putAlignedValues(t, v, bitMaps, columnIndex, start, end); + long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) { + list.putAlignedValues(t, v, bitMaps, columnIndexArray, start, end); } @Override @@ -119,7 +113,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) { - int[] columnIndexArray = checkColumnOrder(schema); + int[] columnIndexArray = checkColumnsInInsertPlan(schema); putAlignedValue(insertTime, objectValue, columnIndexArray); } @@ -137,19 +131,28 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { IMeasurementSchema schema, int start, int end) { - int[] columnIndexArray = checkColumnOrder(schema); + int[] columnIndexArray = checkColumnsInInsertPlan(schema); putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end); } - private int[] checkColumnOrder(IMeasurementSchema schema) { + private int[] checkColumnsInInsertPlan(IMeasurementSchema schema) { VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema; List<String> measurementIdsInInsertPlan = vectorSchema.getSubMeasurementsList(); + List<TSDataType> dataTypesInInsertPlan = vectorSchema.getSubMeasurementsTSDataTypeList(); + List<TSEncoding> encodingsInInsertPlan = vectorSchema.getSubMeasurementsTSEncodingList(); List<String> measurementIdsInTVList = ((VectorMeasurementSchema) this.schema).getSubMeasurementsList(); int[] columnIndexArray = new int[measurementIdsInTVList.size()]; for (int i = 0; i < columnIndexArray.length; i++) { columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementIdsInTVList.get(i)); } + for (int i = 0; i < measurementIdsInInsertPlan.size(); i++) { + if (!containsMeasurement(measurementIdsInInsertPlan.get(i))) { + this.schema.addSubMeasurement(measurementIdsInInsertPlan.get(i), dataTypesInInsertPlan.get(i), encodingsInInsertPlan.get(i)); + this.list.extendColumn(dataTypesInInsertPlan.get(i)); + + } + } return columnIndexArray; } @@ -160,7 +163,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public long count() { - return list.size() * alignedMeasurementIndexMap.size(); + return list.size() * schema.getSubMeasurementsCount(); + } + + public long alignedListSize() { + return list.size(); } @Override @@ -182,13 +189,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { // increase reference count list.increaseReferenceCount(); List<Integer> columnIndexList = new ArrayList<>(); - List<TSDataType> dataTypeList = new ArrayList<>(); for (IMeasurementSchema measurementSchema : schemaList) { columnIndexList.add( - alignedMeasurementIndexMap.getOrDefault(measurementSchema.getMeasurementId(), -1)); - dataTypeList.add(measurementSchema.getType()); + schema.getSubMeasurementIndex(measurementSchema.getMeasurementId())); } - return list.getTvListByColumnIndex(columnIndexList, dataTypeList); + return list.getTvListByColumnIndex(columnIndexList); } private void sortTVList() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java index f0fdbeb..26492a3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java @@ -210,7 +210,7 @@ public class ReadOnlyMemChunk { IMeasurementSchema schema, Statistics[] valueStatistics, TimeValuePair timeValuePair) throws QueryProcessException { for (int i = 0; i < schema.getSubMeasurementsTSDataTypeList().size(); i++) { - if (timeValuePair.getValue().getVector()[i] == null) { + if (timeValuePair.getValue().getVector() == null || timeValuePair.getValue().getVector()[i] == null) { continue; } switch (schema.getSubMeasurementsTSDataTypeList().get(i)) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index b50c904..80fb3c3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.flush.FlushListener; import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable; +import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.engine.modification.Deletion; @@ -39,6 +40,7 @@ import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; @@ -378,26 +380,37 @@ public class TsFileProcessor { long memTableIncrement = 0L; long textDataIncrement = 0L; long chunkMetadataIncrement = 0L; + AlignedWritableMemChunk vectorMemChunk = null; String deviceId = insertRowPlan.getPrefixPath().getFullPath(); - if (workMemTable.checkIfChunkDoesNotExist(deviceId, null)) { + if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) { // ChunkMetadataIncrement chunkMetadataIncrement += - ChunkMetadata.calculateRamSize( - insertRowPlan.getMeasurements()[0], insertRowPlan.getDataTypes()[0]); + ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) + * insertRowPlan.getDataTypes().length; memTableIncrement += AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes()); } else { // here currentChunkPointNum >= 1 - int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null); + int currentChunkPointNum = + workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER); memTableIncrement += (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0 ? AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes()) : 0; + vectorMemChunk = + ((AlignedWritableMemChunk) + workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER)); } for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) { // skip failed Measurements if (insertRowPlan.getDataTypes()[i] == null || insertRowPlan.getMeasurements()[i] == null) { continue; } + // extending the column of aligned mem chunk + if (vectorMemChunk != null && !vectorMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) { + memTableIncrement += + (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) + * insertRowPlan.getDataTypes()[i].getDataTypeSize(); + } // TEXT data mem size if (insertRowPlan.getDataTypes()[i] == TSDataType.TEXT) { textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]); @@ -445,7 +458,7 @@ public class TsFileProcessor { updateAlignedMemCost( insertTabletPlan.getDataTypes(), deviceId, - insertTabletPlan.getMeasurements()[0], + insertTabletPlan.getMeasurements(), start, end, memIncrements, @@ -497,21 +510,24 @@ public class TsFileProcessor { private void updateAlignedMemCost( TSDataType[] dataTypes, String deviceId, - String measurementId, + String[] measurementIds, int start, int end, long[] memIncrements, Object[] columns) { + AlignedWritableMemChunk vectorMemChunk = null; // memIncrements = [memTable, text, chunk metadata] respectively - if (workMemTable.checkIfChunkDoesNotExist(deviceId, null)) { + if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) { // ChunkMetadataIncrement memIncrements[2] += - dataTypes.length * ChunkMetadata.calculateRamSize(measurementId, dataTypes[0]); + dataTypes.length + * ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR); memIncrements[0] += ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) * AlignedTVList.alignedTvListArrayMemSize(dataTypes); } else { - int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null); + int currentChunkPointNum = + workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER); if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) { memIncrements[0] += ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) @@ -525,10 +541,25 @@ public class TsFileProcessor { ? 0 : acquireArray * AlignedTVList.alignedTvListArrayMemSize(dataTypes); } + vectorMemChunk = + ((AlignedWritableMemChunk) + workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER)); } - // TEXT data size for (int i = 0; i < dataTypes.length; i++) { - if (dataTypes[i] == TSDataType.TEXT) { + TSDataType dataType = dataTypes[i]; + String measurement = measurementIds[i]; + Object column = columns[i]; + if (dataType == null || column == null || measurement == null) { + continue; + } + // extending the column of aligned mem chunk + if (vectorMemChunk != null && !vectorMemChunk.containsMeasurement(measurementIds[i])) { + memIncrements[0] += + (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) + * dataType.getDataTypeSize(); + } + // TEXT data size + if (dataType == TSDataType.TEXT) { Binary[] binColumn = (Binary[]) columns[i]; memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end); } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java index 43397ba..5a0a68e 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java @@ -334,11 +334,11 @@ public class AlignedPath extends PartialPath { if (!memTableMap.containsKey(getDevice())) { return null; } - AlignedWritableMemChunk vectorMemChunk = + AlignedWritableMemChunk alignedMemChunk = ((AlignedWritableMemChunk) memTableMap.get(getDevice()).get(VECTOR_PLACEHOLDER)); boolean containsMeasurement = false; for (String measurement : measurementList) { - if (vectorMemChunk.containsMeasurement(measurement)) { + if (alignedMemChunk.containsMeasurement(measurement)) { containsMeasurement = true; break; } @@ -347,7 +347,7 @@ public class AlignedPath extends PartialPath { return null; } // get sorted tv list is synchronized so different query can get right sorted list reference - TVList vectorTvListCopy = vectorMemChunk.getSortedTvListForQuery(schemaList); + TVList vectorTvListCopy = alignedMemChunk.getSortedTvListForQuery(schemaList); int curSize = vectorTvListCopy.size(); return new ReadOnlyMemChunk(getMeasurementSchema(), vectorTvListCopy, curSize, deletionList); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index a3f959f..f2a5d76 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -168,15 +168,16 @@ public class AlignedTVList extends TVList { TsPrimitiveType[] vector = new TsPrimitiveType[values.size()]; for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) { List<Object> columnValues = values.get(columnIndex); - if (validIndexesForTimeDuplicatedRows != null) { - arrayIndex = validIndexesForTimeDuplicatedRows[columnIndex] / ARRAY_SIZE; - elementIndex = validIndexesForTimeDuplicatedRows[columnIndex] % ARRAY_SIZE; - } - if (bitMaps != null + if (columnValues == null + || bitMaps != null && bitMaps.get(columnIndex) != null && isValueMarked(valueIndex, columnIndex)) { continue; } + if (validIndexesForTimeDuplicatedRows != null) { + arrayIndex = validIndexesForTimeDuplicatedRows[columnIndex] / ARRAY_SIZE; + elementIndex = validIndexesForTimeDuplicatedRows[columnIndex] % ARRAY_SIZE; + } switch (dataTypes.get(columnIndex)) { case TEXT: Binary valueT = ((Binary[]) columnValues.get(arrayIndex))[elementIndex]; @@ -222,22 +223,15 @@ public class AlignedTVList extends TVList { } @Override - public TVList getTvListByColumnIndex(List<Integer> columnIndex, List<TSDataType> dataTypes) { + public TVList getTvListByColumnIndex(List<Integer> columnIndex) { List<TSDataType> types = new ArrayList<>(); List<List<Object>> values = new ArrayList<>(); List<List<BitMap>> bitMaps = null; for (int i = 0; i < columnIndex.size(); i++) { - // columnIndex == -1 means querying a non-exist column, generate empty column here + // columnIndex == -1 means querying a non-exist column, add null column here if (columnIndex.get(i) == -1) { - types.add(dataTypes.get(i)); - // use bitmap to mark as null value - if (bitMaps == null) { - bitMaps = new ArrayList<>(columnIndex.size()); - for (int j = 0; j < columnIndex.size(); j++) { - bitMaps.add(null); - } - } - generateEmptyColumn(dataTypes.get(i), values, bitMaps); + types.add(null); + values.add(null); } else { types.add(this.dataTypes.get(columnIndex.get(i))); values.add(this.values.get(columnIndex.get(i))); @@ -261,8 +255,13 @@ public class AlignedTVList extends TVList { return alignedTvList; } - private void generateEmptyColumn( - TSDataType dataType, List<List<Object>> values, List<List<BitMap>> bitMaps) { + public void extendColumn(TSDataType dataType) { + if (bitMaps == null) { + bitMaps = new ArrayList<>(values.size() + 1); + for (int i = 0; i < values.size() + 1; i++) { + bitMaps.add(null); + } + } List<Object> columnValue = new ArrayList<>(); List<BitMap> columnBitMaps = new ArrayList<>(); for (int i = 0; i < timestamps.size(); i++) { @@ -289,12 +288,19 @@ public class AlignedTVList extends TVList { break; } BitMap bitMap = new BitMap(ARRAY_SIZE); - bitMap.markAll(); + // last bitmap should be marked to the tslist size's position + if (i == timestamps.size()) { + for (int j = 0; j < size % ARRAY_SIZE; j++) { + bitMap.mark(j); + } + } else { + bitMap.markAll(); + } columnBitMaps.add(bitMap); } // values.size() is the index of column - bitMaps.set(values.size(), columnBitMaps); - values.add(columnValue); + this.bitMaps.set(values.size(), columnBitMaps); + this.values.add(columnValue); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 25c6aa2..8f1536b 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -207,7 +207,7 @@ public abstract class TVList { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public TVList getTvListByColumnIndex(List<Integer> columnIndexList, List<TSDataType> dataTypes) { + public TVList getTvListByColumnIndex(List<Integer> columnIndexList) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java index d2e97cd..fb4a75f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java @@ -223,6 +223,22 @@ public class VectorMeasurementSchema return subMeasurementsToIndexMap.containsKey(subMeasurement); } + public void addSubMeasurement(String measurementId, TSDataType dataType, TSEncoding encoding) { + subMeasurementsToIndexMap.put(measurementId, subMeasurementsToIndexMap.size()); + byte[] typesInByte = new byte[subMeasurementsToIndexMap.size()]; + for (int i = 0; i < subMeasurementsToIndexMap.size(); i++) { + typesInByte[i] = types[i]; + } + typesInByte[typesInByte.length - 1] = dataType.serialize(); + this.types = typesInByte; + byte[] encodingsInByte = new byte[subMeasurementsToIndexMap.size()]; + for (int i = 0; i < subMeasurementsToIndexMap.size(); i++) { + encodingsInByte[i] = encodings[i]; + } + encodingsInByte[encodingsInByte.length - 1] = encoding.serialize(); + this.encodings = encodingsInByte; + } + @Override public int serializeTo(ByteBuffer buffer) { int byteLen = 0;
