This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7c01dfb9578 Pipe: Optimized the default behavior in meta sync for 
retries to wait for other regions (#16740) (#16746)
7c01dfb9578 is described below

commit 7c01dfb957821ec719bdba7553bb6129ff5db065
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 13 14:21:57 2025 +0800

    Pipe: Optimized the default behavior in meta sync for retries to wait for 
other regions (#16740) (#16746)
    
    * partial
    
    * cfg
    
    * core
    
    * fix
    
    * fix
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  4 +-
 .../visitor/PipeStatementTSStatusVisitor.java      | 37 +++++++++++--
 .../realtime/assigner/DisruptorQueue.java          |  4 +-
 .../schemaengine/schemaregion/utils/MetaUtils.java |  6 +--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 60 ++++++++++++++--------
 .../iotdb/commons/pipe/config/PipeConfig.java      | 31 ++++++-----
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 15 +++---
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 18 ++++---
 9 files changed, 118 insertions(+), 58 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d28510c2b7f..b3939048bd6 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -263,6 +263,7 @@ public enum TSStatusCode {
   PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
   PIPE_PUSH_META_TIMEOUT(1813),
   PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
+  PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION(1815),
 
   // Subscription
   SUBSCRIPTION_VERSION_ERROR(1900),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 7070d57d3b3..cabf17b7f4f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -895,8 +895,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               .getLeft()) {
         return 0;
       }
-      return 
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize()
-          * 
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()
+      return 
PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferSize()
+          * 
PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes()
           * Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10);
     } catch (final IllegalPathException e) {
       return 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 6eae20e506f..94c3c418f21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -105,11 +105,22 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
     } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
-    } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-        && 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
-            && config.isEnablePartialInsert())) {
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+    } else if (context.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
+    } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+          && config.isEnablePartialInsert()) {
+        return new TSStatus(
+                
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+            .setMessage(context.getMessage());
+      }
+      if (context.getMessage().contains("does not exist")) {
+        return new TSStatus(
+                
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+            .setMessage(context.getMessage());
+      }
     }
     return visitStatement(insertBaseStatement, context);
   }
@@ -225,14 +236,24 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   @Override
   public TSStatus visitBatchActivateTemplate(
       final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus context) {
+    boolean userConflict = false;
     if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
       for (final TSStatus status : context.getSubStatus()) {
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
           return visitStatement(batchActivateTemplateStatement, context);
         }
+        if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+            && context.isSetMessage()
+            && context.getMessage().contains("has not been set any template")) 
{
+          userConflict = true;
+        }
       }
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+      return (userConflict
+              ? new TSStatus(
+                  
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+              : new TSStatus(
+                  
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
           .setMessage(context.getMessage());
     }
     return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
@@ -244,6 +265,12 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }
+    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && context.isSetMessage()
+        && context.getMessage().contains("has not been set any template")) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
     return visitStatement(activateTemplateStatement, context);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 4c3daa4879a..b2b4ea9b83d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -52,9 +52,9 @@ public class DisruptorQueue {
       final EventHandler<PipeRealtimeEvent> eventHandler,
       final Consumer<PipeRealtimeEvent> onAssignedHook) {
     final PipeConfig config = PipeConfig.getInstance();
-    final int ringBufferSize = 
config.getPipeExtractorAssignerDisruptorRingBufferSize();
+    final int ringBufferSize = 
config.getPipeSourceAssignerDisruptorRingBufferSize();
     final long ringBufferEntrySizeInBytes =
-        config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
+        config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
 
     allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
index 108b5f14d9d..d429931894f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
@@ -69,9 +69,9 @@ public class MetaUtils {
       throw new IllegalPathException(
           path.getFullPath(), "it does not start with " + 
IoTDBConstant.PATH_ROOT);
     }
-    String[] storageGroupNodes = new String[level + 1];
-    System.arraycopy(nodeNames, 0, storageGroupNodes, 0, level + 1);
-    return new PartialPath(storageGroupNodes);
+    String[] databaseNodes = new String[level + 1];
+    System.arraycopy(nodeNames, 0, databaseNodes, 0, level + 1);
+    return new PartialPath(databaseNodes);
   }
 
   /**
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 88e6985070f..091fb56def2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -211,6 +211,8 @@ public class CommonConfig {
   private int pipeSubtaskExecutorMaxThreadNum =
       Math.max(5, Runtime.getRuntime().availableProcessors() / 2);
 
+  private boolean pipeRetryLocallyForParallelOrUserConflict = true;
+
   private int pipeDataStructureTabletRowSize = 2048;
   private int pipeDataStructureTabletSizeInBytes = 2097152;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.3;
@@ -253,8 +255,8 @@ public class CommonConfig {
 
   private long pipeMaxWaitFinishTime = 10 * 1000;
 
-  private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
-  private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * 
KB;
+  private int pipeSourceAssignerDisruptorRingBufferSize = 128;
+  private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
   private long pipeSourceMatcherCacheSize = 1024;
 
   private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
@@ -941,38 +943,37 @@ public class CommonConfig {
         "pipeTotalFloatingMemoryProportion is set to {}.", 
pipeTotalFloatingMemoryProportion);
   }
 
-  public int getPipeExtractorAssignerDisruptorRingBufferSize() {
-    return pipeExtractorAssignerDisruptorRingBufferSize;
+  public int getPipeSourceAssignerDisruptorRingBufferSize() {
+    return pipeSourceAssignerDisruptorRingBufferSize;
   }
 
-  public void setPipeExtractorAssignerDisruptorRingBufferSize(
-      int pipeExtractorAssignerDisruptorRingBufferSize) {
-    if (this.pipeExtractorAssignerDisruptorRingBufferSize
-        == pipeExtractorAssignerDisruptorRingBufferSize) {
+  public void setPipeSourceAssignerDisruptorRingBufferSize(
+      int pipeSourceAssignerDisruptorRingBufferSize) {
+    if (this.pipeSourceAssignerDisruptorRingBufferSize
+        == pipeSourceAssignerDisruptorRingBufferSize) {
       return;
     }
-    this.pipeExtractorAssignerDisruptorRingBufferSize =
-        pipeExtractorAssignerDisruptorRingBufferSize;
+    this.pipeSourceAssignerDisruptorRingBufferSize = 
pipeSourceAssignerDisruptorRingBufferSize;
     logger.info(
-        "pipeExtractorAssignerDisruptorRingBufferSize is set to {}.",
-        pipeExtractorAssignerDisruptorRingBufferSize);
+        "pipeSourceAssignerDisruptorRingBufferSize is set to {}.",
+        pipeSourceAssignerDisruptorRingBufferSize);
   }
 
-  public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
-    return pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes;
+  public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
+    return pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes;
   }
 
-  public void setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(
-      long pipeExtractorAssignerDisruptorRingBufferEntrySize) {
-    if (pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes
-        == pipeExtractorAssignerDisruptorRingBufferEntrySize) {
+  public void setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes(
+      long pipeSourceAssignerDisruptorRingBufferEntrySize) {
+    if (pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes
+        == pipeSourceAssignerDisruptorRingBufferEntrySize) {
       return;
     }
-    this.pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes =
-        pipeExtractorAssignerDisruptorRingBufferEntrySize;
+    this.pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes =
+        pipeSourceAssignerDisruptorRingBufferEntrySize;
     logger.info(
-        "pipeExtractorAssignerDisruptorRingBufferEntrySize is set to {}.",
-        pipeExtractorAssignerDisruptorRingBufferEntrySize);
+        "pipeSourceAssignerDisruptorRingBufferEntrySize is set to {}.",
+        pipeSourceAssignerDisruptorRingBufferEntrySize);
   }
 
   public long getPipeSourceMatcherCacheSize() {
@@ -1345,6 +1346,21 @@ public class CommonConfig {
     logger.info("pipeSubtaskExecutorMaxThreadNum is set to {}.", 
pipeSubtaskExecutorMaxThreadNum);
   }
 
+  public boolean isPipeRetryLocallyForParallelOrUserConflict() {
+    return pipeRetryLocallyForParallelOrUserConflict;
+  }
+
+  public void setPipeRetryLocallyForParallelOrUserConflict(
+      boolean pipeRetryLocallyForParallelOrUserConflict) {
+    if (this.pipeRetryLocallyForParallelOrUserConflict
+        == pipeRetryLocallyForParallelOrUserConflict) {
+      return;
+    }
+    this.pipeRetryLocallyForParallelOrUserConflict = 
pipeRetryLocallyForParallelOrUserConflict;
+    logger.info(
+        "pipeRetryLocallyForParallelOrUserConflict is set to {}.", 
pipeSubtaskExecutorMaxThreadNum);
+  }
+
   public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
     return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2578008f68e..2cd745a5e67 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -143,14 +143,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxWaitFinishTime();
   }
 
-  /////////////////////////////// Extractor ///////////////////////////////
+  /////////////////////////////// Source ///////////////////////////////
 
-  public int getPipeExtractorAssignerDisruptorRingBufferSize() {
-    return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize();
+  public int getPipeSourceAssignerDisruptorRingBufferSize() {
+    return COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferSize();
   }
 
-  public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
-    return 
COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
+  public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
+    return 
COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
   }
 
   public long getPipeSourceMatcherCacheSize() {
@@ -337,8 +337,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipePeriodicalLogMinIntervalSeconds();
   }
 
-  public long getPipeLoggerCacheMaxSizeInBytes() {
-    return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+  public boolean isPipeRetryLocallyForParallelOrUserConflict() {
+    return COMMON_CONFIG.isPipeRetryLocallyForParallelOrUserConflict();
   }
 
   /////////////////////////////// Logger ///////////////////////////////
@@ -359,6 +359,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
   }
 
+  public long getPipeLoggerCacheMaxSizeInBytes() {
+    return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+  }
+
   /////////////////////////////// Memory ///////////////////////////////
 
   public boolean getPipeMemoryManagementEnabled() {
@@ -482,11 +486,11 @@ public class PipeConfig {
     LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
 
     LOGGER.info(
-        "PipeExtractorAssignerDisruptorRingBufferSize: {}",
-        getPipeExtractorAssignerDisruptorRingBufferSize());
+        "PipeSourceAssignerDisruptorRingBufferSize: {}",
+        getPipeSourceAssignerDisruptorRingBufferSize());
     LOGGER.info(
-        "PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
-        getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
+        "PipeSourceAssignerDisruptorRingBufferEntrySizeInBytes: {}",
+        getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
     LOGGER.info("PipeSourceMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
     LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeConnectorHandshakeTimeoutMs());
@@ -593,12 +597,15 @@ public class PipeConfig {
     LOGGER.info("PipeReceiverLoadConversionEnabled: {}", 
isPipeReceiverLoadConversionEnabled());
     LOGGER.info(
         "PipePeriodicalLogMinIntervalSeconds: {}", 
getPipePeriodicalLogMinIntervalSeconds());
-    LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", 
getPipeLoggerCacheMaxSizeInBytes());
+    LOGGER.info(
+        "PipeRetryLocallyForParallelOrUserConflict: {}",
+        isPipeRetryLocallyForParallelOrUserConflict());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
     LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", 
getPipeTsFilePinMaxLogNumPerRound());
     LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", 
getPipeTsFilePinMaxLogIntervalRounds());
+    LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", 
getPipeLoggerCacheMaxSizeInBytes());
 
     LOGGER.info("PipeMemoryManagementEnabled: {}", 
getPipeMemoryManagementEnabled());
     LOGGER.info("PipeMemoryAllocateMaxRetries: {}", 
getPipeMemoryAllocateMaxRetries());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 27e5fddf14d..a0b29a5f716 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -290,17 +290,21 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+    config.setPipeRetryLocallyForParallelOrUserConflict(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_retry_locally_for_user_conflict",
+                
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
 
-    config.setPipeExtractorAssignerDisruptorRingBufferSize(
+    config.setPipeSourceAssignerDisruptorRingBufferSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
                 .orElse(
                     properties.getProperty(
                         "pipe_extractor_assigner_disruptor_ring_buffer_size",
-                        String.valueOf(
-                            
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
-    config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
+                        
String.valueOf(config.getPipeSourceAssignerDisruptorRingBufferSize())))));
+    config.setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
         Integer.parseInt(
             Optional.ofNullable(
                     properties.getProperty(
@@ -309,8 +313,7 @@ public class PipeDescriptor {
                     properties.getProperty(
                         
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
                         String.valueOf(
-                            config
-                                
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
+                            
config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes())))));
 
     config.setPipeSourceMatcherCacheSize(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 8d913f8b2a5..48939fc3ba3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -117,6 +118,7 @@ public class PipeReceiverStatusHandler {
         }
 
       case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
+      case 1815: // PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION
         if (!isRetryAllowedWhenConflictOccurs) {
           LOGGER.warn(
               "User conflict exception: will be ignored because retry is not 
allowed. event: {}. status: {}",
@@ -151,12 +153,16 @@ public class PipeReceiverStatusHandler {
                       + " seconds",
               status);
           exceptionEventHasBeenRetried.set(true);
-          throw new PipeRuntimeSinkRetryTimesConfigurableException(
-              exceptionMessage,
-              (int)
-                  Math.max(
-                      PipeSubtask.MAX_RETRY_TIMES,
-                      Math.min(CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
+          throw status.getCode() == 1815
+                  && 
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+              ? new PipeNonReportException(exceptionMessage)
+              : new PipeRuntimeSinkRetryTimesConfigurableException(
+                  exceptionMessage,
+                  (int)
+                      Math.max(
+                          PipeSubtask.MAX_RETRY_TIMES,
+                          Math.min(
+                              CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
         }
 
       case 803: // NO_PERMISSION

Reply via email to