This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch IWritableMemChunkGroup in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0cd21e12749228eea857e70829ef305cac412ee2 Author: HTHou <[email protected]> AuthorDate: Wed Nov 17 11:20:01 2021 +0800 Introduce IWritableMemChunkGroup --- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +- .../iotdb/db/engine/flush/MemTableFlushTask.java | 5 +- .../iotdb/db/engine/flush/NotifyFlushMemTable.java | 12 -- .../iotdb/db/engine/memtable/AbstractMemTable.java | 237 +++++++++------------ .../engine/memtable/AlignedWritableMemChunk.java | 104 +++++---- .../memtable/AlignedWritableMemChunkGroup.java | 62 ++++++ .../apache/iotdb/db/engine/memtable/IMemTable.java | 9 +- .../db/engine/memtable/IWritableMemChunk.java | 7 +- .../db/engine/memtable/IWritableMemChunkGroup.java | 30 +++ .../db/engine/memtable/PrimitiveMemTable.java | 17 +- .../iotdb/db/engine/memtable/WritableMemChunk.java | 12 +- .../db/engine/memtable/WritableMemChunkGroup.java | 99 +++++++++ .../db/engine/storagegroup/TsFileProcessor.java | 29 +-- .../apache/iotdb/db/metadata/path/AlignedPath.java | 7 +- .../iotdb/db/metadata/path/MeasurementPath.java | 8 +- .../apache/iotdb/db/metadata/path/PartialPath.java | 4 +- .../iotdb/db/qp/physical/crud/InsertRowPlan.java | 6 +- .../java/org/apache/iotdb/db/utils/MemUtils.java | 15 +- .../iotdb/db/writelog/recover/LogReplayer.java | 22 +- .../db/engine/memtable/MemTableTestUtils.java | 6 +- .../db/engine/memtable/MemtableBenchmark.java | 7 +- .../db/engine/memtable/PrimitiveMemTableTest.java | 14 +- .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 28 +++ 23 files changed, 481 insertions(+), 268 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 17deb09..1d0ae09 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1281,13 +1281,10 @@ public class IoTDBDescriptor { + queryMemoryAllocateProportion); } } - } - conf.setMaxQueryDeduplicatedPathNum( - Integer.parseInt( - properties.getProperty( - "max_deduplicated_path_num", - Integer.toString(conf.getMaxQueryDeduplicatedPathNum())))); + conf.setMaxQueryDeduplicatedPathNum( + Integer.parseInt(properties.getProperty("max_deduplicated_path_num"))); + } } @SuppressWarnings("squid:S3518") // "proportionSum" can't be zero diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index 957d03e..ecbdac8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; +import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.exception.runtime.FlushRunTimeException; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; @@ -101,11 +102,11 @@ public class MemTableFlushTask { long sortTime = 0; // for map do not use get(key) to iterate - for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : + for (Map.Entry<String, IWritableMemChunkGroup> memTableEntry : memTable.getMemTableMap().entrySet()) { encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey())); - final Map<String, IWritableMemChunk> value = memTableEntry.getValue(); + final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap(); for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) { long startTime = System.currentTimeMillis(); IWritableMemChunk series = iWritableMemChunkEntry.getValue(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java index 169c22f..8d80b8a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java @@ -20,8 +20,6 @@ package org.apache.iotdb.db.engine.flush; import org.apache.iotdb.db.engine.memtable.AbstractMemTable; import org.apache.iotdb.db.engine.memtable.IMemTable; -import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; /** * Only used in sync flush and async close to start a flush task This memtable is not managed by @@ -30,16 +28,6 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; public class NotifyFlushMemTable extends AbstractMemTable { @Override - protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) { - return null; - } - - @Override - protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) { - return null; - } - - @Override public IMemTable copy() { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 72c9b72..c5e8ae2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -22,20 +22,13 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; -import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; -import org.apache.iotdb.db.rescon.TVListAllocator; import org.apache.iotdb.db.utils.MemUtils; -import org.apache.iotdb.db.utils.datastructure.TVList; -import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; import java.io.IOException; import java.util.ArrayList; @@ -47,7 +40,7 @@ import java.util.Map.Entry; public abstract class AbstractMemTable implements IMemTable { - private final Map<String, Map<String, IWritableMemChunk>> memTableMap; + private final Map<String, IWritableMemChunkGroup> memTableMap; /** * The initial value is true because we want calculate the text data size when recover memTable!! */ @@ -80,12 +73,12 @@ public abstract class AbstractMemTable implements IMemTable { this.memTableMap = new HashMap<>(); } - public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) { + public AbstractMemTable(Map<String, IWritableMemChunkGroup> memTableMap) { this.memTableMap = memTableMap; } @Override - public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() { + public Map<String, IWritableMemChunkGroup> getMemTableMap() { return memTableMap; } @@ -93,63 +86,64 @@ public abstract class AbstractMemTable implements IMemTable { * create this MemChunk if it's not exist * * @param deviceId device id - * @param schema measurement schema - * @return this MemChunk + * @param schemaList measurement schemaList + * @return this MemChunkGroup */ - private IWritableMemChunk createMemChunkIfNotExistAndGet( - String deviceId, IMeasurementSchema schema) { - Map<String, IWritableMemChunk> memSeries = - memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>()); - - return memSeries.computeIfAbsent( - schema.getMeasurementId(), - k -> { - seriesNumber++; - totalPointsNumThreshold += avgSeriesPointNumThreshold; - return genMemSeries(schema); - }); - } - - private IWritableMemChunk createAlignedMemChunkIfNotExistAndGet( - String deviceId, IMeasurementSchema schema) { - Map<String, IWritableMemChunk> memSeries = - memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>()); - - VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema; - return memSeries.computeIfAbsent( - vectorSchema.getMeasurementId(), - k -> { - seriesNumber++; - totalPointsNumThreshold += - avgSeriesPointNumThreshold * vectorSchema.getSubMeasurementsCount(); - return genAlignedMemSeries(vectorSchema); - }); + private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet( + String deviceId, List<IMeasurementSchema> schemaList) { + IWritableMemChunkGroup memChunkGroup = + memTableMap.computeIfAbsent( + deviceId, + k -> { + seriesNumber++; + totalPointsNumThreshold += avgSeriesPointNumThreshold; + return new WritableMemChunkGroup(schemaList); + }); + for (IMeasurementSchema schema : schemaList) { + if (!memChunkGroup.contains(schema.getMeasurementId())) { + seriesNumber++; + totalPointsNumThreshold += avgSeriesPointNumThreshold; + } + } + return memChunkGroup; + } + + private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet( + String deviceId, List<IMeasurementSchema> schemaList) { + IWritableMemChunkGroup memChunkGroup = + memTableMap.computeIfAbsent( + deviceId, + k -> { + seriesNumber++; + totalPointsNumThreshold += avgSeriesPointNumThreshold; + return new AlignedWritableMemChunkGroup(schemaList); + }); + for (IMeasurementSchema schema : schemaList) { + if (!memChunkGroup.contains(schema.getMeasurementId())) { + seriesNumber++; + totalPointsNumThreshold += avgSeriesPointNumThreshold; + } + } + return memChunkGroup; } - protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema); - - protected abstract IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema); - @Override public void insert(InsertRowPlan insertRowPlan) { updatePlanIndexes(insertRowPlan.getIndex()); Object[] values = insertRowPlan.getValues(); - IMeasurementMNode[] measurementMNodes = insertRowPlan.getMeasurementMNodes(); - for (int i = 0; i < measurementMNodes.length; i++) { + List<IMeasurementSchema> schemaList = new ArrayList<>(); + List<TSDataType> dataTypes = new ArrayList<>(); + for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) { if (values[i] == null) { continue; } - memSize += - MemUtils.getRecordSize( - measurementMNodes[i].getSchema().getType(), values[i], disableMemControl); - - write( - insertRowPlan.getDeviceId().getFullPath(), - measurementMNodes[i].getSchema(), - insertRowPlan.getTime(), - values[i]); + IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema(); + schemaList.add(schema); + dataTypes.add(schema.getType()); } + memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl); + write(insertRowPlan.getDeviceId().getFullPath(), schemaList, insertRowPlan.getTime(), values); totalPointsNum += insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber(); } @@ -158,34 +152,24 @@ public abstract class AbstractMemTable implements IMemTable { public void insertAlignedRow(InsertRowPlan insertRowPlan) { updatePlanIndexes(insertRowPlan.getIndex()); // write vector - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - List<TSEncoding> encodings = new ArrayList<>(); - CompressionType compressionType = null; + List<IMeasurementSchema> schemaList = new ArrayList<>(); + List<TSDataType> dataTypes = new ArrayList<>(); for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) { - if (insertRowPlan.getMeasurements()[i] == null) { + if (insertRowPlan.getValues()[i] == null) { continue; } IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema(); - measurements.add(schema.getMeasurementId()); - types.add(schema.getType()); - encodings.add(schema.getEncodingType()); - compressionType = schema.getCompressor(); + schemaList.add(schema); + dataTypes.add(schema.getType()); } - if (measurements.isEmpty()) { + if (schemaList.isEmpty()) { return; } - VectorMeasurementSchema vectorSchema = - new VectorMeasurementSchema( - AlignedPath.VECTOR_PLACEHOLDER, - measurements.toArray(new String[measurements.size()]), - types.toArray(new TSDataType[measurements.size()]), - encodings.toArray(new TSEncoding[measurements.size()]), - compressionType); - memSize += MemUtils.getAlignedRecordSize(types, insertRowPlan.getValues(), disableMemControl); + memSize += + MemUtils.getAlignedRecordsSize(dataTypes, insertRowPlan.getValues(), disableMemControl); writeAlignedRow( insertRowPlan.getDeviceId().getFullPath(), - vectorSchema, + schemaList, insertRowPlan.getTime(), insertRowPlan.getValues()); totalPointsNum += @@ -224,92 +208,81 @@ public abstract class AbstractMemTable implements IMemTable { @Override public void write( - String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue) { - IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId, schema); - memSeries.write(insertTime, objectValue); + String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) { + IWritableMemChunkGroup memChunkGroup = + createMemChunkGroupIfNotExistAndGet(deviceId, schemaList); + memChunkGroup.write(insertTime, objectValue, schemaList); } @Override public void writeAlignedRow( - String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue) { - IWritableMemChunk memSeries = createAlignedMemChunkIfNotExistAndGet(deviceId, schema); - memSeries.writeAlignedValue(insertTime, objectValue, schema); + String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) { + IWritableMemChunkGroup memChunkGroup = + createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList); + memChunkGroup.write(insertTime, objectValue, schemaList); } @SuppressWarnings("squid:S3776") // high Cognitive Complexity @Override public void write(InsertTabletPlan insertTabletPlan, int start, int end) { - updatePlanIndexes(insertTabletPlan.getIndex()); + List<IMeasurementSchema> schemaList = new ArrayList<>(); for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) { if (insertTabletPlan.getColumns()[i] == null) { continue; } - IWritableMemChunk memSeries = - createMemChunkIfNotExistAndGet( - insertTabletPlan.getDeviceId().getFullPath(), - insertTabletPlan.getMeasurementMNodes()[i].getSchema()); - memSeries.write( - insertTabletPlan.getTimes(), - insertTabletPlan.getColumns()[i], - insertTabletPlan.getBitMaps() != null ? insertTabletPlan.getBitMaps()[i] : null, - insertTabletPlan.getDataTypes()[i], - start, - end); + IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema(); + schemaList.add(schema); } + IWritableMemChunkGroup memChunkGroup = + createMemChunkGroupIfNotExistAndGet( + insertTabletPlan.getDeviceId().getFullPath(), schemaList); + memChunkGroup.writeValues( + insertTabletPlan.getTimes(), + insertTabletPlan.getColumns(), + insertTabletPlan.getBitMaps(), + schemaList, + start, + end); } + @Override public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int end) { - updatePlanIndexes(insertTabletPlan.getIndex()); - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - List<TSEncoding> encodings = new ArrayList<>(); - CompressionType compressionType = null; + List<IMeasurementSchema> schemaList = new ArrayList<>(); for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) { if (insertTabletPlan.getColumns()[i] == null) { continue; } IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema(); - measurements.add(schema.getMeasurementId()); - types.add(schema.getType()); - encodings.add(schema.getEncodingType()); - compressionType = schema.getCompressor(); + schemaList.add(schema); } - if (measurements.isEmpty()) { + if (schemaList.isEmpty()) { return; } - VectorMeasurementSchema vectorSchema = - new VectorMeasurementSchema( - AlignedPath.VECTOR_PLACEHOLDER, - measurements.toArray(new String[measurements.size()]), - types.toArray(new TSDataType[measurements.size()]), - encodings.toArray(new TSEncoding[measurements.size()]), - compressionType); - IWritableMemChunk memSeries = - createAlignedMemChunkIfNotExistAndGet( - insertTabletPlan.getDeviceId().getFullPath(), vectorSchema); - memSeries.writeAlignedValues( + IWritableMemChunkGroup memChunkGroup = + createAlignedMemChunkGroupIfNotExistAndGet( + insertTabletPlan.getDeviceId().getFullPath(), schemaList); + memChunkGroup.writeValues( insertTabletPlan.getTimes(), insertTabletPlan.getColumns(), insertTabletPlan.getBitMaps(), - vectorSchema, + schemaList, start, end); } @Override public boolean checkIfChunkDoesNotExist(String deviceId, String measurement) { - Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId); - if (null == memSeries) { + IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId); + if (null == memChunkGroup) { return true; } - return !memSeries.containsKey(measurement); + return !memChunkGroup.contains(measurement); } @Override - public int getCurrentChunkPointNum(String deviceId, String measurement) { - Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId); - IWritableMemChunk memChunk = memSeries.get(measurement); - return (int) memChunk.count(); + public long getCurrentChunkPointNum(String deviceId, String measurement) { + IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId); + return memChunkGroup.getCurrentChunkPointNum(measurement); } @Override @@ -325,10 +298,8 @@ public abstract class AbstractMemTable implements IMemTable { @Override public long size() { long sum = 0; - for (Map<String, IWritableMemChunk> seriesMap : memTableMap.values()) { - for (IWritableMemChunk writableMemChunk : seriesMap.values()) { - sum += writableMemChunk.count(); - } + for (IWritableMemChunkGroup writableMemChunkGroup : memTableMap.values()) { + sum += writableMemChunkGroup.count(); } return sum; } @@ -373,12 +344,13 @@ public abstract class AbstractMemTable implements IMemTable { @Override public void delete( PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) { - Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath()); - if (deviceMap == null) { + IWritableMemChunkGroup memChunkGroup = memTableMap.get(devicePath.getFullPath()); + if (memChunkGroup == null) { return; } - Iterator<Entry<String, IWritableMemChunk>> iter = deviceMap.entrySet().iterator(); + Iterator<Entry<String, IWritableMemChunk>> iter = + memChunkGroup.getMemChunkMap().entrySet().iterator(); while (iter.hasNext()) { Entry<String, IWritableMemChunk> entry = iter.next(); IWritableMemChunk chunk = entry.getValue(); @@ -433,13 +405,8 @@ public abstract class AbstractMemTable implements IMemTable { @Override public void release() { - for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) { - for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) { - TVList list = subEntry.getValue().getTVList(); - if (list.getReferenceCount() == 0) { - TVListAllocator.getInstance().release(list); - } - } + for (Entry<String, IWritableMemChunkGroup> entry : memTableMap.entrySet()) { + entry.getValue().release(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java index cd7184b..69ebabb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java @@ -23,64 +23,71 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; public class AlignedWritableMemChunk implements IWritableMemChunk { - private final VectorMeasurementSchema schema; + private final Map<String, Integer> measurementIndexMap; + private final List<IMeasurementSchema> schemaList; private AlignedTVList list; private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; private static final Logger LOGGER = LoggerFactory.getLogger(AlignedWritableMemChunk.class); - public AlignedWritableMemChunk(VectorMeasurementSchema schema) { - this.schema = schema; - this.list = TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList()); + public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) { + this.measurementIndexMap = new LinkedHashMap<>(); + List<TSDataType> dataTypeList = new ArrayList<>(); + this.schemaList = schemaList; + for (int i = 0; i < schemaList.size(); i++) { + measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i); + dataTypeList.add(schemaList.get(i).getType()); + } + this.list = TVListAllocator.getInstance().allocate(dataTypeList); } public boolean containsMeasurement(String measurementId) { - return schema.containsSubMeasurement(measurementId); + return measurementIndexMap.containsKey(measurementId); } @Override public void putLong(long t, long v) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putInt(long t, int v) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putFloat(long t, float v) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putDouble(long t, double v) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putBinary(long t, Binary v) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putBoolean(long t, boolean v) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override @@ -90,32 +97,32 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override @@ -126,19 +133,20 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public void write(long insertTime, Object objectValue) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) { - int[] columnIndexArray = checkColumnsInInsertPlan(schema); + public void writeAlignedValue( + long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) { + int[] columnIndexArray = checkColumnsInInsertPlan(schemaList); putAlignedValue(insertTime, objectValue, columnIndexArray); } @Override public void write( long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override @@ -146,32 +154,29 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { long[] times, Object[] valueList, BitMap[] bitMaps, - IMeasurementSchema schema, + List<IMeasurementSchema> schemaList, int start, int end) { - int[] columnIndexArray = checkColumnsInInsertPlan(schema); + int[] columnIndexArray = checkColumnsInInsertPlan(schemaList); putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end); } - private int[] checkColumnsInInsertPlan(IMeasurementSchema schema) { - VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema; - List<String> measurementIdsInInsertPlan = vectorSchema.getSubMeasurementsList(); - List<TSDataType> dataTypesInInsertPlan = vectorSchema.getSubMeasurementsTSDataTypeList(); - List<TSEncoding> encodingsInInsertPlan = vectorSchema.getSubMeasurementsTSEncodingList(); - for (int i = 0; i < measurementIdsInInsertPlan.size(); i++) { - if (!containsMeasurement(measurementIdsInInsertPlan.get(i))) { - this.schema.addMeasurement( - measurementIdsInInsertPlan.get(i), - dataTypesInInsertPlan.get(i), - encodingsInInsertPlan.get(i)); - this.list.extendColumn(dataTypesInInsertPlan.get(i)); + private int[] checkColumnsInInsertPlan(List<IMeasurementSchema> schemaListInInsertPlan) { + List<String> measurementIdsInInsertPlan = new ArrayList<>(); + for (int i = 0; i < schemaListInInsertPlan.size(); i++) { + measurementIdsInInsertPlan.add(schemaListInInsertPlan.get(i).getMeasurementId()); + if (!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) { + this.measurementIndexMap.put( + schemaListInInsertPlan.get(i).getMeasurementId(), measurementIndexMap.size()); + this.schemaList.add(schemaListInInsertPlan.get(i)); + this.list.extendColumn(schemaListInInsertPlan.get(i).getType()); } } - List<String> measurementIdsInTVList = this.schema.getSubMeasurementsList(); - int[] columnIndexArray = new int[measurementIdsInTVList.size()]; - for (int i = 0; i < columnIndexArray.length; i++) { - columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementIdsInTVList.get(i)); - } + int[] columnIndexArray = new int[measurementIndexMap.size()]; + measurementIndexMap.forEach( + (measurementId, i) -> { + columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementId); + }); return columnIndexArray; } @@ -182,7 +187,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public long count() { - return (long) list.size() * schema.getSubMeasurementsCount(); + return (long) list.size() * measurementIndexMap.size(); } public long alignedListSize() { @@ -191,7 +196,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public IMeasurementSchema getSchema() { - return schema; + return null; } @Override @@ -209,7 +214,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { list.increaseReferenceCount(); List<Integer> columnIndexList = new ArrayList<>(); for (IMeasurementSchema measurementSchema : schemaList) { - columnIndexList.add(schema.getSubMeasurementIndex(measurementSchema.getMeasurementId())); + columnIndexList.add(measurementIndexMap.get(measurementSchema.getMeasurementId())); } return list.getTvListByColumnIndex(columnIndexList); } @@ -243,7 +248,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public IChunkWriter createIChunkWriter() { - return new AlignedChunkWriterImpl(schema); + return new AlignedChunkWriterImpl(schemaList); } @Override @@ -310,4 +315,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { timeDuplicateAlignedRowIndexList = null; } } + + @Override + public void release() { + if (list.getReferenceCount() == 0) { + TVListAllocator.getInstance().release(list); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java new file mode 100644 index 0000000..a15fb6f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java @@ -0,0 +1,62 @@ +package org.apache.iotdb.db.engine.memtable; + +import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup { + + private AlignedWritableMemChunk memChunk; + + public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList) { + memChunk = new AlignedWritableMemChunk(schemaList); + } + + @Override + public void writeValues( + long[] times, + Object[] columns, + BitMap[] bitMaps, + List<IMeasurementSchema> schemaList, + int start, + int end) { + memChunk.writeAlignedValues(times, columns, bitMaps, schemaList, start, end); + } + + @Override + public void release() { + memChunk.release(); + } + + @Override + public long count() { + return memChunk.count(); + } + + @Override + public boolean contains(String measurement) { + return memChunk.containsMeasurement(measurement); + } + + @Override + public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) { + memChunk.writeAlignedValue(insertTime, objectValue, schemaList); + } + + @Override + public Map<String, IWritableMemChunk> getMemChunkMap() { + return Collections.singletonMap("", memChunk); + } + + @Override + public long getCurrentChunkPointNum(String measurement) { + return memChunk.count(); + } + + public AlignedWritableMemChunk getAlignedMemChunk() { + return memChunk; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 1dc2446..e05b1be 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -42,12 +42,13 @@ import java.util.Map; */ public interface IMemTable { - Map<String, Map<String, IWritableMemChunk>> getMemTableMap(); + Map<String, IWritableMemChunkGroup> getMemTableMap(); - void write(String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue); + void write( + String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue); void writeAlignedRow( - String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue); + String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue); /** * write data in the range [start, end). Null value in each column values will be replaced by the * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5} @@ -145,7 +146,7 @@ public interface IMemTable { boolean checkIfChunkDoesNotExist(String deviceId, String measurement); /** only used when mem control enabled */ - int getCurrentChunkPointNum(String deviceId, String measurement); + long getCurrentChunkPointNum(String deviceId, String measurement); /** only used when mem control enabled */ void addTextDataSize(long textDataIncrement); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index ee10ea1..ce8b6f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -60,7 +60,8 @@ public interface IWritableMemChunk { void write(long insertTime, Object objectValue); - void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema); + void writeAlignedValue( + long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList); /** * write data in the range [start, end). Null value in the valueList will be replaced by the @@ -73,7 +74,7 @@ public interface IWritableMemChunk { long[] times, Object[] valueList, BitMap[] bitMaps, - IMeasurementSchema schema, + List<IMeasurementSchema> schemaList, int start, int end); @@ -129,4 +130,6 @@ public interface IWritableMemChunk { IChunkWriter createIChunkWriter(); void encode(IChunkWriter chunkWriter); + + void release(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java new file mode 100644 index 0000000..ffbd65c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java @@ -0,0 +1,30 @@ +package org.apache.iotdb.db.engine.memtable; + +import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; + +import java.util.List; +import java.util.Map; + +public interface IWritableMemChunkGroup { + + void writeValues( + long[] times, + Object[] columns, + BitMap[] bitMaps, + List<IMeasurementSchema> schemaList, + int start, + int end); + + void release(); + + long count(); + + boolean contains(String measurement); + + void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList); + + Map<String, IWritableMemChunk> getMemChunkMap(); + + long getCurrentChunkPointNum(String measurement); +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java index 6915967..425fc21 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java @@ -19,9 +19,6 @@ package org.apache.iotdb.db.engine.memtable; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; - import java.util.HashMap; import java.util.Map; @@ -33,23 +30,13 @@ public class PrimitiveMemTable extends AbstractMemTable { this.disableMemControl = !enableMemControl; } - public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) { + public PrimitiveMemTable(Map<String, IWritableMemChunkGroup> memTableMap) { super(memTableMap); } @Override - protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) { - return new WritableMemChunk(schema); - } - - @Override - protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) { - return new AlignedWritableMemChunk((VectorMeasurementSchema) schema); - } - - @Override public IMemTable copy() { - Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap()); + Map<String, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap()); return new PrimitiveMemTable(newMap); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 3b096ad..51c7749 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -72,7 +72,8 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) { + public void writeAlignedValue( + long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType()); } @@ -114,7 +115,7 @@ public class WritableMemChunk implements IWritableMemChunk { long[] times, Object[] valueList, BitMap[] bitMaps, - IMeasurementSchema schema, + List<IMeasurementSchema> schemaList, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType()); @@ -346,4 +347,11 @@ public class WritableMemChunk implements IWritableMemChunk { } } } + + @Override + public void release() { + // if (list.getReferenceCount() == 0) { + // TVListAllocator.getInstance().release(list); + // } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java new file mode 100644 index 0000000..5845da3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java @@ -0,0 +1,99 @@ +package org.apache.iotdb.db.engine.memtable; + +import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class WritableMemChunkGroup implements IWritableMemChunkGroup { + + private Map<String, IWritableMemChunk> memChunkMap; + + public WritableMemChunkGroup(List<IMeasurementSchema> schemaList) { + memChunkMap = new HashMap<>(); + for (IMeasurementSchema schema : schemaList) { + createMemChunkIfNotExistAndGet(schema); + } + } + + @Override + public void writeValues( + long[] times, + Object[] columns, + BitMap[] bitMaps, + List<IMeasurementSchema> schemaList, + int start, + int end) { + int emptyColumnCount = 0; + for (int i = 0; i < columns.length; i++) { + if (columns[i] == null) { + emptyColumnCount++; + continue; + } + IWritableMemChunk memChunk = + createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount)); + memChunk.write( + times, + columns[i], + bitMaps == null ? null : bitMaps[i], + schemaList.get(i - emptyColumnCount).getType(), + start, + end); + } + } + + private IWritableMemChunk createMemChunkIfNotExistAndGet(IMeasurementSchema schema) { + return memChunkMap.computeIfAbsent( + schema.getMeasurementId(), + k -> { + return new WritableMemChunk(schema); + }); + } + + @Override + public void release() { + for (IWritableMemChunk memChunk : memChunkMap.values()) { + memChunk.release(); + } + } + + @Override + public long count() { + long count = 0; + for (IWritableMemChunk memChunk : memChunkMap.values()) { + count += memChunk.count(); + } + return count; + } + + @Override + public boolean contains(String measurement) { + return memChunkMap.containsKey(measurement); + } + + @Override + public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) { + int emptyColumnCount = 0; + for (int i = 0; i < objectValue.length; i++) { + if (objectValue[i] == null) { + emptyColumnCount++; + continue; + } + IWritableMemChunk memChunk = + createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount)); + memChunk.write(insertTime, objectValue[i]); + } + } + + @Override + public Map<String, IWritableMemChunk> getMemChunkMap() { + return memChunkMap; + } + + @Override + public long getCurrentChunkPointNum(String measurement) { + return memChunkMap.get(measurement).count(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 93d020f..5908597 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable; import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk; +import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.engine.modification.Deletion; @@ -355,7 +356,7 @@ public class TsFileProcessor { memTableIncrement += TVList.tvListArrayMemSize(insertRowPlan.getDataTypes()[i]); } else { // here currentChunkPointNum >= 1 - int currentChunkPointNum = + long currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, insertRowPlan.getMeasurements()[i]); memTableIncrement += (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0 @@ -378,7 +379,7 @@ public class TsFileProcessor { long memTableIncrement = 0L; long textDataIncrement = 0L; long chunkMetadataIncrement = 0L; - AlignedWritableMemChunk vectorMemChunk = null; + AlignedWritableMemChunk alignedMemChunk = null; String deviceId = insertRowPlan.getDeviceId().getFullPath(); if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) { // ChunkMetadataIncrement @@ -388,15 +389,15 @@ public class TsFileProcessor { memTableIncrement += AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes()); } else { // here currentChunkPointNum >= 1 - int currentChunkPointNum = + long currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER); memTableIncrement += (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0 ? AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes()) : 0; - vectorMemChunk = - ((AlignedWritableMemChunk) - workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER)); + alignedMemChunk = + ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId)) + .getAlignedMemChunk(); } for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) { // skip failed Measurements @@ -404,10 +405,10 @@ public class TsFileProcessor { continue; } // extending the column of aligned mem chunk - if (vectorMemChunk != null - && !vectorMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) { + if (alignedMemChunk != null + && !alignedMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) { memTableIncrement += - (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) + (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) * insertRowPlan.getDataTypes()[i].getDataTypeSize(); } // TEXT data mem size @@ -486,13 +487,13 @@ public class TsFileProcessor { ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) * TVList.tvListArrayMemSize(dataType); } else { - int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement); + long currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement); if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) { memIncrements[0] += ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) * TVList.tvListArrayMemSize(dataType); } else { - int acquireArray = + long acquireArray = (end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE)) / PrimitiveArrayManager.ARRAY_SIZE; memIncrements[0] += @@ -526,7 +527,7 @@ public class TsFileProcessor { * AlignedTVList.alignedTvListArrayMemSize(dataTypes); } else { int currentChunkPointNum = - workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER); + (int) workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER); if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) { memIncrements[0] += ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) @@ -541,8 +542,8 @@ public class TsFileProcessor { : acquireArray * AlignedTVList.alignedTvListArrayMemSize(dataTypes); } vectorMemChunk = - ((AlignedWritableMemChunk) - workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER)); + ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId)) + .getAlignedMemChunk(); } for (int i = 0; i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java index ff51b18..f85338d 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java @@ -20,7 +20,8 @@ package org.apache.iotdb.db.metadata.path; import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk; -import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; +import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup; +import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk; @@ -334,14 +335,14 @@ public class AlignedPath extends PartialPath { @Override public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( - Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList) + Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList) throws QueryProcessException, IOException { // check If memtable contains this path if (!memTableMap.containsKey(getDevice())) { return null; } AlignedWritableMemChunk alignedMemChunk = - ((AlignedWritableMemChunk) memTableMap.get(getDevice()).get(VECTOR_PLACEHOLDER)); + ((AlignedWritableMemChunkGroup) memTableMap.get(getDevice())).getAlignedMemChunk(); boolean containsMeasurement = false; for (String measurement : measurementList) { if (alignedMemChunk.containsMeasurement(measurement)) { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java index d8aef64..d6900de 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.metadata.path; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; +import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; @@ -246,14 +247,15 @@ public class MeasurementPath extends PartialPath { @Override public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( - Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList) + Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList) throws QueryProcessException, IOException { // check If Memtable Contains this path if (!memTableMap.containsKey(getDevice()) - || !memTableMap.get(getDevice()).containsKey(getMeasurement())) { + || !memTableMap.get(getDevice()).contains(getMeasurement())) { return null; } - IWritableMemChunk memChunk = memTableMap.get(getDevice()).get(getMeasurement()); + IWritableMemChunk memChunk = + memTableMap.get(getDevice()).getMemChunkMap().get(getMeasurement()); // get sorted tv list is synchronized so different query can get right sorted list reference TVList chunkCopy = memChunk.getSortedTvListForQuery(); int curSize = chunkCopy.size(); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java index 90581d6..ae42486 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.db.metadata.path; -import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; +import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -423,7 +423,7 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable { * @return ReadOnlyMemChunk */ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( - Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList) + Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList) throws QueryProcessException, IOException { throw new UnsupportedOperationException("Should call exact sub class!"); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java index 5fb024b..c29ebc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java @@ -241,11 +241,7 @@ public class InsertRowPlan extends InsertPlan { } continue; } - if (isAligned) { - dataTypes[i] = measurementMNodes[i].getSchema().getSubMeasurementsTSDataTypeList().get(i); - } else { - dataTypes[i] = measurementMNodes[i].getSchema().getType(); - } + dataTypes[i] = measurementMNodes[i].getSchema().getType(); try { values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString()); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java index e81a2ab..10617cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java @@ -56,10 +56,23 @@ public class MemUtils { } /** + * function for getting the value size. If mem control enabled, do not add text data size here, + * the size will be added to memtable before inserting. + */ + public static long getRecordsSize( + List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) { + long memSize = 0L; + for (int i = 0; i < dataTypes.size(); i++) { + memSize += getRecordSize(dataTypes.get(i), value[i], addingTextDataSize); + } + return memSize; + } + + /** * function for getting the vector value size. If mem control enabled, do not add text data size * here, the size will be added to memtable before inserting. */ - public static long getAlignedRecordSize( + public static long getAlignedRecordsSize( List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) { // time and index size long memSize = 8L + 4L; diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java index 7ded716..836fdfe 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.writelog.recover; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; +import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.engine.memtable.WritableMemChunk; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.ModificationFile; @@ -126,11 +127,11 @@ public class LogReplayer { } } - Map<String, Map<String, IWritableMemChunk>> memTableMap = recoverMemTable.getMemTableMap(); - for (Map.Entry<String, Map<String, IWritableMemChunk>> deviceEntry : memTableMap.entrySet()) { + Map<String, IWritableMemChunkGroup> memTableMap = recoverMemTable.getMemTableMap(); + for (Map.Entry<String, IWritableMemChunkGroup> deviceEntry : memTableMap.entrySet()) { String deviceId = deviceEntry.getKey(); for (Map.Entry<String, IWritableMemChunk> measurementEntry : - deviceEntry.getValue().entrySet()) { + deviceEntry.getValue().getMemChunkMap().entrySet()) { WritableMemChunk memChunk = (WritableMemChunk) measurementEntry.getValue(); currentTsFileResource.updateStartTime(deviceId, memChunk.getFirstPoint()); currentTsFileResource.updateEndTime(deviceId, memChunk.getLastPoint()); @@ -194,10 +195,19 @@ public class LogReplayer { // mark failed plan manually checkDataTypeAndMarkFailed(mNodes, plan); if (plan instanceof InsertRowPlan) { - recoverMemTable.insert((InsertRowPlan) plan); + if (plan.isAligned()) { + recoverMemTable.insertAlignedRow((InsertRowPlan) plan); + } else { + recoverMemTable.insert((InsertRowPlan) plan); + } } else { - recoverMemTable.insertTablet( - (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount()); + if (plan.isAligned()) { + recoverMemTable.insertAlignedTablet( + (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount()); + } else { + recoverMemTable.insertTablet( + (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount()); + } } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java index 09b21c6..00a6fd1 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java @@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.Schema; import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class MemTableTestUtils { @@ -62,9 +63,10 @@ public class MemTableTestUtils { for (long l = startTime; l <= endTime; l++) { iMemTable.write( deviceId, - new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN), + Collections.singletonList( + new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN)), l, - (int) l); + new Object[] {(int) l}); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java index c11cd90..086a647 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java @@ -22,6 +22,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; +import java.util.Collections; + /** Memtable insert benchmark. Bench the Memtable and get its performance. */ public class MemtableBenchmark { @@ -46,9 +48,10 @@ public class MemtableBenchmark { for (int j = 0; j < numOfMeasurement; j++) { memTable.write( deviceId, - new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN), + Collections.singletonList( + new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN)), System.nanoTime(), - String.valueOf(System.currentTimeMillis())); + new Object[] {String.valueOf(System.currentTimeMillis())}); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java index a1c925f..6be17a0 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java @@ -119,16 +119,18 @@ public class PrimitiveMemTableTest { for (int i = 0; i < dataSize; i++) { memTable.write( deviceId, - new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN), + Collections.singletonList( + new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)), dataSize - i - 1, - i + 10); + new Object[] {i + 10}); } for (int i = 0; i < dataSize; i++) { memTable.write( deviceId, - new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN), + Collections.singletonList( + new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)), i, - i); + new Object[] {i}); } MeasurementPath fullPath = new MeasurementPath( @@ -163,9 +165,9 @@ public class PrimitiveMemTableTest { for (TimeValuePair aRet : ret) { memTable.write( deviceId, - new UnaryMeasurementSchema(sensorId, dataType, encoding), + Collections.singletonList(new UnaryMeasurementSchema(sensorId, dataType, encoding)), aRet.getTimestamp(), - aRet.getValue().getValue()); + new Object[] {aRet.getValue().getValue()}); } MeasurementPath fullPath = new MeasurementPath( diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java index 4c977ca..81999ed 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java @@ -18,10 +18,13 @@ */ package org.apache.iotdb.tsfile.write.chunk; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -63,6 +66,31 @@ public class AlignedChunkWriterImpl implements IChunkWriter { this.valueIndex = 0; } + public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) { + TSEncoding timeEncoding = + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); + TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); + timeChunkWriter = + new TimeChunkWriter( + "", + schemaList.get(0).getCompressor(), + timeEncoding, + TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType)); + + valueChunkWriterList = new ArrayList<>(schemaList.size()); + for (int i = 0; i < schemaList.size(); i++) { + valueChunkWriterList.add( + new ValueChunkWriter( + schemaList.get(i).getMeasurementId(), + schemaList.get(i).getCompressor(), + schemaList.get(i).getType(), + schemaList.get(i).getEncodingType(), + schemaList.get(i).getValueEncoder())); + } + + this.valueIndex = 0; + } + public void write(long time, int value, boolean isNull) { valueChunkWriterList.get(valueIndex++).write(time, value, isNull); }
