This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 08fc8abe247 Pipe: better memory control for in-memory tablets (#13301)
08fc8abe247 is described below
commit 08fc8abe247d16bd89a5911081720b8a59e19517
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Sep 2 18:36:56 2024 +0800
Pipe: better memory control for in-memory tablets (#13301)
---
.../iotdb/db/pipe/event/common/row/PipeRow.java | 15 +++
.../db/pipe/event/common/row/PipeRowCollector.java | 14 +--
.../common/tablet/PipeRawTabletInsertionEvent.java | 6 +-
.../container/TsFileInsertionDataContainer.java | 12 ++
.../query/TsFileInsertionQueryDataContainer.java | 3 +-
.../TsFileInsertionQueryDataTabletIterator.java | 46 ++++++--
.../scan/TsFileInsertionScanDataContainer.java | 43 +++++--
.../db/pipe/resource/memory/PipeMemoryManager.java | 59 +++++++++-
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 128 +++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 9 ++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +
12 files changed, 314 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 8757b6613c9..6d4d25b9542 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -190,6 +190,21 @@ public class PipeRow implements Row {
return isAligned;
}
+ public int getCurrentRowSize() {
+ int rowSize = 0;
+ rowSize += 8; // timestamp
+ for (int i = 0; i < valueColumnTypes.length; i++) {
+ if (valueColumnTypes[i] != null) {
+ if (valueColumnTypes[i].isBinary()) {
+ rowSize += getBinary(i) != null ? getBinary(i).getLength() : 0;
+ } else {
+ rowSize += valueColumnTypes[i].getDataTypeSize();
+ }
+ }
+ }
+ return rowSize;
+ }
+
public IMeasurementSchema[] getMeasurementSchemaList() {
return measurementSchemaList;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index c1f7739534d..4646041a8a2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -19,15 +19,16 @@
package org.apache.iotdb.db.pipe.event.common.row;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -66,13 +67,12 @@ public class PipeRowCollector implements RowCollector {
final String deviceId = pipeRow.getDeviceId();
final List<IMeasurementSchema> measurementSchemaList =
new ArrayList<>(Arrays.asList(measurementSchemaArray));
- tablet =
- new Tablet(
- deviceId,
- measurementSchemaList,
- PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
- isAligned = pipeRow.isAligned();
+ // Calculate row count and memory size of the tablet based on the first row
+ Pair<Integer, Integer> rowCountAndMemorySize =
+ PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow);
+ tablet = new Tablet(deviceId, measurementSchemaList, rowCountAndMemorySize.getLeft());
tablet.initBitMaps();
+ isAligned = pipeRow.isAligned();
}
final int rowIndex = tablet.rowSize;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 285121ae63c..f06819323ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -110,7 +111,10 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
@Override
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
- allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet);
+ allocatedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForTabletWithRetry(
+ PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
return true;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index 2e8a7ec6efa..d41ed8b0868 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.container;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -42,6 +44,8 @@ public abstract class TsFileInsertionDataContainer implements AutoCloseable {
protected final PipeTaskMeta pipeTaskMeta; // used to report progress
protected final EnrichedEvent sourceEvent; // used to report progress
+ protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
+
protected TsFileSequenceReader tsFileSequenceReader;
protected TsFileInsertionDataContainer(
@@ -58,6 +62,10 @@ public abstract class TsFileInsertionDataContainer implements AutoCloseable {
this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
+
+ // Allocate empty memory block, will be resized later.
+ this.allocatedMemoryBlockForTablet =
+ PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
}
/**
@@ -74,5 +82,9 @@ public abstract class TsFileInsertionDataContainer implements AutoCloseable {
} catch (final IOException e) {
LOGGER.warn("Failed to close TsFileSequenceReader", e);
}
+
+ if (allocatedMemoryBlockForTablet != null) {
+ allocatedMemoryBlockForTablet.close();
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index c62d5bc603a..955a7614851 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -280,7 +280,8 @@ public class TsFileInsertionQueryDataContainer extends TsFileInsertionDataContai
measurementDataTypeMap,
entry.getKey(),
entry.getValue(),
- timeFilterExpression);
+ timeFilterExpression,
+ allocatedMemoryBlockForTablet);
} catch (final Exception e) {
close();
throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index e6bcdca94b2..d644d15297a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.common.constant.TsFileConstant;
@@ -32,6 +34,7 @@ import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.expression.IExpression;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -42,6 +45,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.stream.Collectors;
public class TsFileInsertionQueryDataTabletIterator implements Iterator<Tablet> {
@@ -56,12 +60,15 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator<Tablet>
private final QueryDataSet queryDataSet;
+ private final PipeMemoryBlock allocatedBlockForTablet;
+
TsFileInsertionQueryDataTabletIterator(
final TsFileReader tsFileReader,
final Map<String, TSDataType> measurementDataTypeMap,
final IDeviceID deviceId,
final List<String> measurements,
- final IExpression timeFilterExpression)
+ final IExpression timeFilterExpression,
+ final PipeMemoryBlock allocatedBlockForTablet)
throws IOException {
this.tsFileReader = tsFileReader;
this.measurementDataTypeMap = measurementDataTypeMap;
@@ -79,6 +86,8 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator<Tablet>
this.timeFilterExpression = timeFilterExpression;
this.queryDataSet = buildQueryDataSet();
+
+ this.allocatedBlockForTablet = Objects.requireNonNull(allocatedBlockForTablet);
}
private QueryDataSet buildQueryDataSet() throws IOException {
@@ -118,16 +127,35 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator<Tablet>
measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR + measurement);
schemas.add(new MeasurementSchema(measurement, dataType));
}
- final Tablet tablet =
- new Tablet(
- // Used for tree model
- deviceId.toString(),
- schemas,
- PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
- tablet.initBitMaps();
+ Tablet tablet = null;
+ if (!queryDataSet.hasNext()) {
+ tablet =
+ new Tablet(
+ // Used for tree model
+ deviceId.toString(), schemas, 1);
+ tablet.initBitMaps();
+ // Ignore the memory cost of tablet
+ PipeDataNodeResourceManager.memory().forceResize(allocatedBlockForTablet, 0);
+ return tablet;
+ }
+
+ boolean isFirstRow = true;
while (queryDataSet.hasNext()) {
final RowRecord rowRecord = queryDataSet.next();
+ if (isFirstRow) {
+ // Calculate row count and memory size of the tablet based on the first row
+ Pair<Integer, Integer> rowCountAndMemorySize =
+ PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord);
+ tablet =
+ new Tablet(
+ // Used for tree model
+ deviceId.toString(), schemas, rowCountAndMemorySize.getLeft());
+ tablet.initBitMaps();
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedBlockForTablet, rowCountAndMemorySize.getRight());
+ isFirstRow = false;
+ }
final int rowIndex = tablet.rowSize;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 8dc4e6a07e7..3151c951229 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -19,12 +19,13 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -44,6 +45,7 @@ import org.apache.tsfile.read.reader.chunk.ChunkReader;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
@@ -175,15 +177,32 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
private Tablet getNextTablet() {
try {
- final Tablet tablet =
- new Tablet(
- currentDevice.toString(),
- currentMeasurements,
- PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
- tablet.initBitMaps();
+ Tablet tablet = null;
+ if (!data.hasCurrent()) {
+ tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
+ tablet.initBitMaps();
+ // Ignore the memory cost of tablet
+ PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0);
+ return tablet;
+ }
+
+ boolean isFirstRow = true;
while (data.hasCurrent()) {
if (isMultiPage || data.currentTime() >= startTime && data.currentTime() <= endTime) {
+ if (isFirstRow) {
+ // Calculate row count and memory size of the tablet based on the first row
+ Pair<Integer, Integer> rowCountAndMemorySize =
+ PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data);
+ tablet =
+ new Tablet(
+ currentDevice.toString(), currentMeasurements, rowCountAndMemorySize.getLeft());
+ tablet.initBitMaps();
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForTablet, rowCountAndMemorySize.getRight());
+ isFirstRow = false;
+ }
+
final int rowIndex = tablet.rowSize;
tablet.addTimestamp(rowIndex, data.currentTime());
@@ -197,16 +216,22 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
data = chunkReader.nextPageData();
}
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ if (tablet != null && tablet.rowSize == tablet.getMaxRowNumber()) {
break;
}
}
+ if (tablet == null) {
+ tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
+ tablet.initBitMaps();
+ // Ignore the memory cost of tablet
+ PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0);
+ }
+
// Switch chunk reader iff current chunk is all consumed
if (!data.hasCurrent()) {
prepareData();
}
-
return tablet;
} catch (final Exception e) {
close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index bf5db9d0bb2..4763fcc71b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +74,7 @@ public class PipeMemoryManager {
return forceAllocate(sizeInBytes, false);
}
- public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)
+ public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
// No need to calculate the tablet size, skip it to save time
@@ -107,8 +106,7 @@ public class PipeMemoryManager {
synchronized (this) {
final PipeTabletMemoryBlock block =
- (PipeTabletMemoryBlock)
- forceAllocate(PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet), true);
+ (PipeTabletMemoryBlock) forceAllocate(tabletSizeInBytes, true);
usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
return block;
}
@@ -147,6 +145,59 @@ public class PipeMemoryManager {
sizeInBytes));
}
+ public synchronized void forceResize(PipeMemoryBlock block, long targetSize) {
+ if (block == null || block.isReleased()) {
+ LOGGER.warn("forceResize: cannot resize a null or released memory block");
+ return;
+ }
+
+ if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+ block.setMemoryUsageInBytes(targetSize);
+ return;
+ }
+
+ final long oldSize = block.getMemoryUsageInBytes();
+
+ if (oldSize >= targetSize) {
+ usedMemorySizeInBytes -= oldSize - targetSize;
+ if (block instanceof PipeTabletMemoryBlock) {
+ usedMemorySizeInBytesOfTablets -= oldSize - targetSize;
+ }
+ block.setMemoryUsageInBytes(targetSize);
+ return;
+ }
+
+ long sizeInBytes = targetSize - oldSize;
+ for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
+ if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
+ usedMemorySizeInBytes += sizeInBytes;
+ if (block instanceof PipeTabletMemoryBlock) {
+ usedMemorySizeInBytesOfTablets += sizeInBytes;
+ }
+ block.setMemoryUsageInBytes(targetSize);
+ return;
+ }
+
+ try {
+ tryShrink4Allocate(sizeInBytes);
+ this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("forceResize: interrupted while waiting for available memory", e);
+ }
+ }
+
+ throw new PipeRuntimeOutOfMemoryCriticalException(
+ String.format(
+ "forceResize: failed to allocate memory after %d retries, "
+ + "total memory size %d bytes, used memory size %d bytes, "
+ + "requested memory size %d bytes",
+ MEMORY_ALLOCATE_MAX_RETRIES,
+ TOTAL_MEMORY_SIZE_IN_BYTES,
+ usedMemorySizeInBytes,
+ sizeInBytes));
+ }
+
/**
* Allocate a {@link PipeMemoryBlock} for pipe only if memory already used is less than the
* specified threshold.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index ff9c996c6a2..ed1eab78929 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -19,11 +19,18 @@
package org.apache.iotdb.db.pipe.resource.memory;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -62,6 +69,127 @@ public class PipeMemoryWeightUtil {
return usageInBytes + 16L; // add the overhead of map
}
+ /**
+ * Given a row of a tablet, calculate the row count and memory cost of the pipe tablet that will
+ * be constructed according to config.
+ *
+ * @return left is the row count of tablet, right is the memory cost of tablet in bytes
+ */
+ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(RowRecord row) {
+ int totalSizeInBytes = 0;
+
+ // timestamp
+ totalSizeInBytes += 8L;
+
+ // values
+ final List<Field> fields = row.getFields();
+ int schemaCount = 0;
+ if (fields != null) {
+ schemaCount = fields.size();
+ for (final Field field : fields) {
+ if (field == null) {
+ continue;
+ }
+
+ final TSDataType tsDataType = field.getDataType();
+ if (tsDataType == null) {
+ continue;
+ }
+
+ if (tsDataType.isBinary()) {
+ final Binary binary = field.getBinaryV();
+ totalSizeInBytes += binary == null ? 0 : binary.getLength();
+ } else {
+ totalSizeInBytes += tsDataType.getDataTypeSize();
+ }
+ }
+ }
+
+ return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
+ }
+
+ /**
+ * Given a BatchData, calculate the row count and memory cost of the pipe tablet that will be
+ * constructed according to config.
+ *
+ * @return left is the row count of tablet, right is the memory cost of tablet in bytes
+ */
+ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData batchData) {
+ int totalSizeInBytes = 0;
+ int schemaCount = 0;
+
+ // timestamp
+ totalSizeInBytes += 8L;
+
+ // values
+ final TSDataType type = batchData.getDataType();
+ if (type != null) {
+ if (type == TSDataType.VECTOR && batchData.getVector() != null) {
+ schemaCount = batchData.getVector().length;
+ for (int i = 0; i < schemaCount; ++i) {
+ final TsPrimitiveType primitiveType = batchData.getVector()[i];
+ if (primitiveType == null || primitiveType.getDataType() == null) {
+ continue;
+ }
+
+ if (primitiveType.getDataType().isBinary()) {
+ final Binary binary = primitiveType.getBinary();
+ totalSizeInBytes += binary == null ? 0 : binary.getLength();
+ } else {
+ totalSizeInBytes += primitiveType.getDataType().getDataTypeSize();
+ }
+ }
+ } else {
+ schemaCount = 1;
+ if (type.isBinary()) {
+ final Binary binary = batchData.getBinary();
+ totalSizeInBytes += binary == null ? 0 : binary.getLength();
+ } else {
+ totalSizeInBytes += type.getDataTypeSize();
+ }
+ }
+ }
+
+ return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
+ }
+
+ /**
+ * Given a row of a tablet, calculate the row count and memory cost of the pipe tablet that will
+ * be constructed according to config.
+ *
+ * @return left is the row count of tablet, right is the memory cost of tablet in bytes
+ */
+ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(PipeRow row) {
+ return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size());
+ }
+
+ private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
+ int rowSize, int schemaCount) {
+ if (rowSize <= 0) {
+ return new Pair<>(1, 0);
+ }
+
+ // Calculate row number according to the max size of a pipe tablet.
+ // "-100" is the estimated size of other data structures in a pipe tablet.
+ // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema.
+ int rowNumber =
+ 8
+ * (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
+ / (8 * rowSize + schemaCount);
+ rowNumber = Math.max(1, rowNumber);
+
+ if ( // This means the row number is larger than the max row count of a pipe tablet
+ rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
+ // Bound the row number, the memory cost is rowSize * rowNumber
+ return new Pair<>(
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
+ rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+ } else {
+ return new Pair<>(
+ rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+ }
+ }
+
public static long calculateTabletSizeInBytes(Tablet tablet) {
long totalSizeInBytes = 0;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ef8f013f7a1..1a5505737cc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -202,6 +202,7 @@ public class CommonConfig {
private int pipeNonForwardingEventsProgressReportInterval = 100;
private int pipeDataStructureTabletRowSize = 2048;
+ private int pipeDataStructureTabletSizeInBytes = 2097152;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.4;
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
@@ -674,6 +675,14 @@ public class CommonConfig {
this.pipeDataStructureTabletRowSize = pipeDataStructureTabletRowSize;
}
+ public int getPipeDataStructureTabletSizeInBytes() {
+ return pipeDataStructureTabletSizeInBytes;
+ }
+
+ public void setPipeDataStructureTabletSizeInBytes(int pipeDataStructureTabletSizeInBytes) {
+ this.pipeDataStructureTabletSizeInBytes = pipeDataStructureTabletSizeInBytes;
+ }
+
public double getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold() {
return pipeDataStructureTabletMemoryBlockAllocationRejectThreshold;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 76e0a853dc2..310400b33ce 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -275,6 +275,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_data_structure_tablet_row_size",
String.valueOf(config.getPipeDataStructureTabletRowSize()))));
+ config.setPipeDataStructureTabletSizeInBytes(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_data_structure_tablet_size_in_bytes",
+ String.valueOf(config.getPipeDataStructureTabletSizeInBytes()))));
config.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
Double.parseDouble(
properties.getProperty(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 449bd149e67..957065286cc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -60,6 +60,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeDataStructureTabletRowSize();
}
+ public int getPipeDataStructureTabletSizeInBytes() {
+ return COMMON_CONFIG.getPipeDataStructureTabletSizeInBytes();
+ }
+
public double getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold() {
return COMMON_CONFIG.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
}
@@ -325,6 +329,7 @@ public class PipeConfig {
LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
LOGGER.info("PipeDataStructureTabletRowSize: {}", getPipeDataStructureTabletRowSize());
+ LOGGER.info("PipeDataStructureTabletSizeInBytes: {}", getPipeDataStructureTabletSizeInBytes());
LOGGER.info(
"PipeDataStructureTabletMemoryBlockAllocationRejectThreshold: {}",
getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold());