This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 7c01dfb9578 Pipe: Optimized the default behavior in meta sync for
retries to wait for other regions (#16740) (#16746)
7c01dfb9578 is described below
commit 7c01dfb957821ec719bdba7553bb6129ff5db065
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 13 14:21:57 2025 +0800
Pipe: Optimized the default behavior in meta sync for retries to wait for
other regions (#16740) (#16746)
* partial
* cfg
* core
* fix
* fix
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 +-
.../visitor/PipeStatementTSStatusVisitor.java | 37 +++++++++++--
.../realtime/assigner/DisruptorQueue.java | 4 +-
.../schemaengine/schemaregion/utils/MetaUtils.java | 6 +--
.../apache/iotdb/commons/conf/CommonConfig.java | 60 ++++++++++++++--------
.../iotdb/commons/pipe/config/PipeConfig.java | 31 ++++++-----
.../iotdb/commons/pipe/config/PipeDescriptor.java | 15 +++---
.../pipe/receiver/PipeReceiverStatusHandler.java | 18 ++++---
9 files changed, 118 insertions(+), 58 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d28510c2b7f..b3939048bd6 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -263,6 +263,7 @@ public enum TSStatusCode {
PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
PIPE_PUSH_META_TIMEOUT(1813),
PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
+ PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION(1815),
// Subscription
SUBSCRIPTION_VERSION_ERROR(1900),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 7070d57d3b3..cabf17b7f4f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -895,8 +895,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
.getLeft()) {
return 0;
}
- return
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize()
- *
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()
+ return
PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferSize()
+ *
PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes()
* Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10);
} catch (final IllegalPathException e) {
return 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 6eae20e506f..94c3c418f21 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -105,11 +105,22 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
} else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
- } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- &&
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
- && config.isEnablePartialInsert())) {
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ } else if (context.getCode() ==
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
+ } else if (context.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+ if
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+ && config.isEnablePartialInsert()) {
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
+ if (context.getMessage().contains("does not exist")) {
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
}
return visitStatement(insertBaseStatement, context);
}
@@ -225,14 +236,24 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitBatchActivateTemplate(
final BatchActivateTemplateStatement batchActivateTemplateStatement,
final TSStatus context) {
+ // Becomes true when at least one sub-status reports that the target path
+ // "has not been set any template"; the whole batch is then answered with
+ // PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION instead of the idempotent code.
+ boolean userConflict = false;
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (final TSStatus status : context.getSubStatus()) {
+ // Inspect the sub-status, not the aggregate: inside this branch "context" always
+ // carries MULTIPLE_ERROR, so testing it against METADATA_ERROR could never match.
+ // The check must also run before the generic rejection below, otherwise a
+ // METADATA_ERROR sub-status would already have returned via visitStatement.
+ if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+ && status.isSetMessage()
+ && status.getMessage().contains("has not been set any template")) {
+ userConflict = true;
+ continue;
+ }
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
return visitStatement(batchActivateTemplateStatement, context);
}
}
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ return (userConflict
+ ? new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+ : new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
.setMessage(context.getMessage());
}
return visitGeneralActivateTemplate(batchActivateTemplateStatement,
context);
@@ -244,6 +265,12 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
+ if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+ && context.isSetMessage()
+ && context.getMessage().contains("has not been set any template")) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
return visitStatement(activateTemplateStatement, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 4c3daa4879a..b2b4ea9b83d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -52,9 +52,9 @@ public class DisruptorQueue {
final EventHandler<PipeRealtimeEvent> eventHandler,
final Consumer<PipeRealtimeEvent> onAssignedHook) {
final PipeConfig config = PipeConfig.getInstance();
- final int ringBufferSize =
config.getPipeExtractorAssignerDisruptorRingBufferSize();
+ final int ringBufferSize =
config.getPipeSourceAssignerDisruptorRingBufferSize();
final long ringBufferEntrySizeInBytes =
- config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
+ config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
index 108b5f14d9d..d429931894f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
@@ -69,9 +69,9 @@ public class MetaUtils {
throw new IllegalPathException(
path.getFullPath(), "it does not start with " +
IoTDBConstant.PATH_ROOT);
}
- String[] storageGroupNodes = new String[level + 1];
- System.arraycopy(nodeNames, 0, storageGroupNodes, 0, level + 1);
- return new PartialPath(storageGroupNodes);
+ String[] databaseNodes = new String[level + 1];
+ System.arraycopy(nodeNames, 0, databaseNodes, 0, level + 1);
+ return new PartialPath(databaseNodes);
}
/**
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 88e6985070f..091fb56def2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -211,6 +211,8 @@ public class CommonConfig {
private int pipeSubtaskExecutorMaxThreadNum =
Math.max(5, Runtime.getRuntime().availableProcessors() / 2);
+ private boolean pipeRetryLocallyForParallelOrUserConflict = true;
+
private int pipeDataStructureTabletRowSize = 2048;
private int pipeDataStructureTabletSizeInBytes = 2097152;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.3;
@@ -253,8 +255,8 @@ public class CommonConfig {
private long pipeMaxWaitFinishTime = 10 * 1000;
- private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
- private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 *
KB;
+ private int pipeSourceAssignerDisruptorRingBufferSize = 128;
+ private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
private long pipeSourceMatcherCacheSize = 1024;
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
@@ -941,38 +943,37 @@ public class CommonConfig {
"pipeTotalFloatingMemoryProportion is set to {}.",
pipeTotalFloatingMemoryProportion);
}
- public int getPipeExtractorAssignerDisruptorRingBufferSize() {
- return pipeExtractorAssignerDisruptorRingBufferSize;
+ public int getPipeSourceAssignerDisruptorRingBufferSize() {
+ return pipeSourceAssignerDisruptorRingBufferSize;
}
- public void setPipeExtractorAssignerDisruptorRingBufferSize(
- int pipeExtractorAssignerDisruptorRingBufferSize) {
- if (this.pipeExtractorAssignerDisruptorRingBufferSize
- == pipeExtractorAssignerDisruptorRingBufferSize) {
+ public void setPipeSourceAssignerDisruptorRingBufferSize(
+ int pipeSourceAssignerDisruptorRingBufferSize) {
+ if (this.pipeSourceAssignerDisruptorRingBufferSize
+ == pipeSourceAssignerDisruptorRingBufferSize) {
return;
}
- this.pipeExtractorAssignerDisruptorRingBufferSize =
- pipeExtractorAssignerDisruptorRingBufferSize;
+ this.pipeSourceAssignerDisruptorRingBufferSize =
pipeSourceAssignerDisruptorRingBufferSize;
logger.info(
- "pipeExtractorAssignerDisruptorRingBufferSize is set to {}.",
- pipeExtractorAssignerDisruptorRingBufferSize);
+ "pipeSourceAssignerDisruptorRingBufferSize is set to {}.",
+ pipeSourceAssignerDisruptorRingBufferSize);
}
- public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
- return pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes;
+ public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
+ return pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes;
}
- public void setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(
- long pipeExtractorAssignerDisruptorRingBufferEntrySize) {
- if (pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes
- == pipeExtractorAssignerDisruptorRingBufferEntrySize) {
+ public void setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes(
+ long pipeSourceAssignerDisruptorRingBufferEntrySize) {
+ if (pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes
+ == pipeSourceAssignerDisruptorRingBufferEntrySize) {
return;
}
- this.pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes =
- pipeExtractorAssignerDisruptorRingBufferEntrySize;
+ this.pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes =
+ pipeSourceAssignerDisruptorRingBufferEntrySize;
logger.info(
- "pipeExtractorAssignerDisruptorRingBufferEntrySize is set to {}.",
- pipeExtractorAssignerDisruptorRingBufferEntrySize);
+ "pipeSourceAssignerDisruptorRingBufferEntrySize is set to {}.",
+ pipeSourceAssignerDisruptorRingBufferEntrySize);
}
public long getPipeSourceMatcherCacheSize() {
@@ -1345,6 +1346,21 @@ public class CommonConfig {
logger.info("pipeSubtaskExecutorMaxThreadNum is set to {}.",
pipeSubtaskExecutorMaxThreadNum);
}
+ /**
+ * Returns the current value of the pipeRetryLocallyForParallelOrUserConflict flag.
+ *
+ * @return the stored flag value
+ */
+ public boolean isPipeRetryLocallyForParallelOrUserConflict() {
+ return pipeRetryLocallyForParallelOrUserConflict;
+ }
+
+ /**
+ * Updates the pipeRetryLocallyForParallelOrUserConflict flag and logs the new value.
+ * Does nothing (and logs nothing) when the value is unchanged.
+ *
+ * @param pipeRetryLocallyForParallelOrUserConflict the new flag value
+ */
+ public void setPipeRetryLocallyForParallelOrUserConflict(
+ boolean pipeRetryLocallyForParallelOrUserConflict) {
+ if (this.pipeRetryLocallyForParallelOrUserConflict
+ == pipeRetryLocallyForParallelOrUserConflict) {
+ return;
+ }
+ this.pipeRetryLocallyForParallelOrUserConflict = pipeRetryLocallyForParallelOrUserConflict;
+ // Log the value that was just assigned; the previous revision of this line logged
+ // pipeSubtaskExecutorMaxThreadNum, an unrelated setting.
+ logger.info(
+ "pipeRetryLocallyForParallelOrUserConflict is set to {}.",
+ pipeRetryLocallyForParallelOrUserConflict);
+ }
+
public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2578008f68e..2cd745a5e67 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -143,14 +143,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMaxWaitFinishTime();
}
- /////////////////////////////// Extractor ///////////////////////////////
+ /////////////////////////////// Source ///////////////////////////////
- public int getPipeExtractorAssignerDisruptorRingBufferSize() {
- return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize();
+ public int getPipeSourceAssignerDisruptorRingBufferSize() {
+ return COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferSize();
}
- public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() {
- return
COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
+ public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() {
+ return
COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
}
public long getPipeSourceMatcherCacheSize() {
@@ -337,8 +337,8 @@ public class PipeConfig {
return COMMON_CONFIG.getPipePeriodicalLogMinIntervalSeconds();
}
- public long getPipeLoggerCacheMaxSizeInBytes() {
- return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+ public boolean isPipeRetryLocallyForParallelOrUserConflict() {
+ return COMMON_CONFIG.isPipeRetryLocallyForParallelOrUserConflict();
}
/////////////////////////////// Logger ///////////////////////////////
@@ -359,6 +359,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
}
+ public long getPipeLoggerCacheMaxSizeInBytes() {
+ return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+ }
+
/////////////////////////////// Memory ///////////////////////////////
public boolean getPipeMemoryManagementEnabled() {
@@ -482,11 +486,11 @@ public class PipeConfig {
LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
LOGGER.info(
- "PipeExtractorAssignerDisruptorRingBufferSize: {}",
- getPipeExtractorAssignerDisruptorRingBufferSize());
+ "PipeSourceAssignerDisruptorRingBufferSize: {}",
+ getPipeSourceAssignerDisruptorRingBufferSize());
LOGGER.info(
- "PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
- getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
+ "PipeSourceAssignerDisruptorRingBufferEntrySizeInBytes: {}",
+ getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
LOGGER.info("PipeSourceMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeConnectorHandshakeTimeoutMs());
@@ -593,12 +597,15 @@ public class PipeConfig {
LOGGER.info("PipeReceiverLoadConversionEnabled: {}",
isPipeReceiverLoadConversionEnabled());
LOGGER.info(
"PipePeriodicalLogMinIntervalSeconds: {}",
getPipePeriodicalLogMinIntervalSeconds());
- LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}",
getPipeLoggerCacheMaxSizeInBytes());
+ LOGGER.info(
+ "PipeRetryLocallyForParallelOrUserConflict: {}",
+ isPipeRetryLocallyForParallelOrUserConflict());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}",
getPipeTsFilePinMaxLogNumPerRound());
LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}",
getPipeTsFilePinMaxLogIntervalRounds());
+ LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}",
getPipeLoggerCacheMaxSizeInBytes());
LOGGER.info("PipeMemoryManagementEnabled: {}",
getPipeMemoryManagementEnabled());
LOGGER.info("PipeMemoryAllocateMaxRetries: {}",
getPipeMemoryAllocateMaxRetries());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 27e5fddf14d..a0b29a5f716 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -290,17 +290,21 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_subtask_executor_pending_queue_max_blocking_time_ms",
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+ config.setPipeRetryLocallyForParallelOrUserConflict(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_retry_locally_for_user_conflict",
+
String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict()))));
- config.setPipeExtractorAssignerDisruptorRingBufferSize(
+ config.setPipeSourceAssignerDisruptorRingBufferSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
.orElse(
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_size",
- String.valueOf(
-
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
- config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
+
String.valueOf(config.getPipeSourceAssignerDisruptorRingBufferSize())))));
+ config.setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
Integer.parseInt(
Optional.ofNullable(
properties.getProperty(
@@ -309,8 +313,7 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
String.valueOf(
- config
-
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
+
config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes())))));
config.setPipeSourceMatcherCacheSize(
Integer.parseInt(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 8d913f8b2a5..48939fc3ba3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -117,6 +118,7 @@ public class PipeReceiverStatusHandler {
}
case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
+ case 1815: // PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION
if (!isRetryAllowedWhenConflictOccurs) {
LOGGER.warn(
"User conflict exception: will be ignored because retry is not
allowed. event: {}. status: {}",
@@ -151,12 +153,16 @@ public class PipeReceiverStatusHandler {
+ " seconds",
status);
exceptionEventHasBeenRetried.set(true);
- throw new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenConflictOccurs * 1.1)));
+ throw status.getCode() == 1815
+ &&
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
+ ? new PipeNonReportException(exceptionMessage)
+ : new PipeRuntimeSinkRetryTimesConfigurableException(
+ exceptionMessage,
+ (int)
+ Math.max(
+ PipeSubtask.MAX_RETRY_TIMES,
+ Math.min(
+ CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenConflictOccurs * 1.1)));
}
case 803: // NO_PERMISSION