This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new fca7fee95e1 [To dev/1.3] Pipe: Cleaned some questionable parameters & Fixed unstable testPipeAfterDataRegionLeaderStop (#16555) (#16565)
fca7fee95e1 is described below

commit fca7fee95e1143ba45c2f23e3286bf2070792ed2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Oct 13 12:22:59 2025 +0800

    [To dev/1.3] Pipe: Cleaned some questionable parameters & Fixed unstable testPipeAfterDataRegionLeaderStop (#16555) (#16565)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../metadata/AlignedTimeseriesException.java       | 33 --------------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  4 +-
 .../scan/TsFileInsertionScanDataContainer.java     |  3 +-
 .../visitor/PipeStatementTSStatusVisitor.java      |  5 ---
 .../db/pipe/resource/memory/PipeMemoryManager.java |  6 ---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 50 +++-------------------
 .../iotdb/commons/pipe/config/PipeConfig.java      | 49 ++++++++-------------
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 14 +-----
 9 files changed, 32 insertions(+), 133 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index a4b7ad9bd3a..d28510c2b7f 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -93,6 +93,7 @@ public enum TSStatusCode {
   WRITE_PROCESS_REJECT(606),
   OUT_OF_TTL(607),
   COMPACTION_ERROR(608),
+  @Deprecated
   ALIGNED_TIMESERIES_ERROR(609),
   WAL_ERROR(610),
   DISK_SPACE_INSUFFICIENT(611),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
deleted file mode 100644
index f11c986afdc..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.exception.metadata;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-public class AlignedTimeseriesException extends MetadataException {
-
-  public AlignedTimeseriesException(String message, String path) {
-    super(
-        String.format("%s (Path: %s)", message, path),
-        TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
-        true);
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 4c2a6ef1a6a..f916ac067c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -771,12 +771,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final long allocatedMemorySizeInBytes = 
this.getAllFloatingMemoryUsageInByte();
     final long remainingMemory =
         PipeMemoryManager.getTotalFloatingMemorySizeInBytes() - 
allocatedMemorySizeInBytes;
-    if (remainingMemory < 
PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
+    if (remainingMemory < 
PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) {
       final String message =
           String.format(
               "%s Need Floating memory: %d  bytes, free Floating memory: %d 
bytes",
               MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
-              PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
+              PipeConfig.getInstance().getPipeInsertNodeQueueMemory(),
               remainingMemory);
       LOGGER.warn(message);
       throw new PipeException(message);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 4223a5155f2..c5ef8423297 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -113,8 +113,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
                 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
     this.allocatedMemoryBlockForChunk =
         PipeDataNodeResourceManager.memory()
-            .forceAllocateForTabletWithRetry(
-                
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+            
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
 
     try {
       tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 0874857c3f7..6eae20e506f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -132,7 +132,6 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     } else if (context.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
         || context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
@@ -167,10 +166,6 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
             && status.getCode() != 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
-          if (status.getCode() == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
-            return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-                .setMessage(context.getMessage());
-          }
           return visitStatement(statement, context);
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 2b53e85d049..87be4d5fb62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -640,12 +640,6 @@ public class PipeMemoryManager {
     return usedMemorySizeInBytesOfTsFiles;
   }
 
-  public long getAllocatedMemorySizeInBytesOfBatch() {
-    return (long)
-        (PipeConfig.getInstance().getPipeDataStructureBatchMemoryProportion()
-            * getTotalNonFloatingMemorySizeInBytes());
-  }
-
   public long getFreeMemorySizeInBytes() {
     return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 24d72a5e415..88e6985070f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -215,7 +215,6 @@ public class CommonConfig {
   private int pipeDataStructureTabletSizeInBytes = 2097152;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.3;
   private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.3;
-  private double PipeDataStructureBatchMemoryProportion = 0.2;
   private volatile double pipeTotalFloatingMemoryProportion = 0.5;
 
   // Check if memory check is enabled for Pipe
@@ -251,7 +250,6 @@ public class CommonConfig {
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
 
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
-  private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
 
   private long pipeMaxWaitFinishTime = 10 * 1000;
 
@@ -316,7 +314,7 @@ public class CommonConfig {
   private volatile long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 
3Min
   private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
   private volatile float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
-  private volatile long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 * 
1024 * 1024; // 16MB;
+  private volatile long pipeMaxReaderChunkSize = (long) 16 * 1024 * 1024; // 
16MB;
   private volatile long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private volatile int pipeSnapshotExecutionMaxBatchSize = 1000;
   private volatile long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -834,21 +832,6 @@ public class CommonConfig {
         pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
   }
 
-  public double getPipeDataStructureBatchMemoryProportion() {
-    return PipeDataStructureBatchMemoryProportion;
-  }
-
-  public void setPipeDataStructureBatchMemoryProportion(
-      double PipeDataStructureBatchMemoryProportion) {
-    if (this.PipeDataStructureBatchMemoryProportion == 
PipeDataStructureBatchMemoryProportion) {
-      return;
-    }
-    this.PipeDataStructureBatchMemoryProportion = 
PipeDataStructureBatchMemoryProportion;
-    logger.info(
-        "PipeDataStructureBatchMemoryProportion is set to {}.",
-        PipeDataStructureBatchMemoryProportion);
-  }
-
   public boolean isPipeEnableMemoryChecked() {
     return isPipeEnableMemoryCheck;
   }
@@ -1396,22 +1379,6 @@ public class CommonConfig {
         pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds);
   }
 
-  public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
-    return pipeSubtaskExecutorForcedRestartIntervalMs;
-  }
-
-  public void setPipeSubtaskExecutorForcedRestartIntervalMs(
-      long pipeSubtaskExecutorForcedRestartIntervalMs) {
-    if (this.pipeSubtaskExecutorForcedRestartIntervalMs
-        == pipeSubtaskExecutorForcedRestartIntervalMs) {
-      return;
-    }
-    this.pipeSubtaskExecutorForcedRestartIntervalMs = 
pipeSubtaskExecutorForcedRestartIntervalMs;
-    logger.info(
-        "pipeSubtaskExecutorForcedRestartIntervalMs is set to {}",
-        pipeSubtaskExecutorForcedRestartIntervalMs);
-  }
-
   public long getPipeMaxWaitFinishTime() {
     return pipeMaxWaitFinishTime;
   }
@@ -1730,19 +1697,16 @@ public class CommonConfig {
         "pipeLeaderCacheMemoryUsagePercentage is set to {}", 
pipeLeaderCacheMemoryUsagePercentage);
   }
 
-  public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
-    return pipeMaxAlignedSeriesChunkSizeInOneBatch;
+  public long getPipeMaxReaderChunkSize() {
+    return pipeMaxReaderChunkSize;
   }
 
-  public void setPipeMaxAlignedSeriesChunkSizeInOneBatch(
-      long pipeMaxAlignedSeriesChunkSizeInOneBatch) {
-    if (this.pipeMaxAlignedSeriesChunkSizeInOneBatch == 
pipeMaxAlignedSeriesChunkSizeInOneBatch) {
+  public void setPipeMaxReaderChunkSize(long pipeMaxReaderChunkSize) {
+    if (this.pipeMaxReaderChunkSize == pipeMaxReaderChunkSize) {
       return;
     }
-    this.pipeMaxAlignedSeriesChunkSizeInOneBatch = 
pipeMaxAlignedSeriesChunkSizeInOneBatch;
-    logger.info(
-        "pipeMaxAlignedSeriesChunkSizeInOneBatch is set to {}",
-        pipeMaxAlignedSeriesChunkSizeInOneBatch);
+    this.pipeMaxReaderChunkSize = pipeMaxReaderChunkSize;
+    logger.info("pipeMaxReaderChunkSize is set to {}", pipeMaxReaderChunkSize);
   }
 
   public long getPipeListeningQueueTransferSnapshotThreshold() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ec21b8fe72e..2578008f68e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -69,15 +69,13 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
   }
 
-  public double getPipeDataStructureBatchMemoryProportion() {
-    return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion();
-  }
+  /////////////////////////////// Estimation ///////////////////////////////
 
   public boolean isPipeEnableMemoryCheck() {
     return COMMON_CONFIG.isPipeEnableMemoryChecked();
   }
 
-  public long PipeInsertNodeQueueMemory() {
+  public long getPipeInsertNodeQueueMemory() {
     return COMMON_CONFIG.getPipeInsertNodeQueueMemory();
   }
 
@@ -141,10 +139,6 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
   }
 
-  public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
-    return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
-  }
-
   public long getPipeMaxWaitFinishTime() {
     return COMMON_CONFIG.getPipeMaxWaitFinishTime();
   }
@@ -233,20 +227,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
   }
 
-  public float getPipeLeaderCacheMemoryUsagePercentage() {
-    return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
-  }
-
-  public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
-    return COMMON_CONFIG.getPipeMaxAlignedSeriesChunkSizeInOneBatch();
-  }
-
-  public long getPipeListeningQueueTransferSnapshotThreshold() {
-    return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
-  }
-
-  public int getPipeSnapshotExecutionMaxBatchSize() {
-    return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+  public long getPipeMaxReaderChunkSize() {
+    return COMMON_CONFIG.getPipeMaxReaderChunkSize();
   }
 
   public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
@@ -407,6 +389,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeCheckMemoryEnoughIntervalMs();
   }
 
+  public float getPipeLeaderCacheMemoryUsagePercentage() {
+    return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
+  }
+
   /////////////////////////////// TwoStage ///////////////////////////////
 
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
@@ -421,6 +407,16 @@ public class PipeConfig {
     return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
   }
 
+  /////////////////////////////// Meta ///////////////////////////////
+
+  public long getPipeListeningQueueTransferSnapshotThreshold() {
+    return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
+  }
+
+  public int getPipeSnapshotExecutionMaxBatchSize() {
+    return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+  }
+
   /////////////////////////////// Ref ///////////////////////////////
 
   public boolean getPipeEventReferenceTrackingEnabled() {
@@ -456,8 +452,6 @@ public class PipeConfig {
         getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
     LOGGER.info("PipeTotalFloatingMemoryProportion: {}", 
getPipeTotalFloatingMemoryProportion());
 
-    LOGGER.info(
-        "PipeDataStructureBatchMemoryProportion: {}", 
getPipeDataStructureBatchMemoryProportion());
     LOGGER.info("IsPipeEnableMemoryCheck: {}", isPipeEnableMemoryCheck());
     LOGGER.info("PipeTsFileParserMemory: {}", getTsFileParserMemory());
     LOGGER.info("SinkBatchMemoryInsertNode: {}", 
getSinkBatchMemoryInsertNode());
@@ -485,9 +479,6 @@ public class PipeConfig {
     LOGGER.info(
         "PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
         getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
-    LOGGER.info(
-        "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
-        getPipeSubtaskExecutorForcedRestartIntervalMs());
     LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
 
     LOGGER.info(
@@ -510,9 +501,7 @@ public class PipeConfig {
         isPipeConnectorRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
-    LOGGER.info(
-        "PipeMaxAlignedSeriesChunkSizeInOneBatch: {}",
-        getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+    LOGGER.info("PipeReaderChunkSize: {}", getPipeMaxReaderChunkSize());
     LOGGER.info(
         "PipeListeningQueueTransferSnapshotThreshold: {}",
         getPipeListeningQueueTransferSnapshotThreshold());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 1beb4c67e25..27e5fddf14d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -210,11 +210,6 @@ public class PipeDescriptor {
                 
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
                 String.valueOf(
                     
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
-    config.setPipeDataStructureBatchMemoryProportion(
-        Double.parseDouble(
-            properties.getProperty(
-                "pipe_data_structure_batch_memory_proportion",
-                
String.valueOf(config.getPipeDataStructureBatchMemoryProportion()))));
     config.setPipeTotalFloatingMemoryProportion(
         Double.parseDouble(
             properties.getProperty(
@@ -295,11 +290,6 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
-    config.setPipeSubtaskExecutorForcedRestartIntervalMs(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_subtask_executor_forced_restart_interval_ms",
-                
String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs()))));
 
     config.setPipeExtractorAssignerDisruptorRingBufferSize(
         Integer.parseInt(
@@ -483,11 +473,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_leader_cache_memory_usage_percentage",
                 
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
-    config.setPipeMaxAlignedSeriesChunkSizeInOneBatch(
+    config.setPipeMaxReaderChunkSize(
         Long.parseLong(
             properties.getProperty(
                 "pipe_max_aligned_series_chunk_size_in_one_batch",
-                
String.valueOf(config.getPipeMaxAlignedSeriesChunkSizeInOneBatch()))));
+                String.valueOf(config.getPipeMaxReaderChunkSize()))));
 
     config.setPipeTransferTsFileSync(
         Boolean.parseBoolean(

Reply via email to