This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 92878f4ad9387339ed2c510646cae2da05e6dc7f Author: Caideyipi <[email protected]> AuthorDate: Wed Nov 12 21:46:37 2025 +0800 Pipe: Optimized the default behavior in meta sync for retries to wait for other regions (#16740) * partial * cfg * core * fix * fix (cherry picked from commit 6e8748e02561df3bc90aaab650ea245e95e4645a) --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 +- .../visitor/PipeStatementTSStatusVisitor.java | 37 +++++++++++-- .../realtime/assigner/DisruptorQueue.java | 4 +- .../schemaengine/schemaregion/utils/MetaUtils.java | 6 +-- .../apache/iotdb/commons/conf/CommonConfig.java | 60 ++++++++++++++-------- .../iotdb/commons/pipe/config/PipeConfig.java | 33 +++++++----- .../iotdb/commons/pipe/config/PipeDescriptor.java | 15 +++--- .../pipe/receiver/PipeReceiverStatusHandler.java | 18 ++++--- 9 files changed, 119 insertions(+), 59 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 84534adfadb..4fd90ca4b94 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -295,6 +295,7 @@ public enum TSStatusCode { PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812), PIPE_PUSH_META_TIMEOUT(1813), PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814), + PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION(1815), // Subscription SUBSCRIPTION_VERSION_ERROR(1900), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 87e744985f8..438dfa3f233 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -892,8 +892,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { .getLeft()) { return 0; } - return PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize() - * PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() + return PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferSize() + * PipeConfig.getInstance().getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() * Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10); } catch (final IllegalPathException e) { return 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index 882485a6c72..31eecc7b3a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -106,11 +106,22 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() - && (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) - && config.isEnablePartialInsert())) { - return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + } else if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) 
.setMessage(context.getMessage()); + } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) + && config.isEnablePartialInsert()) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } + if (context.getMessage().contains("does not exist")) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } } return visitStatement(insertBaseStatement, context); } @@ -226,14 +237,24 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS @Override public TSStatus visitBatchActivateTemplate( final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus context) { + boolean userConflict = false; if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { for (final TSStatus status : context.getSubStatus()) { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { return visitStatement(batchActivateTemplateStatement, context); } + if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && context.isSetMessage() + && context.getMessage().contains("has not been set any template")) { + userConflict = true; + } } - return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + return (userConflict + ? 
new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + : new TSStatus( + TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) .setMessage(context.getMessage()); } return visitGeneralActivateTemplate(batchActivateTemplateStatement, context); @@ -245,6 +266,12 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(context.getMessage()); } + if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && context.isSetMessage() + && context.getMessage().contains("has not been set any template")) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } return visitStatement(activateTemplateStatement, context); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index cb4aba1e15a..fc30f14be5b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -49,9 +49,9 @@ public class DisruptorQueue { final EventHandler<PipeRealtimeEvent> eventHandler, final Consumer<PipeRealtimeEvent> onAssignedHook) { final PipeConfig config = PipeConfig.getInstance(); - final int ringBufferSize = config.getPipeExtractorAssignerDisruptorRingBufferSize(); + final int ringBufferSize = config.getPipeSourceAssignerDisruptorRingBufferSize(); final long ringBufferEntrySizeInBytes = - config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(); + config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes(); 
allocatedMemoryBlock = PipeDataNodeResourceManager.memory() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java index fae3a21fd68..11b41b238a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java @@ -70,9 +70,9 @@ public class MetaUtils { throw new IllegalPathException( path.getFullPath(), "it does not start with " + IoTDBConstant.PATH_ROOT); } - String[] storageGroupNodes = new String[level + 1]; - System.arraycopy(nodeNames, 0, storageGroupNodes, 0, level + 1); - return new PartialPath(storageGroupNodes); + String[] databaseNodes = new String[level + 1]; + System.arraycopy(nodeNames, 0, databaseNodes, 0, level + 1); + return new PartialPath(databaseNodes); } /** diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 43a637d5c2c..9102a117eca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -214,6 +214,8 @@ public class CommonConfig { private int pipeSubtaskExecutorMaxThreadNum = Math.max(5, Runtime.getRuntime().availableProcessors() / 2); + private boolean pipeRetryLocallyForParallelOrUserConflict = true; + private int pipeDataStructureTabletRowSize = 2048; private int pipeDataStructureTabletSizeInBytes = 2097152; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3; @@ -256,8 +258,8 @@ public class CommonConfig { private long pipeMaxWaitFinishTime = 10 * 1000; - private int pipeExtractorAssignerDisruptorRingBufferSize = 128; - 
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB; + private int pipeSourceAssignerDisruptorRingBufferSize = 128; + private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB; private long pipeSourceMatcherCacheSize = 1024; private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds @@ -996,38 +998,37 @@ public class CommonConfig { "pipeTotalFloatingMemoryProportion is set to {}.", pipeTotalFloatingMemoryProportion); } - public int getPipeExtractorAssignerDisruptorRingBufferSize() { - return pipeExtractorAssignerDisruptorRingBufferSize; + public int getPipeSourceAssignerDisruptorRingBufferSize() { + return pipeSourceAssignerDisruptorRingBufferSize; } - public void setPipeExtractorAssignerDisruptorRingBufferSize( - int pipeExtractorAssignerDisruptorRingBufferSize) { - if (this.pipeExtractorAssignerDisruptorRingBufferSize - == pipeExtractorAssignerDisruptorRingBufferSize) { + public void setPipeSourceAssignerDisruptorRingBufferSize( + int pipeSourceAssignerDisruptorRingBufferSize) { + if (this.pipeSourceAssignerDisruptorRingBufferSize + == pipeSourceAssignerDisruptorRingBufferSize) { return; } - this.pipeExtractorAssignerDisruptorRingBufferSize = - pipeExtractorAssignerDisruptorRingBufferSize; + this.pipeSourceAssignerDisruptorRingBufferSize = pipeSourceAssignerDisruptorRingBufferSize; logger.info( - "pipeExtractorAssignerDisruptorRingBufferSize is set to {}.", - pipeExtractorAssignerDisruptorRingBufferSize); + "pipeSourceAssignerDisruptorRingBufferSize is set to {}.", + pipeSourceAssignerDisruptorRingBufferSize); } - public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() { - return pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes; + public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() { + return pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes; } - public void setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( - long 
pipeExtractorAssignerDisruptorRingBufferEntrySize) { - if (pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes - == pipeExtractorAssignerDisruptorRingBufferEntrySize) { + public void setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes( + long pipeSourceAssignerDisruptorRingBufferEntrySize) { + if (pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes + == pipeSourceAssignerDisruptorRingBufferEntrySize) { return; } - this.pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = - pipeExtractorAssignerDisruptorRingBufferEntrySize; + this.pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = + pipeSourceAssignerDisruptorRingBufferEntrySize; logger.info( - "pipeExtractorAssignerDisruptorRingBufferEntrySize is set to {}.", - pipeExtractorAssignerDisruptorRingBufferEntrySize); + "pipeSourceAssignerDisruptorRingBufferEntrySize is set to {}.", + pipeSourceAssignerDisruptorRingBufferEntrySize); } public long getPipeSourceMatcherCacheSize() { @@ -1400,6 +1401,21 @@ public class CommonConfig { logger.info("pipeSubtaskExecutorMaxThreadNum is set to {}.", pipeSubtaskExecutorMaxThreadNum); } + public boolean isPipeRetryLocallyForParallelOrUserConflict() { + return pipeRetryLocallyForParallelOrUserConflict; + } + + public void setPipeRetryLocallyForParallelOrUserConflict( + boolean pipeRetryLocallyForParallelOrUserConflict) { + if (this.pipeRetryLocallyForParallelOrUserConflict + == pipeRetryLocallyForParallelOrUserConflict) { + return; + } + this.pipeRetryLocallyForParallelOrUserConflict = pipeRetryLocallyForParallelOrUserConflict; + logger.info( + "pipeRetryLocallyForParallelOrUserConflict is set to {}.", pipeSubtaskExecutorMaxThreadNum); + } + public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() { return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java 
index 4e840d51c55..637628149b6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -143,14 +143,14 @@ public class PipeConfig { return COMMON_CONFIG.getPipeMaxWaitFinishTime(); } - /////////////////////////////// Extractor /////////////////////////////// + /////////////////////////////// Source /////////////////////////////// - public int getPipeExtractorAssignerDisruptorRingBufferSize() { - return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize(); + public int getPipeSourceAssignerDisruptorRingBufferSize() { + return COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferSize(); } - public long getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() { - return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(); + public long getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes() { + return COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes(); } public long getPipeSourceMatcherCacheSize() { @@ -337,8 +337,8 @@ public class PipeConfig { return COMMON_CONFIG.getPipePeriodicalLogMinIntervalSeconds(); } - public long getPipeLoggerCacheMaxSizeInBytes() { - return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes(); + public boolean isPipeRetryLocallyForParallelOrUserConflict() { + return COMMON_CONFIG.isPipeRetryLocallyForParallelOrUserConflict(); } /////////////////////////////// Logger /////////////////////////////// @@ -359,6 +359,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds(); } + public long getPipeLoggerCacheMaxSizeInBytes() { + return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes(); + } + /////////////////////////////// Memory /////////////////////////////// public boolean getPipeMemoryManagementEnabled() { @@ -482,12 +486,12 @@ public class PipeConfig { LOGGER.info("PipeMaxWaitFinishTime: {}", 
getPipeMaxWaitFinishTime()); LOGGER.info( - "PipeExtractorAssignerDisruptorRingBufferSize: {}", - getPipeExtractorAssignerDisruptorRingBufferSize()); + "PipeSourceAssignerDisruptorRingBufferSize: {}", + getPipeSourceAssignerDisruptorRingBufferSize()); LOGGER.info( - "PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}", - getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()); - LOGGER.info("PipeExtractorMatcherCacheSize: {}", getPipeSourceMatcherCacheSize()); + "PipeSourceAssignerDisruptorRingBufferEntrySizeInBytes: {}", + getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes()); + LOGGER.info("PipeSourceMatcherCacheSize: {}", getPipeSourceMatcherCacheSize()); LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", getPipeConnectorHandshakeTimeoutMs()); LOGGER.info("PipeConnectorTransferTimeoutMs: {}", getPipeConnectorTransferTimeoutMs()); @@ -592,12 +596,15 @@ public class PipeConfig { LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled()); LOGGER.info( "PipePeriodicalLogMinIntervalSeconds: {}", getPipePeriodicalLogMinIntervalSeconds()); - LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", getPipeLoggerCacheMaxSizeInBytes()); + LOGGER.info( + "PipeRetryLocallyForParallelOrUserConflict: {}", + isPipeRetryLocallyForParallelOrUserConflict()); LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds()); LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", getPipeTsFilePinMaxLogNumPerRound()); LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", getPipeTsFilePinMaxLogIntervalRounds()); + LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", getPipeLoggerCacheMaxSizeInBytes()); LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled()); LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries()); diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 760a7b5e51f..77aae8a3252 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -290,17 +290,21 @@ public class PipeDescriptor { properties.getProperty( "pipe_subtask_executor_pending_queue_max_blocking_time_ms", String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs())))); + config.setPipeRetryLocallyForParallelOrUserConflict( + Boolean.parseBoolean( + properties.getProperty( + "pipe_retry_locally_for_user_conflict", + String.valueOf(config.isPipeRetryLocallyForParallelOrUserConflict())))); - config.setPipeExtractorAssignerDisruptorRingBufferSize( + config.setPipeSourceAssignerDisruptorRingBufferSize( Integer.parseInt( Optional.ofNullable( properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size")) .orElse( properties.getProperty( "pipe_extractor_assigner_disruptor_ring_buffer_size", - String.valueOf( - config.getPipeExtractorAssignerDisruptorRingBufferSize()))))); - config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB + String.valueOf(config.getPipeSourceAssignerDisruptorRingBufferSize()))))); + config.setPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB Integer.parseInt( Optional.ofNullable( properties.getProperty( @@ -309,8 +313,7 @@ public class PipeDescriptor { properties.getProperty( "pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes", String.valueOf( - config - .getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()))))); + config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes()))))); config.setPipeSourceMatcherCacheSize( Integer.parseInt( diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index 3bb9e14d181..350746d7b8e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingI import org.apache.iotdb.commons.exception.pipe.PipeNonReportException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.pipe.api.event.Event; @@ -131,6 +132,7 @@ public class PipeReceiverStatusHandler { } case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION + case 1815: // PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION if (!isRetryAllowedWhenConflictOccurs) { LOGGER.warn( "User conflict exception: will be ignored because retry is not allowed. event: {}. status: {}", @@ -165,12 +167,16 @@ public class PipeReceiverStatusHandler { + " seconds", status); exceptionEventHasBeenRetried.set(true); - throw new PipeRuntimeSinkRetryTimesConfigurableException( - exceptionMessage, - (int) - Math.max( - PipeSubtask.MAX_RETRY_TIMES, - Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenConflictOccurs * 1.1))); + throw status.getCode() == 1815 + && PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict() + ? 
new PipeNonReportException(exceptionMessage) + : new PipeRuntimeSinkRetryTimesConfigurableException( + exceptionMessage, + (int) + Math.max( + PipeSubtask.MAX_RETRY_TIMES, + Math.min( + CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenConflictOccurs * 1.1))); } case 803: // NO_PERMISSION
