This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 08fc8abe247 Pipe: better memory control for in-memory tablets (#13301)
08fc8abe247 is described below

commit 08fc8abe247d16bd89a5911081720b8a59e19517
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Sep 2 18:36:56 2024 +0800

    Pipe: better memory control for in-memory tablets (#13301)
---
 .../iotdb/db/pipe/event/common/row/PipeRow.java    |  15 +++
 .../db/pipe/event/common/row/PipeRowCollector.java |  14 +--
 .../common/tablet/PipeRawTabletInsertionEvent.java |   6 +-
 .../container/TsFileInsertionDataContainer.java    |  12 ++
 .../query/TsFileInsertionQueryDataContainer.java   |   3 +-
 .../TsFileInsertionQueryDataTabletIterator.java    |  46 ++++++--
 .../scan/TsFileInsertionScanDataContainer.java     |  43 +++++--
 .../db/pipe/resource/memory/PipeMemoryManager.java |  59 +++++++++-
 .../pipe/resource/memory/PipeMemoryWeightUtil.java | 128 +++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   5 +
 12 files changed, 314 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 8757b6613c9..6d4d25b9542 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -190,6 +190,21 @@ public class PipeRow implements Row {
     return isAligned;
   }
 
+  public int getCurrentRowSize() {
+    int rowSize = 0;
+    rowSize += 8; // timestamp
+    for (int i = 0; i < valueColumnTypes.length; i++) {
+      if (valueColumnTypes[i] != null) {
+        if (valueColumnTypes[i].isBinary()) {
+          rowSize += getBinary(i) != null ? getBinary(i).getLength() : 0;
+        } else {
+          rowSize += valueColumnTypes[i].getDataTypeSize();
+        }
+      }
+    }
+    return rowSize;
+  }
+
   public IMeasurementSchema[] getMeasurementSchemaList() {
     return measurementSchemaList;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index c1f7739534d..4646041a8a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -19,15 +19,16 @@
 
 package org.apache.iotdb.db.pipe.event.common.row;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
@@ -66,13 +67,12 @@ public class PipeRowCollector implements RowCollector {
       final String deviceId = pipeRow.getDeviceId();
       final List<IMeasurementSchema> measurementSchemaList =
           new ArrayList<>(Arrays.asList(measurementSchemaArray));
-      tablet =
-          new Tablet(
-              deviceId,
-              measurementSchemaList,
-              PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
-      isAligned = pipeRow.isAligned();
+      // Calculate row count and memory size of the tablet based on the first 
row
+      Pair<Integer, Integer> rowCountAndMemorySize =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow);
+      tablet = new Tablet(deviceId, measurementSchemaList, 
rowCountAndMemorySize.getLeft());
       tablet.initBitMaps();
+      isAligned = pipeRow.isAligned();
     }
 
     final int rowIndex = tablet.rowSize;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 285121ae63c..f06819323ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -110,7 +111,10 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
-    allocatedMemoryBlock = 
PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet);
+    allocatedMemoryBlock =
+        PipeDataNodeResourceManager.memory()
+            .forceAllocateForTabletWithRetry(
+                PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index 2e8a7ec6efa..d41ed8b0868 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -22,6 +22,8 @@ package 
org.apache.iotdb.db.pipe.event.common.tsfile.container;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.read.TsFileSequenceReader;
@@ -42,6 +44,8 @@ public abstract class TsFileInsertionDataContainer implements 
AutoCloseable {
   protected final PipeTaskMeta pipeTaskMeta; // used to report progress
   protected final EnrichedEvent sourceEvent; // used to report progress
 
+  protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
+
   protected TsFileSequenceReader tsFileSequenceReader;
 
   protected TsFileInsertionDataContainer(
@@ -58,6 +62,10 @@ public abstract class TsFileInsertionDataContainer 
implements AutoCloseable {
 
     this.pipeTaskMeta = pipeTaskMeta;
     this.sourceEvent = sourceEvent;
+
+    // Allocate empty memory block, will be resized later.
+    this.allocatedMemoryBlockForTablet =
+        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
   }
 
   /**
@@ -74,5 +82,9 @@ public abstract class TsFileInsertionDataContainer implements 
AutoCloseable {
     } catch (final IOException e) {
       LOGGER.warn("Failed to close TsFileSequenceReader", e);
     }
+
+    if (allocatedMemoryBlockForTablet != null) {
+      allocatedMemoryBlockForTablet.close();
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index c62d5bc603a..955a7614851 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -280,7 +280,8 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
                         measurementDataTypeMap,
                         entry.getKey(),
                         entry.getValue(),
-                        timeFilterExpression);
+                        timeFilterExpression,
+                        allocatedMemoryBlockForTablet);
               } catch (final Exception e) {
                 close();
                 throw new PipeException("failed to create 
TsFileInsertionDataTabletIterator", e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index e6bcdca94b2..d644d15297a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
@@ -32,6 +34,7 @@ import org.apache.tsfile.read.common.RowRecord;
 import org.apache.tsfile.read.expression.IExpression;
 import org.apache.tsfile.read.expression.QueryExpression;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -42,6 +45,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class TsFileInsertionQueryDataTabletIterator implements 
Iterator<Tablet> {
@@ -56,12 +60,15 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
 
   private final QueryDataSet queryDataSet;
 
+  private final PipeMemoryBlock allocatedBlockForTablet;
+
   TsFileInsertionQueryDataTabletIterator(
       final TsFileReader tsFileReader,
       final Map<String, TSDataType> measurementDataTypeMap,
       final IDeviceID deviceId,
       final List<String> measurements,
-      final IExpression timeFilterExpression)
+      final IExpression timeFilterExpression,
+      final PipeMemoryBlock allocatedBlockForTablet)
       throws IOException {
     this.tsFileReader = tsFileReader;
     this.measurementDataTypeMap = measurementDataTypeMap;
@@ -79,6 +86,8 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
     this.timeFilterExpression = timeFilterExpression;
 
     this.queryDataSet = buildQueryDataSet();
+
+    this.allocatedBlockForTablet = 
Objects.requireNonNull(allocatedBlockForTablet);
   }
 
   private QueryDataSet buildQueryDataSet() throws IOException {
@@ -118,16 +127,35 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
           measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR 
+ measurement);
       schemas.add(new MeasurementSchema(measurement, dataType));
     }
-    final Tablet tablet =
-        new Tablet(
-            // Used for tree model
-            deviceId.toString(),
-            schemas,
-            PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
-    tablet.initBitMaps();
 
+    Tablet tablet = null;
+    if (!queryDataSet.hasNext()) {
+      tablet =
+          new Tablet(
+              // Used for tree model
+              deviceId.toString(), schemas, 1);
+      tablet.initBitMaps();
+      // Ignore the memory cost of tablet
+      
PipeDataNodeResourceManager.memory().forceResize(allocatedBlockForTablet, 0);
+      return tablet;
+    }
+
+    boolean isFirstRow = true;
     while (queryDataSet.hasNext()) {
       final RowRecord rowRecord = queryDataSet.next();
+      if (isFirstRow) {
+        // Calculate row count and memory size of the tablet based on the 
first row
+        Pair<Integer, Integer> rowCountAndMemorySize =
+            PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord);
+        tablet =
+            new Tablet(
+                // Used for tree model
+                deviceId.toString(), schemas, rowCountAndMemorySize.getLeft());
+        tablet.initBitMaps();
+        PipeDataNodeResourceManager.memory()
+            .forceResize(allocatedBlockForTablet, 
rowCountAndMemorySize.getRight());
+        isFirstRow = false;
+      }
 
       final int rowIndex = tablet.rowSize;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 8dc4e6a07e7..3151c951229 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -19,12 +19,13 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -44,6 +45,7 @@ import org.apache.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
@@ -175,15 +177,32 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
   private Tablet getNextTablet() {
     try {
-      final Tablet tablet =
-          new Tablet(
-              currentDevice.toString(),
-              currentMeasurements,
-              PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
-      tablet.initBitMaps();
+      Tablet tablet = null;
 
+      if (!data.hasCurrent()) {
+        tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
+        tablet.initBitMaps();
+        // Ignore the memory cost of tablet
+        
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 
0);
+        return tablet;
+      }
+
+      boolean isFirstRow = true;
       while (data.hasCurrent()) {
         if (isMultiPage || data.currentTime() >= startTime && 
data.currentTime() <= endTime) {
+          if (isFirstRow) {
+            // Calculate row count and memory size of the tablet based on the 
first row
+            Pair<Integer, Integer> rowCountAndMemorySize =
+                PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data);
+            tablet =
+                new Tablet(
+                    currentDevice.toString(), currentMeasurements, 
rowCountAndMemorySize.getLeft());
+            tablet.initBitMaps();
+            PipeDataNodeResourceManager.memory()
+                .forceResize(allocatedMemoryBlockForTablet, 
rowCountAndMemorySize.getRight());
+            isFirstRow = false;
+          }
+
           final int rowIndex = tablet.rowSize;
 
           tablet.addTimestamp(rowIndex, data.currentTime());
@@ -197,16 +216,22 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
           data = chunkReader.nextPageData();
         }
 
-        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        if (tablet != null && tablet.rowSize == tablet.getMaxRowNumber()) {
           break;
         }
       }
 
+      if (tablet == null) {
+        tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
+        tablet.initBitMaps();
+        // Ignore the memory cost of tablet
+        
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 
0);
+      }
+
       // Switch chunk reader iff current chunk is all consumed
       if (!data.hasCurrent()) {
         prepareData();
       }
-
       return tablet;
     } catch (final Exception e) {
       close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index bf5db9d0bb2..4763fcc71b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 
-import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +74,7 @@ public class PipeMemoryManager {
     return forceAllocate(sizeInBytes, false);
   }
 
-  public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)
+  public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long 
tabletSizeInBytes)
       throws PipeRuntimeOutOfMemoryCriticalException {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
       // No need to calculate the tablet size, skip it to save time
@@ -107,8 +106,7 @@ public class PipeMemoryManager {
 
     synchronized (this) {
       final PipeTabletMemoryBlock block =
-          (PipeTabletMemoryBlock)
-              
forceAllocate(PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet), true);
+          (PipeTabletMemoryBlock) forceAllocate(tabletSizeInBytes, true);
       usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
       return block;
     }
@@ -147,6 +145,59 @@ public class PipeMemoryManager {
             sizeInBytes));
   }
 
+  public synchronized void forceResize(PipeMemoryBlock block, long targetSize) 
{
+    if (block == null || block.isReleased()) {
+      LOGGER.warn("forceResize: cannot resize a null or released memory 
block");
+      return;
+    }
+
+    if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+      block.setMemoryUsageInBytes(targetSize);
+      return;
+    }
+
+    final long oldSize = block.getMemoryUsageInBytes();
+
+    if (oldSize >= targetSize) {
+      usedMemorySizeInBytes -= oldSize - targetSize;
+      if (block instanceof PipeTabletMemoryBlock) {
+        usedMemorySizeInBytesOfTablets -= oldSize - targetSize;
+      }
+      block.setMemoryUsageInBytes(targetSize);
+      return;
+    }
+
+    long sizeInBytes = targetSize - oldSize;
+    for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
+      if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
+        usedMemorySizeInBytes += sizeInBytes;
+        if (block instanceof PipeTabletMemoryBlock) {
+          usedMemorySizeInBytesOfTablets += sizeInBytes;
+        }
+        block.setMemoryUsageInBytes(targetSize);
+        return;
+      }
+
+      try {
+        tryShrink4Allocate(sizeInBytes);
+        this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("forceResize: interrupted while waiting for available 
memory", e);
+      }
+    }
+
+    throw new PipeRuntimeOutOfMemoryCriticalException(
+        String.format(
+            "forceResize: failed to allocate memory after %d retries, "
+                + "total memory size %d bytes, used memory size %d bytes, "
+                + "requested memory size %d bytes",
+            MEMORY_ALLOCATE_MAX_RETRIES,
+            TOTAL_MEMORY_SIZE_IN_BYTES,
+            usedMemorySizeInBytes,
+            sizeInBytes));
+  }
+
   /**
    * Allocate a {@link PipeMemoryBlock} for pipe only if memory already used 
is less than the
    * specified threshold.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index ff9c996c6a2..ed1eab78929 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -19,11 +19,18 @@
 
 package org.apache.iotdb.db.pipe.resource.memory;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
 import org.apache.iotdb.db.utils.MemUtils;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
@@ -62,6 +69,127 @@ public class PipeMemoryWeightUtil {
     return usageInBytes + 16L; // add the overhead of map
   }
 
+  /**
+   * Given a row of a tablet, calculate the row count and memory cost of the 
pipe tablet that will
+   * be constructed according to config.
+   *
+   * @return left is the row count of tablet, right is the memory cost of 
tablet in bytes
+   */
+  public static Pair<Integer, Integer> 
calculateTabletRowCountAndMemory(RowRecord row) {
+    int totalSizeInBytes = 0;
+
+    // timestamp
+    totalSizeInBytes += 8L;
+
+    // values
+    final List<Field> fields = row.getFields();
+    int schemaCount = 0;
+    if (fields != null) {
+      schemaCount = fields.size();
+      for (final Field field : fields) {
+        if (field == null) {
+          continue;
+        }
+
+        final TSDataType tsDataType = field.getDataType();
+        if (tsDataType == null) {
+          continue;
+        }
+
+        if (tsDataType.isBinary()) {
+          final Binary binary = field.getBinaryV();
+          totalSizeInBytes += binary == null ? 0 : binary.getLength();
+        } else {
+          totalSizeInBytes += tsDataType.getDataTypeSize();
+        }
+      }
+    }
+
+    return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, 
schemaCount);
+  }
+
+  /**
+   * Given a BatchData, calculate the row count and memory cost of the pipe 
tablet that will be
+   * constructed according to config.
+   *
+   * @return left is the row count of tablet, right is the memory cost of 
tablet in bytes
+   */
+  public static Pair<Integer, Integer> 
calculateTabletRowCountAndMemory(BatchData batchData) {
+    int totalSizeInBytes = 0;
+    int schemaCount = 0;
+
+    // timestamp
+    totalSizeInBytes += 8L;
+
+    // values
+    final TSDataType type = batchData.getDataType();
+    if (type != null) {
+      if (type == TSDataType.VECTOR && batchData.getVector() != null) {
+        schemaCount = batchData.getVector().length;
+        for (int i = 0; i < schemaCount; ++i) {
+          final TsPrimitiveType primitiveType = batchData.getVector()[i];
+          if (primitiveType == null || primitiveType.getDataType() == null) {
+            continue;
+          }
+
+          if (primitiveType.getDataType().isBinary()) {
+            final Binary binary = primitiveType.getBinary();
+            totalSizeInBytes += binary == null ? 0 : binary.getLength();
+          } else {
+            totalSizeInBytes += primitiveType.getDataType().getDataTypeSize();
+          }
+        }
+      } else {
+        schemaCount = 1;
+        if (type.isBinary()) {
+          final Binary binary = batchData.getBinary();
+          totalSizeInBytes += binary == null ? 0 : binary.getLength();
+        } else {
+          totalSizeInBytes += type.getDataTypeSize();
+        }
+      }
+    }
+
+    return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, 
schemaCount);
+  }
+
+  /**
+   * Given a row of a tablet, calculate the row count and memory cost of the 
pipe tablet that will
+   * be constructed according to config.
+   *
+   * @return left is the row count of tablet, right is the memory cost of 
tablet in bytes
+   */
+  public static Pair<Integer, Integer> 
calculateTabletRowCountAndMemory(PipeRow row) {
+    return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), 
row.size());
+  }
+
+  private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
+      int rowSize, int schemaCount) {
+    if (rowSize <= 0) {
+      return new Pair<>(1, 0);
+    }
+
+    // Calculate row number according to the max size of a pipe tablet.
+    // "-100" is the estimated size of other data structures in a pipe tablet.
+    // "*8" converts bytes to bits, because the bitmap size is 1 bit per 
schema.
+    int rowNumber =
+        8
+            * 
(PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
+            / (8 * rowSize + schemaCount);
+    rowNumber = Math.max(1, rowNumber);
+
+    if ( // This means the row number is larger than the max row count of a 
pipe tablet
+    rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
+      // Bound the row number, the memory cost is rowSize * rowNumber
+      return new Pair<>(
+          PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
+          rowSize * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+    } else {
+      return new Pair<>(
+          rowNumber, 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+    }
+  }
+
   public static long calculateTabletSizeInBytes(Tablet tablet) {
     long totalSizeInBytes = 0;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ef8f013f7a1..1a5505737cc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -202,6 +202,7 @@ public class CommonConfig {
   private int pipeNonForwardingEventsProgressReportInterval = 100;
 
   private int pipeDataStructureTabletRowSize = 2048;
+  private int pipeDataStructureTabletSizeInBytes = 2097152;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.4;
 
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 
10_000;
@@ -674,6 +675,14 @@ public class CommonConfig {
     this.pipeDataStructureTabletRowSize = pipeDataStructureTabletRowSize;
   }
 
+  public int getPipeDataStructureTabletSizeInBytes() {
+    return pipeDataStructureTabletSizeInBytes;
+  }
+
+  public void setPipeDataStructureTabletSizeInBytes(int 
pipeDataStructureTabletSizeInBytes) {
+    this.pipeDataStructureTabletSizeInBytes = 
pipeDataStructureTabletSizeInBytes;
+  }
+
   public double 
getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold() {
     return pipeDataStructureTabletMemoryBlockAllocationRejectThreshold;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 76e0a853dc2..310400b33ce 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -275,6 +275,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_data_structure_tablet_row_size",
                 String.valueOf(config.getPipeDataStructureTabletRowSize()))));
+    config.setPipeDataStructureTabletSizeInBytes(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_data_structure_tablet_size_in_bytes",
+                
String.valueOf(config.getPipeDataStructureTabletSizeInBytes()))));
     config.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
         Double.parseDouble(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 449bd149e67..957065286cc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -60,6 +60,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeDataStructureTabletRowSize();
   }
 
+  public int getPipeDataStructureTabletSizeInBytes() {
+    return COMMON_CONFIG.getPipeDataStructureTabletSizeInBytes();
+  }
+
   public double 
getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold() {
     return 
COMMON_CONFIG.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
   }
@@ -325,6 +329,7 @@ public class PipeConfig {
     LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
 
     LOGGER.info("PipeDataStructureTabletRowSize: {}", 
getPipeDataStructureTabletRowSize());
+    LOGGER.info("PipeDataStructureTabletSizeInBytes: {}", 
getPipeDataStructureTabletSizeInBytes());
     LOGGER.info(
         "PipeDataStructureTabletMemoryBlockAllocationRejectThreshold: {}",
         getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold());

Reply via email to