This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c0a642892f7 Pipe: Added memory control for aligned chunk reader in 
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator 
(#15222)
c0a642892f7 is described below

commit c0a642892f71e0dd9d48a6926f57ea0d182e80de
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 1 19:01:40 2025 +0800

    Pipe: Added memory control for aligned chunk reader in 
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator 
(#15222)
    
    Co-authored-by: luoluoyuyu <[email protected]>
---
 .../scan/TsFileInsertionEventScanParser.java       |  10 +-
 ...ileInsertionEventTableParserTabletIterator.java | 178 +++++++++++----------
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   5 +
 5 files changed, 120 insertions(+), 87 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 1559d472e04..83dbdf39d05 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -63,6 +64,8 @@ import java.util.Objects;
 
 public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser 
{
 
+  private static final int PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH =
+      PipeConfig.getInstance().getPipeMaxAlignedSeriesNumInOneBatch();
   private final long startTime;
   private final long endTime;
   private final Filter filter;
@@ -451,13 +454,16 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
                     chunkHeader.getMeasurementID(),
                     (measurement, index) -> Objects.nonNull(index) ? index + 1 
: 0);
 
-            // Emit when encountered non-sequential value chunk
+            // Emit when encountered non-sequential value chunk, or the chunk 
list size exceeds
+            // certain value to avoid OOM
             // Do not record or end current value chunks when there are empty 
chunks
             if (chunkHeader.getDataSize() == 0) {
               break;
             }
             boolean needReturn = false;
-            if (lastIndex >= 0 && valueIndex != lastIndex) {
+            if (lastIndex >= 0
+                && (valueIndex != lastIndex
+                    || valueChunkList.size() >= 
PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH)) {
               needReturn = recordAlignedChunk(valueChunkList, marker);
             }
             lastIndex = valueIndex;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 32140edee8e..c080e1dc152 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -48,17 +49,16 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 public class TsFileInsertionEventTableParserTabletIterator implements 
Iterator<Tablet> {
-
+  private static final int PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH =
+      PipeConfig.getInstance().getPipeMaxAlignedSeriesNumInOneBatch();
   private final long startTime;
   private final long endTime;
 
@@ -80,10 +80,13 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
   private BatchData batchData;
 
   // Record the metadata information of the currently read Table
-  private Set<String> measurementNames;
   private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
-  private Iterator<List<IChunkMetadata>> chunkMetadataList;
+  private Iterator<AbstractAlignedChunkMetadata> chunkMetadataList;
   private Iterator<IChunkMetadata> chunkMetadata;
+  private AbstractAlignedChunkMetadata currentChunkMetadata;
+  private Chunk timeChunk;
+  private long timeChunkSize;
+  private int offset;
 
   // Record the information of the currently read Table
   private String tableName;
@@ -91,13 +94,12 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
   private List<Tablet.ColumnCategory> columnTypes;
   private List<String> measurementList;
   private List<TSDataType> dataTypeList;
-
-  private List<Pair<String, Integer>> measurementColumIndexList;
-  private List<Integer> measurementIdIndexList;
+  private int deviceIdSize;
 
   // Used to record whether the same Tablet is generated when parsing starts. 
Different table
   // information cannot be placed in the same Tablet.
   private boolean isSameTableName;
+  private boolean isSameDeviceID;
 
   public TsFileInsertionEventTableParserTabletIterator(
       final TsFileSequenceReader tsFileSequenceReader,
@@ -161,38 +163,35 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
               break;
             }
           case INIT_CHUNK_READER:
-            if (chunkMetadata != null && chunkMetadata.hasNext()) {
-              initChunkReader((AbstractAlignedChunkMetadata) 
chunkMetadata.next());
+            if (currentChunkMetadata != null
+                || (chunkMetadataList != null && chunkMetadataList.hasNext())) 
{
+              if (currentChunkMetadata == null) {
+                currentChunkMetadata = chunkMetadataList.next();
+                timeChunk = null;
+                offset = 0;
+              }
+              initChunkReader(currentChunkMetadata);
               state = State.INIT_DATA;
               break;
             }
-          case INIT_CHUNK_META:
-            if (chunkMetadataList != null && chunkMetadataList.hasNext()) {
-              chunkMetadata = chunkMetadataList.next().iterator();
-              state = State.INIT_CHUNK_READER;
-              break;
-            }
-          case INIT_CHUNK_META_LIST:
+          case INIT_CHUNK_METADATA:
             if (deviceMetaIterator != null && deviceMetaIterator.hasNext()) {
               final Pair<IDeviceID, MetadataIndexNode> pair = 
deviceMetaIterator.next();
 
               long size = 0;
-              List<List<IChunkMetadata>> iChunkMetadataList =
-                  reader.getIChunkMetadataList(pair.left, measurementNames, 
pair.right);
+              List<AbstractAlignedChunkMetadata> iChunkMetadataList =
+                  reader.getAlignedChunkMetadata(pair.left, true);
 
-              Iterator<List<IChunkMetadata>> chunkMetadataIterator = 
iChunkMetadataList.iterator();
+              Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
+                  iChunkMetadataList.iterator();
               while (chunkMetadataIterator.hasNext()) {
-                final List<IChunkMetadata> chunkMetadata = 
chunkMetadataIterator.next();
-                if (chunkMetadata == null
-                    || chunkMetadata.isEmpty()
-                    || !(chunkMetadata.get(0) instanceof 
AbstractAlignedChunkMetadata)) {
+                final AbstractAlignedChunkMetadata alignedChunkMetadata =
+                    chunkMetadataIterator.next();
+                if (alignedChunkMetadata == null) {
                   throw new PipeException(
                       "Table model tsfile parsing does not support this type 
of ChunkMeta");
                 }
 
-                final AbstractAlignedChunkMetadata alignedChunkMetadata =
-                    (AbstractAlignedChunkMetadata) chunkMetadata.get(0);
-
                 // Reduce the number of times Chunks are read
                 if (alignedChunkMetadata.getEndTime() < startTime
                     || alignedChunkMetadata.getStartTime() > endTime) {
@@ -209,7 +208,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
               deviceID = pair.getLeft();
               chunkMetadataList = iChunkMetadataList.iterator();
 
-              state = State.INIT_CHUNK_META;
+              state = State.INIT_CHUNK_READER;
               break;
             }
           case INIT_DEVICE_META:
@@ -224,34 +223,26 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
               deviceMetaIterator = metadataQuerier.deviceIterator(tableRoot, 
null);
 
               final int columnSchemaSize = 
tableSchema.getColumnSchemas().size();
-              dataTypeList = new ArrayList<>(columnSchemaSize);
-              columnTypes = new ArrayList<>(columnSchemaSize);
-              measurementList = new ArrayList<>(columnSchemaSize);
-              measurementNames = new HashSet<>();
-
-              measurementColumIndexList = new ArrayList<>(columnSchemaSize);
-              measurementIdIndexList = new ArrayList<>(columnSchemaSize);
+              dataTypeList = new 
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+              columnTypes = new 
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+              measurementList = new 
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
 
-              for (int i = 0, j = 0; i < columnSchemaSize; i++) {
+              for (int i = 0; i < columnSchemaSize; i++) {
                 final IMeasurementSchema schema = 
tableSchema.getColumnSchemas().get(i);
                 final Tablet.ColumnCategory columnCategory = 
tableSchema.getColumnTypes().get(i);
                 if (schema != null
                     && schema.getMeasurementName() != null
                     && !schema.getMeasurementName().isEmpty()) {
                   final String measurementName = schema.getMeasurementName();
-                  columnTypes.add(columnCategory);
-                  measurementList.add(measurementName);
-                  dataTypeList.add(schema.getType());
-                  if (!Tablet.ColumnCategory.TAG.equals(columnCategory)) {
-                    measurementNames.add(measurementName);
-                    measurementColumIndexList.add(new Pair<>(measurementName, 
j));
-                  } else {
-                    measurementIdIndexList.add(j);
+                  if (Tablet.ColumnCategory.TAG.equals(columnCategory)) {
+                    columnTypes.add(Tablet.ColumnCategory.TAG);
+                    measurementList.add(measurementName);
+                    dataTypeList.add(schema.getType());
                   }
-                  j++;
                 }
               }
-              state = State.INIT_CHUNK_META_LIST;
+              deviceIdSize = dataTypeList.size();
+              state = State.INIT_CHUNK_METADATA;
               break;
             }
             return false;
@@ -266,8 +257,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
     CHECK_DATA,
     INIT_DATA,
     INIT_CHUNK_READER,
-    INIT_CHUNK_META,
-    INIT_CHUNK_META_LIST,
+    INIT_CHUNK_METADATA,
     INIT_DEVICE_META
   }
 
@@ -280,12 +270,13 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
     Tablet tablet = null;
 
     boolean isFirstRow = true;
-    while (hasNext() && (isFirstRow || isSameTableName)) {
+    while (hasNext() && (isFirstRow || (isSameTableName && isSameDeviceID))) {
       if (batchData.currentTime() >= startTime && batchData.currentTime() <= 
endTime) {
         if (isFirstRow) {
           // Record the name of the table when the tablet is started. 
Different table data cannot be
           // in the same tablet.
           isSameTableName = true;
+          isSameDeviceID = true;
 
           // Calculate row count and memory size of the tablet based on the 
first row
           final Pair<Integer, Integer> rowCountAndMemorySize =
@@ -319,7 +310,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
     }
 
     if (isFirstRow) {
-      
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunkMeta,
 0);
+      
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 
0);
       tablet = new Tablet(tableName, measurementList, dataTypeList, 
columnTypes, 0);
       tablet.initBitMaps();
     }
@@ -327,37 +318,56 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
     return tablet;
   }
 
-  private void initChunkReader(AbstractAlignedChunkMetadata 
alignedChunkMetadata)
+  private void initChunkReader(final AbstractAlignedChunkMetadata 
alignedChunkMetadata)
       throws IOException {
-    final Chunk timeChunk =
-        reader.readMemChunk((ChunkMetadata) 
alignedChunkMetadata.getTimeChunkMetadata());
-    long size = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
-    
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
size);
-
-    final List<Chunk> valueChunkList =
-        new 
ArrayList<>(alignedChunkMetadata.getValueChunkMetadataList().size());
-    final Map<String, ChunkMetadata> metadataMap = new HashMap<>();
-    for (IChunkMetadata metadata : 
alignedChunkMetadata.getValueChunkMetadataList()) {
-      if (metadata != null) {
-        metadataMap.put(metadata.getMeasurementUid(), (ChunkMetadata) 
metadata);
-      }
+    if (Objects.isNull(timeChunk)) {
+      timeChunk = reader.readMemChunk((ChunkMetadata) 
alignedChunkMetadata.getTimeChunkMetadata());
+      timeChunkSize = 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+      
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
timeChunkSize);
+    }
+    timeChunk.getData().rewind();
+    long size = timeChunkSize;
+
+    final List<Chunk> valueChunkList = new 
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+
+    // To ensure that the Tablet has the same alignedChunk column as the 
current one,
+    // you need to create a new Tablet to fill in the data.
+    isSameDeviceID = false;
+
+    // Need to ensure that columnTypes recreates an array
+    final List<Tablet.ColumnCategory> categories =
+        new ArrayList<>(deviceIdSize + 
PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+    for (int i = 0; i < deviceIdSize; i++) {
+      categories.add(Tablet.ColumnCategory.TAG);
     }
+    columnTypes = categories;
 
-    // The metadata obtained by alignedChunkMetadata.getValueChunkMetadataList 
may not be continuous
-    // when reading TSFile Chunks, so reordering the metadata here has little 
effect on the
-    // efficiency of reading chunks.
-    for (Pair<String, Integer> m : measurementColumIndexList) {
-      final ChunkMetadata metadata = metadataMap.get(m.getLeft());
+    // Clean up the remaining non-DeviceID column information
+    measurementList.subList(deviceIdSize, measurementList.size()).clear();
+    dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();
+
+    final int startOffset = offset;
+    for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); 
++offset) {
+      final IChunkMetadata metadata = 
alignedChunkMetadata.getValueChunkMetadataList().get(offset);
       if (metadata != null) {
-        final Chunk chunk = reader.readMemChunk(metadata);
+        // Record the column information corresponding to Meta to fill in 
Tablet
+        columnTypes.add(Tablet.ColumnCategory.FIELD);
+        measurementList.add(metadata.getMeasurementUid());
+        dataTypeList.add(metadata.getDataType());
 
+        final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
         size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
         
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
size);
 
         valueChunkList.add(chunk);
-        continue;
       }
-      valueChunkList.add(null);
+      if (offset - startOffset >= PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH) {
+        break;
+      }
+    }
+
+    if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
+      currentChunkMetadata = null;
     }
 
     this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
@@ -366,39 +376,37 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
   private void fillMeasurementValueColumns(
       final BatchData data, final Tablet tablet, final int rowIndex) {
     final TsPrimitiveType[] primitiveTypes = data.getVector();
-    final List<IMeasurementSchema> measurementSchemas = tablet.getSchemas();
 
-    for (int i = 0, size = measurementColumIndexList.size(); i < size; i++) {
-      final TsPrimitiveType primitiveType = primitiveTypes[i];
+    for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
+      final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
       if (primitiveType == null) {
         continue;
       }
 
-      final int index = measurementColumIndexList.get(i).getRight();
-      switch (measurementSchemas.get(index).getType()) {
+      switch (dataTypeList.get(i)) {
         case BOOLEAN:
-          tablet.addValue(rowIndex, index, primitiveType.getBoolean());
+          tablet.addValue(rowIndex, i, primitiveType.getBoolean());
           break;
         case INT32:
-          tablet.addValue(rowIndex, index, primitiveType.getInt());
+          tablet.addValue(rowIndex, i, primitiveType.getInt());
           break;
         case DATE:
-          tablet.addValue(rowIndex, index, 
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
+          tablet.addValue(rowIndex, i, 
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
           break;
         case INT64:
         case TIMESTAMP:
-          tablet.addValue(rowIndex, index, primitiveType.getLong());
+          tablet.addValue(rowIndex, i, primitiveType.getLong());
           break;
         case FLOAT:
-          tablet.addValue(rowIndex, index, primitiveType.getFloat());
+          tablet.addValue(rowIndex, i, primitiveType.getFloat());
           break;
         case DOUBLE:
-          tablet.addValue(rowIndex, index, primitiveType.getDouble());
+          tablet.addValue(rowIndex, i, primitiveType.getDouble());
           break;
         case TEXT:
         case BLOB:
         case STRING:
-          tablet.addValue(rowIndex, index, 
primitiveType.getBinary().getValues());
+          tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues());
           break;
         default:
           throw new UnSupportedDataTypeException("UnSupported" + 
primitiveType.getDataType());
@@ -413,7 +421,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
       if (deviceIdSegments[i] == null) {
         continue;
       }
-      tablet.addValue(rowIndex, measurementIdIndexList.get(i - 1), 
deviceIdSegments[i]);
+      tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
     }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6a78fce213a..9da31b3cbd7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -288,6 +288,7 @@ public class CommonConfig {
   private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
   private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
   private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+  private int pipeMaxAlignedSeriesNumInOneBatch = 15;
   private long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private int pipeSnapshotExecutionMaxBatchSize = 1000;
   private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -1284,6 +1285,14 @@ public class CommonConfig {
     this.pipeLeaderCacheMemoryUsagePercentage = 
pipeLeaderCacheMemoryUsagePercentage;
   }
 
+  public int getPipeMaxAlignedSeriesNumInOneBatch() {
+    return pipeMaxAlignedSeriesNumInOneBatch;
+  }
+
+  public void setPipeMaxAlignedSeriesNumInOneBatch(int 
pipeMaxAlignedSeriesNumInOneBatch) {
+    this.pipeMaxAlignedSeriesNumInOneBatch = pipeMaxAlignedSeriesNumInOneBatch;
+  }
+
   public long getPipeListeningQueueTransferSnapshotThreshold() {
     return pipeListeningQueueTransferSnapshotThreshold;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 36eb46b73a5..ac087dc2afd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -670,6 +670,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_leader_cache_memory_usage_percentage",
                 
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
+    config.setPipeMaxAlignedSeriesNumInOneBatch(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_max_aligned_series_num_in_one_batch",
+                
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
     config.setPipeListeningQueueTransferSnapshotThreshold(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 3c13ae02cb7..03d8653b4dd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -192,6 +192,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
   }
 
+  public int getPipeMaxAlignedSeriesNumInOneBatch() {
+    return COMMON_CONFIG.getPipeMaxAlignedSeriesNumInOneBatch();
+  }
+
   public long getPipeListeningQueueTransferSnapshotThreshold() {
     return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
   }
@@ -443,6 +447,7 @@ public class PipeConfig {
         isPipeConnectorRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
+    LOGGER.info("PipeMaxAlignedSeriesNumInOneBatch: {}", 
getPipeMaxAlignedSeriesNumInOneBatch());
     LOGGER.info(
         "PipeListeningQueueTransferSnapshotThreshold: {}",
         getPipeListeningQueueTransferSnapshotThreshold());

Reply via email to