This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 92878f4ad9387339ed2c510646cae2da05e6dc7f
Author: Caideyipi <[email protected]>
AuthorDate: Wed Nov 12 21:46:37 2025 +0800

    Pipe: Optimized the default behavior in meta sync for retries to wait for 
other regions (#16740)
    
    * partial
    
    * cfg
    
    * core
    
    * fix
    
    * fix
    
    (cherry picked from commit 6e8748e02561df3bc90aaab650ea245e95e4645a)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  4 +-
 .../visitor/PipeStatementTSStatusVisitor.java      | 37 +++++++++++--
 .../realtime/assigner/DisruptorQueue.java          |  4 +-
 .../schemaengine/schemaregion/utils/MetaUtils.java |  6 +--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 60 ++++++++++++++--------
 .../iotdb/commons/pipe/config/PipeConfig.java      | 33 +++++++-----
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 15 +++---
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 18 ++++---
 9 files changed, 119 insertions(+), 59 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 84534adfadb..4fd90ca4b94 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -295,6 +295,7 @@ public enum TSStatusCode {
   PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
   PIPE_PUSH_META_TIMEOUT(1813),
   PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
+  PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION(1815),
 
   // Subscription
   SUBSCRIPTION_VERSION_ERROR(1900),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 87e744985f8..438dfa3f233 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -892,8 +892,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               .getLeft()) {
         return 0;
       }
-      return 
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize()
-          * 
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()
+      return 
PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferSize()
+          * 
PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes()
           * Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10);
     } catch (final IllegalPathException e) {
       return 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 882485a6c72..31eecc7b3a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -106,11 +106,22 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
     } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
-    } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-        && 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
-            && config.isEnablePartialInsert())) {
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+    } else if (context.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
+    } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+          && config.isEnablePartialInsert()) {
+        return new TSStatus(
+                
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+            .setMessage(context.getMessage());
+      }
+      if (context.getMessage().contains("does not exist")) {
+        return new TSStatus(
+                
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+            .setMessage(context.getMessage());
+      }
     }
     return visitStatement(insertBaseStatement, context);
   }
@@ -226,14 +237,24 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   @Override
   public TSStatus visitBatchActivateTemplate(
       final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus context) {
+    boolean userConflict = false;
     if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
       for (final TSStatus status : context.getSubStatus()) {
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
           return visitStatement(batchActivateTemplateStatement, context);
         }
+        if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+            && context.isSetMessage()
+            && context.getMessage().contains("has not been set any template")) 
{
+          userConflict = true;
+        }
       }
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+      return (userConflict
+              ? new TSStatus(
+                  
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+              : new TSStatus(
+                  
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
           .setMessage(context.getMessage());
     }
     return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
@@ -245,6 +266,12 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }
+    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && context.isSetMessage()
+        && context.getMessage().contains("has not been set any template")) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
     return visitStatement(activateTemplateStatement, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index cb4aba1e15a..fc30f14be5b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -49,9 +49,9 @@ public class DisruptorQueue {
       final EventHandler<PipeRealtimeEvent> eventHandler,
       final Consumer<PipeRealtimeEvent> onAssignedHook) {
     final PipeConfig config = PipeConfig.getInstance();
-    final int ringBufferSize = 
config.getPipeExtractorAssignerDisruptorRingBufferSize();
+    final int ringBufferSize = 
config.getPipeSourceAssignerDisruptorRingBufferSize();
     final long ringBufferEntrySizeInBytes =
-        config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
+        config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
 
     allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
index fae3a21fd68..11b41b238a3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
@@ -70,9 +70,9 @@ public class MetaUtils {
       throw new IllegalPathException(
           path.getFullPath(), "it does not start with " + 
IoTDBConstant.PATH_ROOT);
     }
-    String[] storageGroupNodes = new String[level + 1];
-    System.arraycopy(nodeNames, 0, storageGroupNodes, 0, level + 1);
-    return new PartialPath(storageGroupNodes);
+    String[] databaseNodes = new String[level + 1];
+    System.arraycopy(nodeNames, 0, databaseNodes, 0, level + 1);
+    return new PartialPath(databaseNodes);
   }
 
   /**
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 43a637d5c2c..9102a117eca 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -214,6 +214,8 @@ public class CommonConfig {
   private int pipeSubtaskExecutorMaxThreadNum =
       Math.max(5, Runtime.getRuntime().availableProcessors() / 2);
 
+  private boolean pipeRetryLocallyForParallelOrUserConflict = true;
+
   private int pipeDataStructureTabletRowSize = 2048;
   private int pipeDataStructureTabletSizeInBytes = 2097152;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.3;
@@ -256,8 +258,8 @@ public class CommonConfig {
 
   private long pipeMaxWaitFinishTime = 10 * 1000;
 
-  private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
-  private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * 
KB;
+  private int pipeSourceAssignerDisruptorRingBufferSize = 128;
+  private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
   private long pipeSourceMatcherCacheSize = 1024;
 
   private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
@@ -996,38 +998,37 @@ public class CommonConfig {
         "pipeTotalFloatingMemoryProportion is set to {}.", 
pipeTotalFloatingMemoryProportion);
   }
 
-  public int getPipeExtractorAssignerDisruptorRingBufferSize() {
-    return pipeExtractorAssignerDisruptorRingBufferSize;
+  public int getPipeSourceAssignerDisruptorRingBufferSize() {
+    return pipeSourceAssignerDisruptorRingBufferSize;
   }
 
-  public void setPipeExtractorAssignerDisruptorRingBufferSize(
-      int pipeExtractorAssignerDisruptorRingBufferSize) {
-    if (this.pipeExtractorAssignerDisruptorRingBufferSize
-        == pipeExtractorAssignerDisruptorRingBufferSize) {
+  public void setPipeSourceAssignerDisruptorRingBufferSize(
+      int pipeSourceAssignerDisruptorRingBufferSize) {
+    if (this.pipeSourceAssignerDisruptorRingBufferSize
+        == pipeSourceAssignerDisruptorRingBufferSize) {
       return;
     }
-    this.pipeExtractorAssignerDisruptorRingBufferSize =
-        pipeExtractorAssignerDisruptorRingBufferSize;
+    this.pipeSourceAssignerDisruptorRingBufferSize = 
pipeSourceAssignerDisruptorRingBufferSize;
     logger.info(
-        "pipeExtractorAssignerDisruptorRingBufferSize is set to {}.",
-        pipeExtractorAssignerDisruptorRingBufferSize);
+        "pipeSourceAssignerDisruptorRingBufferSize is set to {}.",
+        pipeSourceAssignerDisruptorRingBufferSize);
   }
 
-  public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
-    return pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes;
+  public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
+    return pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes;
   }
 
-  public void setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(
-      long pipeExtractorAssignerDisruptorRingBufferEntrySize) {
-    if (pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes
-        == pipeExtractorAssignerDisruptorRingBufferEntrySize) {
+  public void setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes(
+      long pipeSourceAssignerDisruptorRingBufferEntrySize) {
+    if (pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes
+        == pipeSourceAssignerDisruptorRingBufferEntrySize) {
       return;
     }
-    this.pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes =
-        pipeExtractorAssignerDisruptorRingBufferEntrySize;
+    this.pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes =
+        pipeSourceAssignerDisruptorRingBufferEntrySize;
     logger.info(
-        "pipeExtractorAssignerDisruptorRingBufferEntrySize is set to {}.",
-        pipeExtractorAssignerDisruptorRingBufferEntrySize);
+        "pipeSourceAssignerDisruptorRingBufferEntrySize is set to {}.",
+        pipeSourceAssignerDisruptorRingBufferEntrySize);
   }
 
   public long getPipeSourceMatcherCacheSize() {
@@ -1400,6 +1401,21 @@ public class CommonConfig {
     logger.info("pipeSubtaskExecutorMaxThreadNum is set to {}.", 
pipeSubtaskExecutorMaxThreadNum);
   }
 
+  public boolean isPipeRetryLocallyForParallelOrUserConflict() {
+    return pipeRetryLocallyForParallelOrUserConflict;
+  }
+
+  public void setPipeRetryLocallyForParallelOrUserConflict(
+      boolean pipeRetryLocallyForParallelOrUserConflict) {
+    if (this.pipeRetryLocallyForParallelOrUserConflict
+        == pipeRetryLocallyForParallelOrUserConflict) {
+      return;
+    }
+    this.pipeRetryLocallyForParallelOrUserConflict = 
pipeRetryLocallyForParallelOrUserConflict;
+    logger.info(
+        "pipeRetryLocallyForParallelOrUserConflict is set to {}.", 
pipeSubtaskExecutorMaxThreadNum);
+  }
+
   public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
     return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4e840d51c55..637628149b6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -143,14 +143,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxWaitFinishTime();
   }
 
-  /////////////////////////////// Extractor ///////////////////////////////
+  /////////////////////////////// Source ///////////////////////////////
 
-  public int getPipeExtractorAssignerDisruptorRingBufferSize() {
-    return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize();
+  public int getPipeSourceAssignerDisruptorRingBufferSize() {
+    return COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferSize();
   }
 
-  public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
-    return 
COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
+  public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
+    return 
COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
   }
 
   public long getPipeSourceMatcherCacheSize() {
@@ -337,8 +337,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipePeriodicalLogMinIntervalSeconds();
   }
 
-  public long getPipeLoggerCacheMaxSizeInBytes() {
-    return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+  public boolean isPipeRetryLocallyForParallelOrUserConflict() {
+    return COMMON_CONFIG.isPipeRetryLocallyForParallelOrUserConflict();
   }
 
   /////////////////////////////// Logger ///////////////////////////////
@@ -359,6 +359,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
   }
 
+  public long getPipeLoggerCacheMaxSizeInBytes() {
+    return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+  }
+
   /////////////////////////////// Memory ///////////////////////////////
 
   public boolean getPipeMemoryManagementEnabled() {
@@ -482,12 +486,12 @@ public class PipeConfig {
     LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
 
     LOGGER.info(
-        "PipeExtractorAssignerDisruptorRingBufferSize: {}",
-        getPipeExtractorAssignerDisruptorRingBufferSize());
+        "PipeSourceAssignerDisruptorRingBufferSize: {}",
+        getPipeSourceAssignerDisruptorRingBufferSize());
     LOGGER.info(
-        "PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
-        getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
-    LOGGER.info("PipeExtractorMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
+        "PipeSourceAssignerDisruptorRingBufferEntrySizeInBytes: {}",
+        getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
+    LOGGER.info("PipeSourceMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
     LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeConnectorHandshakeTimeoutMs());
     LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
@@ -592,12 +596,15 @@ public class PipeConfig {
     LOGGER.info("PipeReceiverLoadConversionEnabled: {}", 
isPipeReceiverLoadConversionEnabled());
     LOGGER.info(
         "PipePeriodicalLogMinIntervalSeconds: {}", 
getPipePeriodicalLogMinIntervalSeconds());
-    LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", 
getPipeLoggerCacheMaxSizeInBytes());
+    LOGGER.info(
+        "PipeRetryLocallyForParallelOrUserConflict: {}",
+        isPipeRetryLocallyForParallelOrUserConflict());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
     LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", 
getPipeTsFilePinMaxLogNumPerRound());
     LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", 
getPipeTsFilePinMaxLogIntervalRounds());
+    LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", 
getPipeLoggerCacheMaxSizeInBytes());
 
     LOGGER.info("PipeMemoryManagementEnabled: {}", 
getPipeMemoryManagementEnabled());
     LOGGER.info("PipeMemoryAllocateMaxRetries: {}", 
getPipeMemoryAllocateMaxRetries());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 760a7b5e51f..77aae8a3252 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -290,17 +290,21 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+    config.setPipeRetryLocallyForParallelOrUserConflict(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_retry_locally_for_user_conflict",
+                
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
 
-    config.setPipeExtractorAssignerDisruptorRingBufferSize(
+    config.setPipeSourceAssignerDisruptorRingBufferSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
                 .orElse(
                     properties.getProperty(
                         "pipe_extractor_assigner_disruptor_ring_buffer_size",
-                        String.valueOf(
-                            
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
-    config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
+                        
String.valueOf(config.getPipeSourceAssignerDisruptorRingBufferSize())))));
+    config.setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
         Integer.parseInt(
             Optional.ofNullable(
                     properties.getProperty(
@@ -309,8 +313,7 @@ public class PipeDescriptor {
                     properties.getProperty(
                         
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
                         String.valueOf(
-                            config
-                                
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
+                            
config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes())))));
 
     config.setPipeSourceMatcherCacheSize(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 3bb9e14d181..350746d7b8e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingI
 import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -131,6 +132,7 @@ public class PipeReceiverStatusHandler {
         }
 
       case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
+      case 1815: // PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION
         if (!isRetryAllowedWhenConflictOccurs) {
           LOGGER.warn(
               "User conflict exception: will be ignored because retry is not 
allowed. event: {}. status: {}",
@@ -165,12 +167,16 @@ public class PipeReceiverStatusHandler {
                       + " seconds",
               status);
           exceptionEventHasBeenRetried.set(true);
-          throw new PipeRuntimeSinkRetryTimesConfigurableException(
-              exceptionMessage,
-              (int)
-                  Math.max(
-                      PipeSubtask.MAX_RETRY_TIMES,
-                      Math.min(CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
+          throw status.getCode() == 1815
+                  && 
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+              ? new PipeNonReportException(exceptionMessage)
+              : new PipeRuntimeSinkRetryTimesConfigurableException(
+                  exceptionMessage,
+                  (int)
+                      Math.max(
+                          PipeSubtask.MAX_RETRY_TIMES,
+                          Math.min(
+                              CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
         }
 
       case 803: // NO_PERMISSION

Reply via email to