This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch connection-local-13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b4b4e811f38cbf546999f963b0fde5860f4e2b93 Author: Caideyipi <[email protected]> AuthorDate: Mon Feb 9 14:28:17 2026 +0800 Pipe: Enabled locally retry for PipeConnectionException (#17182) * fix * may-fix * fix * fix * try-complete * fix-part * Update IoTDBDataRegionAsyncSink.java * fix * fix --- .../iotdb/tool/tsfile/ImportTsFileRemotely.java | 6 +- .../exchange/sender/TwoStageAggregateSender.java | 3 +- .../protocol/airgap/IoTDBAirGapReceiver.java | 2 +- .../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 2 +- .../pipeconsensus/PipeConsensusSyncSink.java | 2 +- .../PipeConsensusTsFileInsertionEventHandler.java | 2 +- .../thrift/async/IoTDBDataRegionAsyncSink.java | 39 +++++--- .../PipeTransferTabletBatchEventHandler.java | 2 +- .../PipeTransferTabletInsertionEventHandler.java | 2 +- .../async/handler/PipeTransferTsFileHandler.java | 8 +- .../iotdb/commons/client/ClientPoolFactory.java | 20 ++-- .../apache/iotdb/commons/conf/CommonConfig.java | 111 ++++++++++++--------- .../task/subtask/PipeAbstractSinkSubtask.java | 20 ++-- .../agent/task/subtask/PipeReportableSubtask.java | 23 +++-- .../iotdb/commons/pipe/config/PipeConfig.java | 85 ++++++++-------- .../iotdb/commons/pipe/config/PipeDescriptor.java | 32 +++--- .../pipe/sink/client/IoTDBClientManager.java | 2 +- .../commons/pipe/sink/client/IoTDBSyncClient.java | 2 +- .../pipe/sink/client/IoTDBSyncClientManager.java | 2 +- .../pipe/sink/limiter/GlobalRPCRateLimiter.java | 2 +- .../pipe/sink/protocol/IoTDBAirGapSink.java | 4 +- .../commons/pipe/sink/protocol/IoTDBSink.java | 12 +-- .../pipe/sink/protocol/IoTDBSslSyncSink.java | 2 +- 23 files changed, 210 insertions(+), 175 deletions(-) diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java index 855c1678a84..fefff821fa1 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java @@ -162,7 +162,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase { "Handshake error with target server ip: %s, port: %s, because: %s.", client.getIpAddress(), client.getPort(), resp.getStatus())); } else { - client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs()); + client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs()); IOT_PRINTER.println( String.format( "Handshake success. Target server ip: %s, port: %s", @@ -228,7 +228,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase { private void transferFilePieces(final File file, final boolean isMultiFile) throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); + final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); final byte[] readBuffer = new byte[readFileBufferSize]; long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { @@ -297,7 +297,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase { new ThriftClientProperty.Builder() .setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs()) .setRpcThriftCompressionEnabled( - PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled()) + PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled()) .build(), getEndPoint().getIp(), getEndPoint().getPort(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java index 45d6e45d25c..3c36559a300 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java @@ -209,8 +209,7 @@ public class TwoStageAggregateSender implements AutoCloseable { return new IoTDBSyncClient( new ThriftClientProperty.Builder() .setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs()) - .setRpcThriftCompressionEnabled( - PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()) + .setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled()) .build(), endPoint.getIp(), endPoint.getPort(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 3cea6c998f8..610b9e5fe1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -70,7 +70,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { @Override public void runMayThrow() throws Throwable { - socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs()); + socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs()); socket.setKeepAlive(true); LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index 406c5cb4fdc..4328c758d39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -385,7 +385,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector { long position = 0; // Try small piece to rebase the file position. - final byte[] buffer = new byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()]; + final byte[] buffer = new byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()]; try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { while (true) { final int dataLength = randomAccessFile.read(buffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java index 7527452c134..b059e484734 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java @@ -373,7 +373,7 @@ public class PipeConsensusSyncSink extends IoTDBSink { final TCommitId tCommitId, final TConsensusGroupId tConsensusGroupId) throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); + final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); final byte[] readBuffer = new byte[readFileBufferSize]; long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java index 4fccd62bffa..22b55239e19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java @@ -100,7 +100,7 @@ public class PipeConsensusTsFileInsertionEventHandler transferMod = event.isWithMod(); currentFile = transferMod ? modFile : tsFile; - readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); + readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); readBuffer = new byte[readFileBufferSize]; position = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index e8eca26293c..aab891c6c7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async; import org.apache.iotdb.common.rpc.thrift.TEndPoint; + +import org.apache.iotdb.commons.audit.UserEntity; +import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -56,6 +59,7 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -121,6 +125,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { new ConcurrentHashMap<>(); private boolean enableSendTsFileLimit; + private volatile boolean isConnectionException; @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -251,10 +256,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { null, false)); } - } catch (final Throwable t) { - LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, t); + } catch (final Exception e) { + LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, e); if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { - addFailureEventsToRetryQueue(events); + addFailureEventsToRetryQueue(events, e); } } } else { @@ -579,7 +584,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { // Stop retrying if the execution time exceeds the threshold for better realtime performance if (System.currentTimeMillis() - retryStartTime - > PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) { + > PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) { if (retryEventQueueEventCounter.getTabletInsertionEventCount() < PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize() && retryEventQueueEventCounter.getTsFileInsertionEventCount() @@ -590,14 +595,17 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { } if (remainingEvents <= retryEventQueue.size() + retryTsFileQueue.size()) { - throw new PipeException( + final String message = "Failed to retry transferring events in the retry queue. Remaining events: " + (retryEventQueue.size() + retryTsFileQueue.size()) + " (tablet events: " + retryEventQueueEventCounter.getTabletInsertionEventCount() + ", tsfile events: " + retryEventQueueEventCounter.getTsFileInsertionEventCount() - + ")."); + + ")."; + throw isConnectionException + ? new PipeConnectionException(message) + : new PipeException(message); } } } @@ -613,7 +621,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { .decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false); } } catch (final Exception e) { - addFailureEventToRetryQueue(tabletInsertionEvent); + addFailureEventToRetryQueue(tabletInsertionEvent, e); } return; } @@ -626,14 +634,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { .decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false); } } else { - addFailureEventToRetryQueue(tabletInsertionEvent); + addFailureEventToRetryQueue(tabletInsertionEvent, null); } } catch (final Exception e) { if (tabletInsertionEvent instanceof EnrichedEvent) { ((EnrichedEvent) tabletInsertionEvent) .decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false); } - addFailureEventToRetryQueue(tabletInsertionEvent); + addFailureEventToRetryQueue(tabletInsertionEvent, e); } } @@ -643,11 +651,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { tsFileInsertionEvent.decreaseReferenceCount( IoTDBDataRegionAsyncSink.class.getName(), false); } else { - addFailureEventToRetryQueue(tsFileInsertionEvent); + addFailureEventToRetryQueue(tsFileInsertionEvent, null); } } catch (final Exception e) { tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false); - addFailureEventToRetryQueue(tsFileInsertionEvent); + addFailureEventToRetryQueue(tsFileInsertionEvent, e); } } @@ -657,7 +665,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { * @param event {@link Event} to retry */ @SuppressWarnings("java:S899") - public void addFailureEventToRetryQueue(final Event event) { + public void addFailureEventToRetryQueue(final Event event, final Exception e) { + isConnectionException = + e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e); if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) { return; } @@ -693,8 +703,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { * * @param events {@link EnrichedEvent}s to retry */ - public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> events) { - events.forEach(this::addFailureEventToRetryQueue); + public void addFailureEventsToRetryQueue( + final Iterable<EnrichedEvent> events, final Exception e) { + events.forEach(event -> addFailureEventToRetryQueue(event, e)); } public boolean isEnableSendTsFileLimit() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java index 539764cfc64..110d3cb6450 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java @@ -123,7 +123,7 @@ public class PipeTransferTabletBatchEventHandler extends PipeTransferTrackableHa events.size(), events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet())); } finally { - connector.addFailureEventsToRetryQueue(events); + connector.addFailureEventsToRetryQueue(events, exception); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java index af79c87c253..66a1f4a013b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java @@ -109,7 +109,7 @@ public abstract class PipeTransferTabletInsertionEventHandler extends PipeTransf event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitterKey() : null, event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitIds() : null); } finally { - connector.addFailureEventToRetryQueue(event); + connector.addFailureEventToRetryQueue(event, exception); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index c5ced29b735..1152710ec52 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -123,7 +123,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { readFileBufferSize = (int) Math.min( - PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(), + PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), transferMod ? Math.max(tsFile.length(), modFile.length()) : tsFile.length()); position = 0; @@ -143,7 +143,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { memoryBlock = PipeDataNodeResourceManager.memory() .forceAllocateForTsFileWithRetry( - PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled() + PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled() ? readFileBufferSize : 0); readBuffer = new byte[readFileBufferSize]; @@ -407,7 +407,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { returnClientIfNecessary(); } finally { if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { - connector.addFailureEventsToRetryQueue(events); + connector.addFailureEventsToRetryQueue(events, exception); } } } @@ -469,7 +469,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { * @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock. */ private void waitForResourceEnough4Slicing(final long timeoutMs) throws InterruptedException { - if (!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) { + if (!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) { return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index 18f76a91889..3ff47a2c5e1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -280,15 +280,13 @@ public class ClientPoolFactory { new AsyncPipeDataTransferServiceClient.Factory( manager, new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs()) - .setRpcThriftCompressionEnabled( - conf.isPipeConnectorRPCThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager( - conf.getPipeAsyncConnectorSelectorNumber()) + .setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs()) + .setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled()) + .setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber()) .build(), ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()), new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>() - .setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber()) + .setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber()) .build() .getConfig()); ClientManagerMetrics.getInstance() @@ -307,16 +305,14 @@ public class ClientPoolFactory { new AsyncPipeDataTransferServiceClient.Factory( manager, new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs()) - .setRpcThriftCompressionEnabled( - conf.isPipeConnectorRPCThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager( - conf.getPipeAsyncConnectorSelectorNumber()) + .setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs()) + .setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled()) + .setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber()) .setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException()) .build(), ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()), new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>() - .setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber()) + .setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber()) .build() .getConfig()); ClientManagerMetrics.getInstance() diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 9e4212c0b2b..09b6beb8c13 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -263,11 +263,12 @@ public class CommonConfig { private long pipeSourceMatcherCacheSize = 1024; private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds - private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes - private int pipeConnectorReadFileBufferSize = 5242880; // 5MB - private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false; - private long pipeConnectorRetryIntervalMs = 1000L; - private boolean pipeConnectorRPCThriftCompressionEnabled = false; + private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes + private int pipeSinkReadFileBufferSize = 5242880; // 5MB + private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false; + private long pipeSinkRetryIntervalMs = 800L; + private boolean pipeSinkRetryLocallyForConnectionError = true; + private boolean pipeSinkRPCThriftCompressionEnabled = false; private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5; private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20; @@ -1012,68 +1013,65 @@ public class CommonConfig { } } - public int getPipeConnectorTransferTimeoutMs() { - return pipeConnectorTransferTimeoutMs; + public int getPipeSinkTransferTimeoutMs() { + return pipeSinkTransferTimeoutMs; } - public void setPipeConnectorTransferTimeoutMs(long pipeConnectorTransferTimeoutMs) { - final int fPipeConnectorTransferTimeoutMs = this.pipeConnectorTransferTimeoutMs; + public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) { + final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs; try { - this.pipeConnectorTransferTimeoutMs = Math.toIntExact(pipeConnectorTransferTimeoutMs); + this.pipeSinkTransferTimeoutMs = Math.toIntExact(pipeSinkTransferTimeoutMs); } catch (ArithmeticException e) { - this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE; + this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE; logger.warn( "Given pipe connector transfer timeout is too large, set to {} ms.", Integer.MAX_VALUE); } finally { - if (fPipeConnectorTransferTimeoutMs != this.pipeConnectorTransferTimeoutMs) { - logger.info("pipeConnectorTransferTimeoutMs is set to {}.", pipeConnectorTransferTimeoutMs); + if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) { + logger.info("pipeConnectorTransferTimeoutMs is set to {}.", pipeSinkTransferTimeoutMs); } } } - public int getPipeConnectorReadFileBufferSize() { - return pipeConnectorReadFileBufferSize; + public int getPipeSinkReadFileBufferSize() { + return pipeSinkReadFileBufferSize; } - public void setPipeConnectorReadFileBufferSize(int pipeConnectorReadFileBufferSize) { - if (this.pipeConnectorReadFileBufferSize == pipeConnectorReadFileBufferSize) { + public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) { + if (this.pipeSinkReadFileBufferSize == pipeSinkReadFileBufferSize) { return; } - this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize; - logger.info("pipeConnectorReadFileBufferSize is set to {}.", pipeConnectorReadFileBufferSize); + this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize; + logger.info("pipeConnectorReadFileBufferSize is set to {}.", pipeSinkReadFileBufferSize); } - public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() { - return isPipeConnectorReadFileBufferMemoryControlEnabled; + public boolean isPipeSinkReadFileBufferMemoryControlEnabled() { + return isPipeSinkReadFileBufferMemoryControlEnabled; } - public void setIsPipeConnectorReadFileBufferMemoryControlEnabled( - boolean isPipeConnectorReadFileBufferMemoryControlEnabled) { - if (this.isPipeConnectorReadFileBufferMemoryControlEnabled - == isPipeConnectorReadFileBufferMemoryControlEnabled) { + public void setIsPipeSinkReadFileBufferMemoryControlEnabled( + boolean isPipeSinkReadFileBufferMemoryControlEnabled) { + if (this.isPipeSinkReadFileBufferMemoryControlEnabled + == isPipeSinkReadFileBufferMemoryControlEnabled) { return; } - this.isPipeConnectorReadFileBufferMemoryControlEnabled = - isPipeConnectorReadFileBufferMemoryControlEnabled; + this.isPipeSinkReadFileBufferMemoryControlEnabled = + isPipeSinkReadFileBufferMemoryControlEnabled; logger.info( - "isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.", - isPipeConnectorReadFileBufferMemoryControlEnabled); + "isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.", + isPipeSinkReadFileBufferMemoryControlEnabled); } - public void setPipeConnectorRPCThriftCompressionEnabled( - boolean pipeConnectorRPCThriftCompressionEnabled) { - if (this.isPipeConnectorReadFileBufferMemoryControlEnabled - == pipeConnectorRPCThriftCompressionEnabled) { + public void setPipeSinkRPCThriftCompressionEnabled(boolean pipeSinkRPCThriftCompressionEnabled) { + if (this.isPipeSinkReadFileBufferMemoryControlEnabled == pipeSinkRPCThriftCompressionEnabled) { return; } - this.pipeConnectorRPCThriftCompressionEnabled = pipeConnectorRPCThriftCompressionEnabled; + this.pipeSinkRPCThriftCompressionEnabled = pipeSinkRPCThriftCompressionEnabled; logger.info( - "pipeConnectorRPCThriftCompressionEnabled is set to {}.", - pipeConnectorRPCThriftCompressionEnabled); + "pipeSinkRPCThriftCompressionEnabled is set to {}.", pipeSinkRPCThriftCompressionEnabled); } - public boolean isPipeConnectorRPCThriftCompressionEnabled() { - return pipeConnectorRPCThriftCompressionEnabled; + public boolean isPipeSinkRPCThriftCompressionEnabled() { + return pipeSinkRPCThriftCompressionEnabled; } public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize( @@ -1139,11 +1137,11 @@ public class CommonConfig { pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall); } - public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() { + public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() { return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall; } - public int getPipeAsyncConnectorSelectorNumber() { + public int getPipeAsyncSinkSelectorNumber() { return pipeAsyncConnectorSelectorNumber; } @@ -1161,7 +1159,7 @@ public class CommonConfig { logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", pipeAsyncConnectorSelectorNumber); } - public int getPipeAsyncConnectorMaxClientNumber() { + public int getPipeAsyncSinkMaxClientNumber() { return pipeAsyncConnectorMaxClientNumber; } @@ -1180,7 +1178,7 @@ public class CommonConfig { "pipeAsyncConnectorMaxClientNumber is set to {}.", pipeAsyncConnectorMaxClientNumber); } - public int getPipeAsyncConnectorMaxTsFileClientNumber() { + public int getPipeAsyncSinkMaxTsFileClientNumber() { return pipeAsyncConnectorMaxTsFileClientNumber; } @@ -1297,16 +1295,31 @@ public class CommonConfig { logger.info("pipeAutoRestartEnabled is set to {}.", pipeAutoRestartEnabled); } - public long getPipeConnectorRetryIntervalMs() { - return pipeConnectorRetryIntervalMs; + public long getPipeSinkRetryIntervalMs() { + return pipeSinkRetryIntervalMs; } - public void setPipeConnectorRetryIntervalMs(long pipeConnectorRetryIntervalMs) { - if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) { + public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) { + if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) { return; } - this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs; - logger.info("pipeConnectorRetryIntervalMs is set to {}", pipeConnectorRetryIntervalMs); + this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs; + logger.info("pipeSinkRetryIntervalMs is set to {}", pipeConnectorRetryIntervalMs); + } + + public boolean isPipeSinkRetryLocallyForConnectionError() { + return pipeSinkRetryLocallyForConnectionError; + } + + public void setPipeSinkRetryLocallyForConnectionError( + boolean pipeSinkRetryLocallyForConnectionError) { + if (this.pipeSinkRetryLocallyForConnectionError == pipeSinkRetryLocallyForConnectionError) { + return; + } + this.pipeSinkRetryLocallyForConnectionError = pipeSinkRetryLocallyForConnectionError; + logger.info( + "pipeSinkRetryLocallyForConnectionError is set to {}", + pipeSinkRetryLocallyForConnectionError); } public int getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() { @@ -2013,7 +2026,7 @@ public class CommonConfig { "rateLimiterHotReloadCheckIntervalMs is set to {}", rateLimiterHotReloadCheckIntervalMs); } - public int getPipeConnectorRequestSliceThresholdBytes() { + public int getPipeSinkRequestSliceThresholdBytes() { return pipeConnectorRequestSliceThresholdBytes; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 2c66adf7d91..fc02d253280 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -134,6 +134,12 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { // return if the pipe task should be stopped return; } + if (PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) { + super.onFailure( + new PipeRuntimeSinkNonReportTimeConfigurableException( + throwable.getMessage(), Long.MAX_VALUE)); + return; + } } // Handle exceptions if any available clients exist @@ -144,9 +150,10 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { super.onFailure(throwable); } else { // Print stack trace for better debugging - LOGGER.warn( - "A non PipeRuntimeConnectorCriticalException occurred, will throw a PipeRuntimeConnectorCriticalException.", - throwable); + PipeLogger.log( + LOGGER::warn, + throwable, + "A non PipeRuntimeSinkCriticalException occurred, will throw a PipeRuntimeSinkCriticalException."); super.onFailure(new PipeRuntimeSinkCriticalException(throwable.getMessage())); } } @@ -179,8 +186,7 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { MAX_RETRY_TIMES, e); try { - sleepIfNoHighPriorityTask( - retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()); + sleepIfNoHighPriorityTask(retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs()); } catch (final InterruptedException interruptedException) { LOGGER.info( "Interrupted while sleeping, will retry to handshake with the target system.", @@ -192,7 +198,9 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { // Stop current pipe task directly if failed to reconnect to // the target system after MAX_RETRY_TIMES times - if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) { + if (retry == MAX_RETRY_TIMES + && lastEvent instanceof EnrichedEvent + && !PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) { report( (EnrichedEvent) lastEvent, new PipeRuntimeSinkCriticalException( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java index 8555ca85f3a..aedb251b53a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java @@ -21,9 +21,11 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,10 +51,11 @@ public abstract class PipeReportableSubtask extends PipeSubtask { return; } - if (lastEvent instanceof EnrichedEvent) { - onEnrichedEventFailure(throwable); + if (lastEvent instanceof EnrichedEvent + && !(throwable instanceof PipeRuntimeSinkNonReportTimeConfigurableException)) { + onReportEventFailure(throwable); } else { - onNonEnrichedEventFailure(throwable); + onNonReportEventFailure(throwable); } // Although the pipe task will be stopped, we still don't release the last event here @@ -61,7 +64,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask { // is dropped or the process is running normally. } - private void onEnrichedEventFailure(final Throwable throwable) { + private void onReportEventFailure(final Throwable throwable) { final int maxRetryTimes = throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException ? ((PipeRuntimeSinkRetryTimesConfigurableException) throwable).getRetryTimes() @@ -80,15 +83,16 @@ public abstract class PipeReportableSubtask extends PipeSubtask { retryCount.incrementAndGet(); if (retryCount.get() <= maxRetryTimes) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, + throwable, "Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}], last exception: {}", taskID, creationTime, this.getClass().getSimpleName(), retryCount.get(), maxRetryTimes, - throwable.getMessage(), - throwable); + throwable.getMessage()); try { sleepIfNoHighPriorityTask( retryCount.get() * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()); @@ -136,7 +140,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask { protected abstract void report(final EnrichedEvent event, final PipeRuntimeException exception); - private void onNonEnrichedEventFailure(final Throwable throwable) { + private void onNonReportEventFailure(final Throwable throwable) { if (retryCount.get() == 0) { LOGGER.warn( "Failed to execute subtask {} (creation time: {}, simple class: {}), " @@ -149,7 +153,8 @@ public abstract class PipeReportableSubtask extends PipeSubtask { } retryCount.incrementAndGet(); - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, "Retry executing subtask {} (creation time: {}, simple class: {}), retry count {}, last exception: {}", taskID, creationTime, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index a815800e23f..d51406acb53 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -103,7 +103,7 @@ public class PipeConfig { return COMMON_CONFIG.getPipeMinimumReceiverMemory(); } - /////////////////////////////// Subtask Connector /////////////////////////////// + /////////////////////////////// Subtask Sink /////////////////////////////// public int getPipeRealTimeQueuePollTsFileThreshold() { return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold(); @@ -165,30 +165,34 @@ public class PipeConfig { return COMMON_CONFIG.getPipeSourceMatcherCacheSize(); } - /////////////////////////////// Connector /////////////////////////////// + /////////////////////////////// Sink /////////////////////////////// public int getPipeSinkHandshakeTimeoutMs() { return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs(); } - public int getPipeConnectorTransferTimeoutMs() { - return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs(); + public int getPipeSinkTransferTimeoutMs() { + return COMMON_CONFIG.getPipeSinkTransferTimeoutMs(); } - public int getPipeConnectorReadFileBufferSize() { - return COMMON_CONFIG.getPipeConnectorReadFileBufferSize(); + public int getPipeSinkReadFileBufferSize() { + return COMMON_CONFIG.getPipeSinkReadFileBufferSize(); } - public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() { - return COMMON_CONFIG.isPipeConnectorReadFileBufferMemoryControlEnabled(); + public boolean isPipeSinkReadFileBufferMemoryControlEnabled() { + return COMMON_CONFIG.isPipeSinkReadFileBufferMemoryControlEnabled(); } - public long getPipeConnectorRetryIntervalMs() { - return COMMON_CONFIG.getPipeConnectorRetryIntervalMs(); + public long getPipeSinkRetryIntervalMs() { + return COMMON_CONFIG.getPipeSinkRetryIntervalMs(); } - public boolean isPipeConnectorRPCThriftCompressionEnabled() { - return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled(); + public boolean isPipeSinkRetryLocallyForConnectionError() { + return COMMON_CONFIG.isPipeSinkRetryLocallyForConnectionError(); + } + + public boolean isPipeSinkRPCThriftCompressionEnabled() { + return COMMON_CONFIG.isPipeSinkRPCThriftCompressionEnabled(); } public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() { @@ -203,27 +207,27 @@ public class PipeConfig { return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize(); } - public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() { - return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(); + public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() { + return COMMON_CONFIG.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(); } - public int getPipeAsyncConnectorSelectorNumber() { - return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber(); + public int getPipeAsyncSinkSelectorNumber() { + return COMMON_CONFIG.getPipeAsyncSinkSelectorNumber(); } - public int getPipeAsyncConnectorMaxClientNumber() { - return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber(); + public int getPipeAsyncSinkMaxClientNumber() { + return COMMON_CONFIG.getPipeAsyncSinkMaxClientNumber(); } - public int getPipeAsyncConnectorMaxTsFileClientNumber() { - return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber(); + public int getPipeAsyncSinkMaxTsFileClientNumber() { + return COMMON_CONFIG.getPipeAsyncSinkMaxTsFileClientNumber(); } public double getPipeSendTsFileRateLimitBytesPerSecond() { return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond(); } - public double getPipeAllConnectorsRateLimitBytesPerSecond() { + public double getPipeAllSinksRateLimitBytesPerSecond() { return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond(); } @@ -231,8 +235,8 @@ public class PipeConfig { return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs(); } - public int getPipeConnectorRequestSliceThresholdBytes() { - return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes(); + public int getPipeSinkRequestSliceThresholdBytes() { + return COMMON_CONFIG.getPipeSinkRequestSliceThresholdBytes(); } public long getPipeMaxReaderChunkSize() { @@ -503,16 +507,14 @@ public class PipeConfig { getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes()); LOGGER.info("PipeSourceMatcherCacheSize: {}", getPipeSourceMatcherCacheSize()); - LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", getPipeSinkHandshakeTimeoutMs()); - LOGGER.info("PipeConnectorTransferTimeoutMs: {}", getPipeConnectorTransferTimeoutMs()); - LOGGER.info("PipeConnectorReadFileBufferSize: {}", getPipeConnectorReadFileBufferSize()); - LOGGER.info( - "PipeConnectorReadFileBufferMemoryControlEnabled: {}", - isPipeConnectorReadFileBufferMemoryControlEnabled()); - LOGGER.info("PipeConnectorRetryIntervalMs: {}", getPipeConnectorRetryIntervalMs()); + LOGGER.info("PipeSinkHandshakeTimeoutMs: {}", getPipeSinkHandshakeTimeoutMs()); + LOGGER.info("PipeSinkTransferTimeoutMs: {}", getPipeSinkTransferTimeoutMs()); + LOGGER.info("PipeSinkReadFileBufferSize: {}", getPipeSinkReadFileBufferSize()); LOGGER.info( - "PipeConnectorRPCThriftCompressionEnabled: {}", - isPipeConnectorRPCThriftCompressionEnabled()); + "PipeSinkReadFileBufferMemoryControlEnabled: {}", + isPipeSinkReadFileBufferMemoryControlEnabled()); + LOGGER.info("PipeSinkRetryIntervalMs: {}", getPipeSinkRetryIntervalMs()); + LOGGER.info("PipeSinkRPCThriftCompressionEnabled: {}", isPipeSinkRPCThriftCompressionEnabled()); LOGGER.info( "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage()); LOGGER.info("PipeReaderChunkSize: {}", getPipeMaxReaderChunkSize()); @@ -559,25 +561,20 @@ public class PipeConfig { "PipeAsyncSinkForcedRetryTotalEventQueueSize: {}", getPipeAsyncSinkForcedRetryTotalEventQueueSize()); LOGGER.info( - "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}", - getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()); - LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber()); - LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber()); - LOGGER.info( - "PipeAsyncConnectorMaxTsFileClientNumber: {}", - getPipeAsyncConnectorMaxTsFileClientNumber()); + "PipeAsyncSinkMaxRetryExecutionTimeMsPerCall: {}", + getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()); + LOGGER.info("PipeAsyncSinkSelectorNumber: {}", getPipeAsyncSinkSelectorNumber()); + LOGGER.info("PipeAsyncSinkMaxClientNumber: {}", getPipeAsyncSinkMaxClientNumber()); + LOGGER.info("PipeAsyncSinkMaxTsFileClientNumber: {}", getPipeAsyncSinkMaxTsFileClientNumber()); LOGGER.info( "PipeSendTsFileRateLimitBytesPerSecond: {}", getPipeSendTsFileRateLimitBytesPerSecond()); LOGGER.info( - "PipeAllConnectorsRateLimitBytesPerSecond: {}", - getPipeAllConnectorsRateLimitBytesPerSecond()); + "PipeAllSinksRateLimitBytesPerSecond: {}", getPipeAllSinksRateLimitBytesPerSecond()); LOGGER.info( "RateLimiterHotReloadCheckIntervalMs: {}", getRateLimiterHotReloadCheckIntervalMs()); - LOGGER.info( - "PipeConnectorRequestSliceThresholdBytes: {}", - getPipeConnectorRequestSliceThresholdBytes()); + LOGGER.info("PipeSinkRequestSliceThresholdBytes: {}", getPipeSinkRequestSliceThresholdBytes()); LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", isSeperatedPipeHeartbeatEnabled()); LOGGER.info( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 5e11df2d086..91dfd11e71f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -341,35 +341,42 @@ public class PipeDescriptor { properties.getProperty( "pipe_connector_handshake_timeout_ms", String.valueOf(config.getPipeSinkHandshakeTimeoutMs()))))); - config.setPipeConnectorReadFileBufferSize( + config.setPipeSinkReadFileBufferSize( Integer.parseInt( Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size")) .orElse( properties.getProperty( "pipe_connector_read_file_buffer_size", - String.valueOf(config.getPipeConnectorReadFileBufferSize()))))); - config.setIsPipeConnectorReadFileBufferMemoryControlEnabled( + String.valueOf(config.getPipeSinkReadFileBufferSize()))))); + config.setIsPipeSinkReadFileBufferMemoryControlEnabled( Boolean.parseBoolean( Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control")) .orElse( properties.getProperty( "pipe_connector_read_file_buffer_memory_control", - String.valueOf( - config.isPipeConnectorReadFileBufferMemoryControlEnabled()))))); - config.setPipeConnectorRetryIntervalMs( + String.valueOf(config.isPipeSinkReadFileBufferMemoryControlEnabled()))))); + config.setPipeSinkRetryIntervalMs( Long.parseLong( Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms")) .orElse( properties.getProperty( "pipe_connector_retry_interval_ms", - String.valueOf(config.getPipeConnectorRetryIntervalMs()))))); - config.setPipeConnectorRPCThriftCompressionEnabled( + String.valueOf(config.getPipeSinkRetryIntervalMs()))))); + config.setPipeSinkRetryLocallyForConnectionError( + Boolean.parseBoolean( + Optional.ofNullable( + properties.getProperty("pipe_sink_retry_locally_for_connection_error")) + .orElse( + properties.getProperty( + "pipe_connector_retry_locally_for_connection_error", + String.valueOf(config.isPipeSinkRetryLocallyForConnectionError()))))); + config.setPipeSinkRPCThriftCompressionEnabled( Boolean.parseBoolean( Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled")) .orElse( properties.getProperty( "pipe_connector_rpc_thrift_compression_enabled", - String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled()))))); + String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled()))))); config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall( Long.parseLong( Optional.ofNullable( @@ -377,8 +384,7 @@ public class PipeDescriptor { .orElse( properties.getProperty( "pipe_async_connector_max_retry_execution_time_ms_per_call", - String.valueOf( - config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()))))); + String.valueOf(config.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()))))); config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize( Integer.parseInt( Optional.ofNullable( @@ -415,7 +421,7 @@ public class PipeDescriptor { Integer.parseInt( properties.getProperty( "pipe_connector_request_slice_threshold_bytes", - String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes())))); + String.valueOf(config.getPipeSinkRequestSliceThresholdBytes())))); config.setPipeReceiverLoginPeriodicVerificationIntervalMs( Long.parseLong( @@ -566,7 +572,7 @@ public class PipeDescriptor { parserPipeConfig( properties, "pipe_sink_timeout_ms", "pipe_connector_timeout_ms", isHotModify); if (value != null) { - config.setPipeConnectorTransferTimeoutMs(Long.parseLong(value)); + config.setPipeSinkTransferTimeoutMs(Long.parseLong(value)); } value = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java index 118f2f26876..917d4a74ed9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java @@ -56,7 +56,7 @@ public abstract class IoTDBClientManager { private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; // 6 hours protected static final AtomicInteger CONNECTION_TIMEOUT_MS = - new AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs()); + new AtomicInteger(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs()); protected IoTDBClientManager( List<TEndPoint> endPointList, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java index a0bb65b5a7e..b7f42295e6c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java @@ -100,7 +100,7 @@ public class IoTDBSyncClient extends IClientRPCService.Client @Override public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TException { - final int bodySizeLimit = PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes(); + final int bodySizeLimit = PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes(); if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion() || req.body.limit() < bodySizeLimit) { return super.pipeTransfer(req); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java index fa5f0d3383c..969f42bfcec 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java @@ -195,7 +195,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen new ThriftClientProperty.Builder() .setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs()) .setRpcThriftCompressionEnabled( - PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()) + PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled()) .build(), endPoint.getIp(), endPoint.getPort(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java index 9a6aba5b90b..e497656eab2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java @@ -28,6 +28,6 @@ public class GlobalRPCRateLimiter extends GlobalRateLimiter { @Override protected double getThroughputBytesPerSecond() { - return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); + return CONFIG.getPipeAllSinksRateLimitBytesPerSecond(); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java index dda8818e04f..763bfa644ea 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java @@ -238,7 +238,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink { } else { supportModsIfIsDataNodeReceiver = true; } - socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs()); + socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs()); LOGGER.info("Handshake success. Socket: {}", socket); } @@ -265,7 +265,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink { final AirGapSocket socket, final boolean isMultiFile) throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); + final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); final byte[] readBuffer = new byte[readFileBufferSize]; long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 0668f7cc99b..66f3eda6a1a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -378,7 +378,7 @@ public abstract class IoTDBSink implements PipeConnector { nodeUrls.clear(); nodeUrls.addAll(parseNodeUrls(parameters)); - LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls); + LOGGER.info("IoTDBSink nodeUrls: {}", nodeUrls); isTabletBatchModeEnabled = parameters.getBooleanOrDefault( @@ -390,7 +390,7 @@ public abstract class IoTDBSink implements PipeConnector { Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY), CONNECTOR_FORMAT_HYBRID_VALUE) .equals(CONNECTOR_FORMAT_TS_FILE_VALUE); - LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); + LOGGER.info("IoTDBSink isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); final boolean shouldMarkAsGeneralWriteRequest = parameters.getBooleanOrDefault( @@ -406,7 +406,7 @@ public abstract class IoTDBSink implements PipeConnector { Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY), CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE); } - LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest); + LOGGER.info("IoTDBSink shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest); final String connectorSkipIfValue = parameters @@ -425,7 +425,7 @@ public abstract class IoTDBSink implements PipeConnector { throw new PipeParameterNotValidException( String.format("Parameters in set %s are not allowed in 'skipif'", skipIfOptionSet)); } - LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges); + LOGGER.info("IoTDBSink skipIfNoPrivileges: {}", skipIfNoPrivileges); receiverStatusHandler = new PipeReceiverStatusHandler( @@ -465,7 +465,7 @@ public abstract class IoTDBSink implements PipeConnector { SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY), CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE); LOGGER.info( - "IoTDBConnector {} = {}", + "IoTDBSink {} = {}", CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY, shouldReceiverConvertOnTypeMismatch); isRealtimeFirst = @@ -475,7 +475,7 @@ public abstract class IoTDBSink implements PipeConnector { PipeSinkConstant.SINK_REALTIME_FIRST_KEY), PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE); LOGGER.info( - "IoTDBConnector {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst); + "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst); } protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters parameters) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java index 080efd25846..96b7346d025 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java @@ -175,7 +175,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink { final Pair<IoTDBSyncClient, Boolean> clientAndStatus, final boolean isMultiFile) throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); + final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); final byte[] readBuffer = new byte[readFileBufferSize]; long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
