This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2a77b7ab162 Pipe: Optimize the TsFile memory control mechanism of the
disassembly table model (#14890)
2a77b7ab162 is described below
commit 2a77b7ab16261cf56a799e0c18296e62b9e560a2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Feb 27 19:37:12 2025 +0800
Pipe: Optimize the TsFile memory control mechanism of the disassembly table
model (#14890)
---
.../scan/TsFileInsertionEventScanParser.java | 3 -
.../table/TsFileInsertionEventTableParser.java | 95 +++--
...ileInsertionEventTableParserTabletIterator.java | 421 +++++++++++++++++----
.../resource/memory/InsertNodeMemoryEstimator.java | 2 +-
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 65 ++++
5 files changed, 472 insertions(+), 114 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index e4665c7a62f..54bdbc174a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -53,7 +53,6 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.File;
import java.io.IOException;
-import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -64,8 +63,6 @@ import java.util.Objects;
public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser
{
- private static final LocalDate EMPTY_DATE = LocalDate.of(1000, 1, 1);
-
private final long startTime;
private final long endTime;
private final Filter filter;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 00e3cef54cb..ce59156b508 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -24,28 +24,30 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.controller.CachedChunkLoaderImpl;
-import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.tsfile.read.query.executor.TableQueryExecutor;
import org.apache.tsfile.write.record.Tablet;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
-import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser {
- private final TableQueryExecutor tableQueryExecutor;
+ private final long startTime;
+ private final long endTime;
+ private final TablePattern tablePattern;
- private final Iterator<Map.Entry<String, TableSchema>>
filteredTableSchemaIterator;
+ private final PipeMemoryBlock allocatedMemoryBlockForBatchData;
+ private final PipeMemoryBlock allocatedMemoryBlockForChunk;
+ private final PipeMemoryBlock allocatedMemoryBlockForChunkMeta;
+ private final PipeMemoryBlock allocatedMemoryBlockForTableSchemas;
public TsFileInsertionEventTableParser(
final File tsFile,
@@ -58,16 +60,20 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
super(null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
try {
+ this.allocatedMemoryBlockForChunk =
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+ this.allocatedMemoryBlockForBatchData =
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+ this.allocatedMemoryBlockForChunkMeta =
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+ this.allocatedMemoryBlockForTableSchemas =
+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.tablePattern = pattern;
+
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true,
true);
- filteredTableSchemaIterator =
- tsFileSequenceReader.getTableSchemaMap().entrySet().stream()
- .filter(entry -> Objects.isNull(pattern) ||
pattern.matchesTable(entry.getKey()))
- .iterator();
- tableQueryExecutor =
- new TableQueryExecutor(
- new MetadataQuerierByFileImpl(tsFileSequenceReader),
- new CachedChunkLoaderImpl(tsFileSequenceReader),
- TableQueryExecutor.TableQueryOrdering.DEVICE);
} catch (final Exception e) {
close();
throw e;
@@ -79,29 +85,35 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
return () ->
new Iterator<TabletInsertionEvent>() {
- private TsFileInsertionEventTableParserTabletIterator tabletIterator
= null;
+ private TsFileInsertionEventTableParserTabletIterator tabletIterator;
@Override
public boolean hasNext() {
- while (tabletIterator == null || !tabletIterator.hasNext()) {
- if (!filteredTableSchemaIterator.hasNext()) {
- close();
- return false;
- }
-
- final Map.Entry<String, TableSchema> entry =
filteredTableSchemaIterator.next();
-
- try {
+ try {
+ if (tabletIterator == null) {
tabletIterator =
new TsFileInsertionEventTableParserTabletIterator(
- tableQueryExecutor, entry.getKey(), entry.getValue(),
startTime, endTime);
- } catch (final Exception e) {
+ tsFileSequenceReader,
+ entry ->
+ Objects.isNull(tablePattern)
+ || tablePattern.matchesTable(entry.getKey()),
+ allocatedMemoryBlockForTablet,
+ allocatedMemoryBlockForBatchData,
+ allocatedMemoryBlockForChunk,
+ allocatedMemoryBlockForChunkMeta,
+ allocatedMemoryBlockForTableSchemas,
+ startTime,
+ endTime);
+ }
+ if (!tabletIterator.hasNext()) {
close();
- throw new PipeException("failed to create
TsFileInsertionDataTabletIterator", e);
+ return false;
}
+ return true;
+ } catch (Exception e) {
+ close();
+ throw new PipeException("Error while parsing tsfile insertion
event", e);
}
-
- return true;
}
@Override
@@ -174,4 +186,25 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
}
};
}
+
+  /**
+   * Closes this parser. First delegates to the parent parser's {@code close()}, then closes every
+   * memory block this table parser reserved in its constructor, in declaration order. A block that
+   * was never assigned (the constructor calls {@code close()} from its failure path) is skipped.
+   */
+  @Override
+  public void close() {
+    super.close();
+
+    final PipeMemoryBlock[] ownedBlocks = {
+      allocatedMemoryBlockForBatchData,
+      allocatedMemoryBlockForChunk,
+      allocatedMemoryBlockForChunkMeta,
+      allocatedMemoryBlockForTableSchemas
+    };
+    for (final PipeMemoryBlock ownedBlock : ownedBlocks) {
+      if (ownedBlock != null) {
+        ownedBlock.close();
+      }
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 8aa47cfa595..e594b58e86b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -19,130 +19,393 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.TableSchema;
-import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.query.executor.TableQueryExecutor;
-import org.apache.tsfile.read.reader.block.TsBlockReader;
+import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.apache.tsfile.enums.TSDataType.DATE;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
public class TsFileInsertionEventTableParserTabletIterator implements
Iterator<Tablet> {
- private final String tableName;
-
private final long startTime;
private final long endTime;
- private final List<IMeasurementSchema> columnSchemas;
- private final List<Tablet.ColumnCategory> columnTypes;
- private final List<String> columnNames;
- private final TsBlockReader tsBlockReader;
+ // Used to read or record TSFileMeta tools or meta information
+ private final TsFileSequenceReader reader;
+ private final IMetadataQuerier metadataQuerier;
+ private final TsFileMetadata fileMetadata;
+ private final Iterator<Map.Entry<String, TableSchema>>
filteredTableSchemaIterator;
+
+ // For memory control
+ private final PipeMemoryBlock allocatedMemoryBlockForTablet;
+ private final PipeMemoryBlock allocatedMemoryBlockForBatchData;
+ private final PipeMemoryBlock allocatedMemoryBlockForChunk;
+ private final PipeMemoryBlock allocatedMemoryBlockForChunkMeta;
+ private final PipeMemoryBlock allocatedMemoryBlockForTableSchema;
+
+ // Used to read tsfile data
+ private IChunkReader chunkReader;
+ private BatchData batchData;
+
+ // Record the metadata information of the currently read Table
+ private Set<String> measurementNames;
+ private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
+ private Iterator<List<IChunkMetadata>> chunkMetadataList;
+
+ // Record the information of the currently read Table
+ private String tableName;
+ private IDeviceID deviceID;
+ private List<Tablet.ColumnCategory> columnTypes;
+ private List<String> measurementList;
+ private List<TSDataType> dataTypeList;
+
+ private List<Pair<String, Integer>> measurementColumIndexList;
+ private List<Integer> measurementIdIndexList;
+
+ // Used to record whether the same Tablet is generated when parsing starts.
Different table
+ // information cannot be placed in the same Tablet.
+ private boolean isSameTableName;
public TsFileInsertionEventTableParserTabletIterator(
- final TableQueryExecutor tableQueryExecutor,
- final String tableName,
- final TableSchema tableSchema,
+ final TsFileSequenceReader tsFileSequenceReader,
+ final Predicate<Map.Entry<String, TableSchema>> predicate,
+ final PipeMemoryBlock allocatedMemoryBlockForTablet,
+ final PipeMemoryBlock allocatedMemoryBlockForBatchData,
+ final PipeMemoryBlock allocatedMemoryBlockForChunk,
+ final PipeMemoryBlock allocatedMemoryBlockForChunkMeta,
+ final PipeMemoryBlock allocatedMemoryBlockForTableSchema,
final long startTime,
- final long endTime) {
- this.tableName = tableName;
+ final long endTime)
+ throws IOException {
+
this.startTime = startTime;
this.endTime = endTime;
- columnSchemas = new ArrayList<>();
- columnTypes = new ArrayList<>();
- columnNames = new ArrayList<>();
- try {
- for (int i = 0, size = tableSchema.getColumnSchemas().size(); i < size;
i++) {
- final IMeasurementSchema schema =
tableSchema.getColumnSchemas().get(i);
- if (schema.getMeasurementName() != null &&
!schema.getMeasurementName().isEmpty()) {
- columnSchemas.add(schema);
- columnTypes.add(tableSchema.getColumnTypes().get(i));
- columnNames.add(schema.getMeasurementName());
- }
- }
+ this.reader = tsFileSequenceReader;
+ this.metadataQuerier = new MetadataQuerierByFileImpl(reader);
+ fileMetadata = this.metadataQuerier.getWholeFileMetadata();
+ final List<Map.Entry<String, TableSchema>> tableSchemaList =
+ fileMetadata.getTableSchemaMap().entrySet().stream()
+ .filter(predicate)
+ .collect(Collectors.toList());
- tsBlockReader = tableQueryExecutor.query(tableName, columnNames, null,
null, null);
- } catch (final ReadProcessException e) {
- throw new PipeException("Failed to build query data set", e);
+ this.allocatedMemoryBlockForTablet = allocatedMemoryBlockForTablet;
+ this.allocatedMemoryBlockForBatchData = allocatedMemoryBlockForBatchData;
+ this.allocatedMemoryBlockForChunk = allocatedMemoryBlockForChunk;
+ this.allocatedMemoryBlockForChunkMeta = allocatedMemoryBlockForChunkMeta;
+ this.allocatedMemoryBlockForTableSchema =
allocatedMemoryBlockForTableSchema;
+
+ long tableSchemaSize =
fileMetadata.getBloomFilter().getRetainedSizeInBytes();
+ for (Map.Entry<String, TableSchema> tableSchemaEntry : tableSchemaList) {
+ tableSchemaSize +=
+ tableSchemaEntry.getKey().length()
+ +
PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(tableSchemaEntry.getValue());
+ PipeDataNodeResourceManager.memory()
+ .forceResize(this.allocatedMemoryBlockForTableSchema,
tableSchemaSize);
}
+
+ filteredTableSchemaIterator = tableSchemaList.iterator();
}
@Override
   public boolean hasNext() {
-    return tsBlockReader.hasNext();
+    // Lazily advances through the file as a state machine, from the innermost level outwards:
+    //   CHECK_DATA          - a row is still available in the current BatchData
+    //   INIT_DATA           - load the next satisfied page of the current chunk reader
+    //   INIT_CHUNK_READER   - open a reader on the next aligned chunk of the current device
+    //   INIT_CHUNK_METADATA - load (and time-filter) the chunk metadata of the next device
+    //   INIT_DEVICE_META    - switch to the next table accepted by the predicate
+    // A step that cannot make progress deliberately falls through to the next, outer step; a step
+    // that does make progress jumps back to the inner step that consumes what it produced.
+    // Returns false only once every filtered table is exhausted; any failure is rethrown wrapped
+    // in a PipeException.
+    try {
+      State state = State.CHECK_DATA;
+      while (true) {
+        switch (state) {
+          case CHECK_DATA:
+            if (batchData != null && batchData.hasCurrent()) {
+              return true;
+            }
+            // fall through
+          case INIT_DATA:
+            if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
+              batchData = chunkReader.nextPageData();
+              // Resize the batch-data reservation to the page just loaded.
+              PipeDataNodeResourceManager.memory()
+                  .forceResize(
+                      allocatedMemoryBlockForBatchData,
+                      PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData));
+              state = State.CHECK_DATA;
+              break;
+            }
+            // fall through
+          case INIT_CHUNK_READER:
+            if (chunkMetadataList != null && chunkMetadataList.hasNext()) {
+              // The first element of every remaining entry was verified to be an
+              // AbstractAlignedChunkMetadata in INIT_CHUNK_METADATA.
+              initChunkReader((AbstractAlignedChunkMetadata) chunkMetadataList.next().get(0));
+              state = State.INIT_DATA;
+              break;
+            }
+            // fall through
+          case INIT_CHUNK_METADATA:
+            if (deviceMetaIterator != null && deviceMetaIterator.hasNext()) {
+              final Pair<IDeviceID, MetadataIndexNode> pair = deviceMetaIterator.next();
+
+              // Running total of the chunk metadata kept for this device; the chunk-meta
+              // reservation is resized to it after every kept entry.
+              long size = 0;
+              List<List<IChunkMetadata>> iChunkMetadataList =
+                  reader.getIChunkMetadataList(pair.left, measurementNames, pair.right);
+
+              Iterator<List<IChunkMetadata>> chunkMetadataIterator = iChunkMetadataList.iterator();
+              while (chunkMetadataIterator.hasNext()) {
+                final List<IChunkMetadata> chunkMetadata = chunkMetadataIterator.next();
+                // Only aligned chunk metadata is accepted by this parser.
+                if (chunkMetadata == null
+                    || chunkMetadata.isEmpty()
+                    || !(chunkMetadata.get(0) instanceof AbstractAlignedChunkMetadata)) {
+                  throw new PipeException(
+                      "Table model tsfile parsing does not support this type of ChunkMeta");
+                }
+
+                final AbstractAlignedChunkMetadata alignedChunkMetadata =
+                    (AbstractAlignedChunkMetadata) chunkMetadata.get(0);
+
+                // Drop chunks lying entirely outside [startTime, endTime] so they are never read.
+                if (alignedChunkMetadata.getEndTime() < startTime
+                    || alignedChunkMetadata.getStartTime() > endTime) {
+                  chunkMetadataIterator.remove();
+                  continue;
+                }
+
+                size +=
+                    PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata);
+                PipeDataNodeResourceManager.memory()
+                    .forceResize(allocatedMemoryBlockForChunkMeta, size);
+              }
+
+              deviceID = pair.getLeft();
+              chunkMetadataList = iChunkMetadataList.iterator();
+
+              state = State.INIT_CHUNK_READER;
+              break;
+            }
+            // fall through
+          case INIT_DEVICE_META:
+            if (filteredTableSchemaIterator != null && filteredTableSchemaIterator.hasNext()) {
+              final Map.Entry<String, TableSchema> entry = filteredTableSchemaIterator.next();
+              tableName = entry.getKey();
+              final TableSchema tableSchema = entry.getValue();
+              // A new table begins: a tablet under construction must not take rows from it.
+              isSameTableName = false;
+
+              final MetadataIndexNode tableRoot = fileMetadata.getTableMetadataIndexNode(tableName);
+              deviceMetaIterator = metadataQuerier.deviceIterator(tableRoot, null);
+
+              final int columnSchemaSize = tableSchema.getColumnSchemas().size();
+              dataTypeList = new ArrayList<>(columnSchemaSize);
+              columnTypes = new ArrayList<>(columnSchemaSize);
+              measurementList = new ArrayList<>(columnSchemaSize);
+              measurementNames = new HashSet<>();
+
+              measurementColumIndexList = new ArrayList<>(columnSchemaSize);
+              measurementIdIndexList = new ArrayList<>(columnSchemaSize);
+
+              // i walks the table schema; j is the tablet column index and only advances for
+              // columns with a non-empty measurement name. TAG columns go to
+              // measurementIdIndexList; all other columns go to measurementColumIndexList and to
+              // the set of measurements whose chunk metadata is requested per device.
+              for (int i = 0, j = 0; i < columnSchemaSize; i++) {
+                final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
+                final Tablet.ColumnCategory columnCategory = tableSchema.getColumnTypes().get(i);
+                if (schema != null
+                    && schema.getMeasurementName() != null
+                    && !schema.getMeasurementName().isEmpty()) {
+                  final String measurementName = schema.getMeasurementName();
+                  columnTypes.add(columnCategory);
+                  measurementList.add(measurementName);
+                  dataTypeList.add(schema.getType());
+                  if (!Tablet.ColumnCategory.TAG.equals(columnCategory)) {
+                    measurementNames.add(measurementName);
+                    measurementColumIndexList.add(new Pair<>(measurementName, j));
+                  } else {
+                    measurementIdIndexList.add(j);
+                  }
+                  j++;
+                }
+              }
+              state = State.INIT_CHUNK_METADATA;
+              break;
+            }
+            // Every filtered table has been consumed.
+            return false;
+        }
+      }
+    } catch (Exception e) {
+      throw new PipeException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Steps of the {@link #hasNext()} state machine, listed from the innermost level (rows of the
+   * current page) to the outermost (the next matching table). A step that cannot make progress
+   * falls through to the next constant in this order.
+   */
+  private enum State {
+    // A row is still available in the current BatchData.
+    CHECK_DATA,
+    // Load the next satisfied page from the current chunk reader.
+    INIT_DATA,
+    // Open a chunk reader on the next aligned chunk of the current device.
+    INIT_CHUNK_READER,
+    // Load the chunk metadata of the next device of the current table.
+    INIT_CHUNK_METADATA,
+    // Move to the next table accepted by the predicate and rebuild its column layout.
+    INIT_DEVICE_META
   }
@Override
public Tablet next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ return buildNextTablet();
+ }
+
+  /**
+   * Builds the next tablet from the rows exposed by {@link #hasNext()}.
+   *
+   * <p>Only rows whose timestamp lies in [startTime, endTime] are copied. The tablet's row capacity
+   * and its memory reservation are derived from the batch holding the first accepted row via
+   * {@link PipeMemoryWeightUtil#calculateTabletRowCountAndMemory}. Filling stops when the tablet is
+   * full (the row that did not fit stays unconsumed for the next call) or when iteration reaches a
+   * different table, because rows of different tables cannot share a tablet.
+   *
+   * @return the next tablet; a tablet with zero rows when no row in the time range remains
+   */
+  private Tablet buildNextTablet() {
+    Tablet tablet = null;
+
+    boolean isFirstRow = true;
+    while (hasNext() && (isFirstRow || isSameTableName)) {
+      final long currentTime = batchData.currentTime();
+      if (currentTime >= startTime && currentTime <= endTime) {
+        if (isFirstRow) {
+          // From now on the tablet belongs to the current table; hasNext() clears this flag when
+          // it moves on to another table, which ends this loop.
+          isSameTableName = true;
+
+          // Size the tablet from the first accepted row's batch. getLeft() is the row capacity,
+          // getRight() the estimated memory: the memory block must be sized in bytes, not rows.
+          final Pair<Integer, Integer> rowCountAndMemorySize =
+              PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData);
+          PipeDataNodeResourceManager.memory()
+              .forceResize(allocatedMemoryBlockForTablet, rowCountAndMemorySize.getRight());
+
+          tablet =
+              new Tablet(
+                  tableName,
+                  measurementList,
+                  dataTypeList,
+                  columnTypes,
+                  rowCountAndMemorySize.getLeft());
+          tablet.initBitMaps();
+          isFirstRow = false;
+        }
+        final int rowIndex = tablet.getRowSize();
+        if (rowIndex >= tablet.getMaxRowNumber()) {
+          // Tablet is full: leave the current row unconsumed so the next call starts with it.
+          break;
+        }
+
+        tablet.addTimestamp(rowIndex, currentTime);
+        fillMeasurementValueColumns(batchData, tablet, rowIndex);
+        fillDeviceIdColumns(deviceID, tablet, rowIndex);
+      }
+
+      if (batchData != null) {
+        batchData.next();
+      }
     }
-    try {
-      return buildNextTablet();
-    } catch (final IOException e) {
-      throw new PipeException("Failed to build tablet", e);
+    if (isFirstRow) {
+      // No row in range was accepted. NOTE(review): this shrinks the chunk-metadata reservation to
+      // 0, presumably because iteration has finished — confirm this is the intended block.
+      PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunkMeta, 0);
+      tablet = new Tablet(tableName, measurementList, dataTypeList, columnTypes, 0);
+      tablet.initBitMaps();
     }
+
+    return tablet;
   }
- // TODO: memory control
- private Tablet buildNextTablet() throws IOException {
- final TsBlock tsBlock = tsBlockReader.next();
-
- List<String> measurementList = new ArrayList<>(columnSchemas.size());
- List<TSDataType> dataTypeList = new ArrayList<>(columnSchemas.size());
- columnSchemas.forEach(
- columnSchema -> {
- measurementList.add(columnSchema.getMeasurementName());
- dataTypeList.add(columnSchema.getType());
- });
- final Tablet tablet =
- new Tablet(
- tableName, measurementList, dataTypeList, columnTypes,
tsBlock.getPositionCount());
- tablet.initBitMaps();
-
- boolean isAllNull = true;
-
- final TsBlock.TsBlockRowIterator rowIterator =
tsBlock.getTsBlockRowIterator();
- while (rowIterator.hasNext()) {
- final Object[] row = rowIterator.next();
-
- final long timestamp = (Long) row[row.length - 1];
- if (timestamp < startTime || timestamp > endTime) {
- continue;
+ private void initChunkReader(AbstractAlignedChunkMetadata
alignedChunkMetadata)
+ throws IOException {
+ final Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
+ long size = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
+
+ final List<Chunk> valueChunkList =
+ new
ArrayList<>(alignedChunkMetadata.getValueChunkMetadataList().size());
+ final Map<String, ChunkMetadata> metadataMap = new HashMap<>();
+ for (IChunkMetadata metadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
+ if (metadata != null) {
+ metadataMap.put(metadata.getMeasurementUid(), (ChunkMetadata)
metadata);
}
+ }
- final int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, timestamp);
- for (int i = 0, fieldSize = row.length - 1; i < fieldSize; i++) {
- final Object value =
- columnSchemas.get(i).getType() != DATE || row[i] == null
- ? row[i]
- : DateUtils.parseIntToLocalDate((Integer) row[i]);
- tablet.addValue(columnNames.get(i), rowIndex, value);
- if (value != null && columnTypes.get(i) ==
Tablet.ColumnCategory.FIELD) {
- isAllNull = false;
- }
+ // The metadata obtained by alignedChunkMetadata.getValueChunkMetadataList
may not be continuous
+ // when reading TSFile Chunks, so reordering the metadata here has little
effect on the
+ // efficiency of reading chunks.
+ for (Pair<String, Integer> m : measurementColumIndexList) {
+ final ChunkMetadata metadata = metadataMap.get(m.getLeft());
+ if (metadata != null) {
+ final Chunk chunk = reader.readMemChunk(metadata);
+
+ size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
+
+ valueChunkList.add(chunk);
+ continue;
}
+ valueChunkList.add(null);
}
- if (isAllNull) {
- tablet.setRowSize(0);
+ this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
+ }
+
+  /**
+   * Copies the field (non-tag) values of the current row of {@code data} into row {@code rowIndex}
+   * of {@code tablet}.
+   *
+   * <p>Entry k of {@code data.getVector()} goes to the tablet column recorded in entry k of {@code
+   * measurementColumIndexList}; null entries leave the cell unset. DATE values are converted with
+   * {@link DateUtils#parseIntToLocalDate}; TEXT, BLOB and STRING values are written as raw bytes.
+   *
+   * @throws UnSupportedDataTypeException for a column type not handled here
+   */
+  private void fillMeasurementValueColumns(
+      final BatchData data, final Tablet tablet, final int rowIndex) {
+    final TsPrimitiveType[] rowValues = data.getVector();
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    final int fieldCount = measurementColumIndexList.size();
+
+    for (int field = 0; field < fieldCount; field++) {
+      final TsPrimitiveType value = rowValues[field];
+      if (value != null) {
+        final int column = measurementColumIndexList.get(field).getRight();
+        final TSDataType columnType = schemas.get(column).getType();
+        switch (columnType) {
+          case BOOLEAN:
+            tablet.addValue(rowIndex, column, value.getBoolean());
+            break;
+          case INT32:
+            tablet.addValue(rowIndex, column, value.getInt());
+            break;
+          case DATE:
+            tablet.addValue(rowIndex, column, DateUtils.parseIntToLocalDate(value.getInt()));
+            break;
+          case INT64:
+          case TIMESTAMP:
+            tablet.addValue(rowIndex, column, value.getLong());
+            break;
+          case FLOAT:
+            tablet.addValue(rowIndex, column, value.getFloat());
+            break;
+          case DOUBLE:
+            tablet.addValue(rowIndex, column, value.getDouble());
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            tablet.addValue(rowIndex, column, value.getBinary().getValues());
+            break;
+          default:
+            throw new UnSupportedDataTypeException("UnSupported" + value.getDataType());
+        }
+      }
     }
+  }
- return tablet;
+  /**
+   * Writes the tag values carried by {@code deviceID} into row {@code rowIndex} of {@code tablet}.
+   *
+   * <p>Segment 0 of the device ID is skipped (presumably the table name — confirm). Segment k
+   * (k >= 1) is written to the tablet column stored at position k - 1 of {@code
+   * measurementIdIndexList}; null segments leave the cell unset.
+   */
+  private void fillDeviceIdColumns(
+      final IDeviceID deviceID, final Tablet tablet, final int rowIndex) {
+    final String[] segments = (String[]) deviceID.getSegments();
+    int tagPosition = 0;
+    for (int segment = 1; segment < segments.length; segment++, tagPosition++) {
+      final String tagValue = segments[segment];
+      if (tagValue != null) {
+        tablet.addValue(rowIndex, measurementIdIndexList.get(tagPosition), tagValue);
+      }
+    }
   }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
index 46c0ff4e121..3d87d8eb7b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -476,7 +476,7 @@ public class InsertNodeMemoryEstimator {
return size;
}
- private static long sizeOfMeasurementSchema(final MeasurementSchema
measurementSchema) {
+ public static long sizeOfMeasurementSchema(final MeasurementSchema
measurementSchema) {
if (measurementSchema == null) {
return 0L;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index c016c703f8a..5f28fedf14a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -24,8 +24,13 @@ import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
@@ -34,10 +39,15 @@ import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.List;
import java.util.Map;
+import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+import static org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize;
+
public class PipeMemoryWeightUtil {
/** Estimates memory usage of a {@link Map}<{@link IDeviceID}, {@link
Boolean}>. */
@@ -252,6 +262,34 @@ public class PipeMemoryWeightUtil {
return totalSizeInBytes;
}
+  /**
+   * Estimates the heap footprint of a {@link TableSchema} for pipe memory control.
+   *
+   * <p>Adds up the following parts:
+   *
+   * <ul>
+   *   <li>the table name, at one unit per character (an approximation);
+   *   <li>the column-schema list: array header, one reference per entry, plus {@link
+   *       InsertNodeMemoryEstimator#sizeOfMeasurementSchema} for every {@link MeasurementSchema}
+   *       entry;
+   *   <li>the column-category list: aligned array header plus one reference per entry.
+   * </ul>
+   *
+   * @param tableSchema schema to estimate; must not be null
+   * @return estimated size in bytes
+   */
+  public static long calculateTableSchemaBytesUsed(TableSchema tableSchema) {
+    long totalSizeInBytes = 0;
+
+    final String tableName = tableSchema.getTableName();
+    if (tableName != null) {
+      totalSizeInBytes += tableName.length();
+    }
+
+    final List<IMeasurementSchema> measurementSchemas = tableSchema.getColumnSchemas();
+    if (measurementSchemas != null) {
+      totalSizeInBytes +=
+          NUM_BYTES_ARRAY_HEADER + (long) NUM_BYTES_OBJECT_REF * measurementSchemas.size();
+      for (IMeasurementSchema measurementSchema : measurementSchemas) {
+        // Each schema's own estimate must be accumulated, otherwise column schemas never count.
+        // Only MeasurementSchema has a dedicated estimator. Other implementations and null
+        // entries are covered only by the reference counted above, rather than failing the whole
+        // parse with a ClassCastException.
+        if (measurementSchema instanceof MeasurementSchema) {
+          totalSizeInBytes +=
+              InsertNodeMemoryEstimator.sizeOfMeasurementSchema(
+                  (MeasurementSchema) measurementSchema);
+        }
+      }
+    }
+
+    final List<Tablet.ColumnCategory> categories = tableSchema.getColumnTypes();
+    if (categories != null) {
+      totalSizeInBytes +=
+          alignObjectSize(
+              (long) NUM_BYTES_ARRAY_HEADER
+                  + (long) NUM_BYTES_OBJECT_REF * (long) categories.size());
+    }
+
+    return totalSizeInBytes;
+  }
+
public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
int totalSizeInBytes = 0;
@@ -285,6 +323,33 @@ public class PipeMemoryWeightUtil {
return batchData.length() * totalSizeInBytes;
}
+  /**
+   * Returns {@code chunk.getRetainedSizeInBytes()}, or 0 when {@code chunk} is null.
+   *
+   * @param chunk chunk to size; may be null
+   * @return retained size in bytes as reported by the chunk
+   */
+  public static long calculateChunkRamBytesUsed(Chunk chunk) {
+    if (chunk == null) {
+      return 0L;
+    }
+    return chunk.getRetainedSizeInBytes();
+  }
+
+  /**
+   * Estimates the retained size of an aligned chunk's metadata. This is the time-column {@link
+   * ChunkMetadata} plus every non-null value-column {@link ChunkMetadata}.
+   *
+   * @param alignedChunkMetadata metadata to size; null yields 0
+   * @return sum of {@code getRetainedSizeInBytes()} over the contained chunk metadata
+   */
+  public static long calculateAlignedChunkMetaBytesUsed(
+      AbstractAlignedChunkMetadata alignedChunkMetadata) {
+    long retained = 0L;
+    if (alignedChunkMetadata == null) {
+      return retained;
+    }
+
+    final ChunkMetadata timeMeta = (ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata();
+    final List<IChunkMetadata> valueMetas = alignedChunkMetadata.getValueChunkMetadataList();
+
+    if (timeMeta != null) {
+      retained += timeMeta.getRetainedSizeInBytes();
+    }
+    if (valueMetas == null) {
+      return retained;
+    }
+    for (final IChunkMetadata valueMeta : valueMetas) {
+      if (valueMeta != null) {
+        retained += ((ChunkMetadata) valueMeta).getRetainedSizeInBytes();
+      }
+    }
+    return retained;
+  }
+
/**
* Rounds up the given integer num to the nearest multiple of n.
*