This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2ff116b55ca [To dev/1.3] Pipe: Enabled locally retry for
PipeConnectionException (#17182) (#17190)
2ff116b55ca is described below
commit 2ff116b55ca9425183eca34652e5cac5eedcf9fb
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 10 09:30:35 2026 +0800
[To dev/1.3] Pipe: Enabled locally retry for PipeConnectionException
(#17182) (#17190)
* Pipe: Enabled locally retry for PipeConnectionException (#17182)
* fix
* may-fix
* fix
* fix
* try-complete
* fix-part
* Update IoTDBDataRegionAsyncSink.java
* fix
* fix
* fix
* sit
---
.../iotdb/tool/tsfile/ImportTsFileRemotely.java | 6 +-
.../exchange/sender/TwoStageAggregateSender.java | 3 +-
.../protocol/airgap/IoTDBAirGapReceiver.java | 2 +-
.../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 2 +-
.../pipeconsensus/PipeConsensusSyncSink.java | 2 +-
.../PipeConsensusTsFileInsertionEventHandler.java | 2 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 37 ++++---
.../PipeTransferTabletBatchEventHandler.java | 2 +-
.../PipeTransferTabletInsertionEventHandler.java | 2 +-
.../async/handler/PipeTransferTsFileHandler.java | 8 +-
.../iotdb/commons/client/ClientPoolFactory.java | 20 ++--
.../apache/iotdb/commons/conf/CommonConfig.java | 111 ++++++++++++---------
.../task/subtask/PipeAbstractSinkSubtask.java | 20 ++--
.../agent/task/subtask/PipeReportableSubtask.java | 27 +++--
.../iotdb/commons/pipe/config/PipeConfig.java | 85 ++++++++--------
.../iotdb/commons/pipe/config/PipeDescriptor.java | 32 +++---
.../pipe/sink/client/IoTDBClientManager.java | 2 +-
.../commons/pipe/sink/client/IoTDBSyncClient.java | 2 +-
.../pipe/sink/client/IoTDBSyncClientManager.java | 2 +-
.../pipe/sink/limiter/GlobalRPCRateLimiter.java | 2 +-
.../pipe/sink/protocol/IoTDBAirGapSink.java | 4 +-
.../commons/pipe/sink/protocol/IoTDBSink.java | 12 +--
.../pipe/sink/protocol/IoTDBSslSyncSink.java | 2 +-
23 files changed, 210 insertions(+), 177 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index 855c1678a84..fefff821fa1 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -162,7 +162,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
"Handshake error with target server ip: %s, port: %s, because:
%s.",
client.getIpAddress(), client.getPort(), resp.getStatus()));
} else {
-
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
IOT_PRINTER.println(
String.format(
"Handshake success. Target server ip: %s, port: %s",
@@ -228,7 +228,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
private void transferFilePieces(final File file, final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
@@ -297,7 +297,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
-
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+
PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled())
.build(),
getEndPoint().getIp(),
getEndPoint().getPort(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index 45d6e45d25c..3c36559a300 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -209,8 +209,7 @@ public class TwoStageAggregateSender implements
AutoCloseable {
return new IoTDBSyncClient(
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
- .setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..610b9e5fe1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -70,7 +70,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
@Override
public void runMayThrow() throws Throwable {
-
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
socket.setKeepAlive(true);
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId,
socket);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 406c5cb4fdc..4328c758d39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -385,7 +385,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
long position = 0;
// Try small piece to rebase the file position.
- final byte[] buffer = new
byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+ final byte[] buffer = new
byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()];
try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"r")) {
while (true) {
final int dataLength = randomAccessFile.read(buffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index 7527452c134..b059e484734 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -373,7 +373,7 @@ public class PipeConsensusSyncSink extends IoTDBSink {
final TCommitId tCommitId,
final TConsensusGroupId tConsensusGroupId)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 4fccd62bffa..22b55239e19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -100,7 +100,7 @@ public class PipeConsensusTsFileInsertionEventHandler
transferMod = event.isWithMod();
currentFile = transferMod ? modFile : tsFile;
- readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
readBuffer = new byte[readFileBufferSize];
position = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index e8eca26293c..f8e0684188a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -56,6 +57,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -121,6 +123,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
new ConcurrentHashMap<>();
private boolean enableSendTsFileLimit;
+ private volatile boolean isConnectionException;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -251,10 +254,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
null,
false));
}
- } catch (final Throwable t) {
- LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, t);
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, e);
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- addFailureEventsToRetryQueue(events);
+ addFailureEventsToRetryQueue(events, e);
}
}
} else {
@@ -579,7 +582,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
// Stop retrying if the execution time exceeds the threshold for better
realtime performance
if (System.currentTimeMillis() - retryStartTime
- >
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())
{
+ >
PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) {
if (retryEventQueueEventCounter.getTabletInsertionEventCount()
<
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
@@ -590,14 +593,17 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
}
if (remainingEvents <= retryEventQueue.size() +
retryTsFileQueue.size()) {
- throw new PipeException(
+ final String message =
"Failed to retry transferring events in the retry queue.
Remaining events: "
+ (retryEventQueue.size() + retryTsFileQueue.size())
+ " (tablet events: "
+ retryEventQueueEventCounter.getTabletInsertionEventCount()
+ ", tsfile events: "
+ retryEventQueueEventCounter.getTsFileInsertionEventCount()
- + ").");
+ + ").";
+ throw isConnectionException
+ ? new PipeConnectionException(message)
+ : new PipeException(message);
}
}
}
@@ -613,7 +619,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
}
} catch (final Exception e) {
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, e);
}
return;
}
@@ -626,14 +632,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
}
} else {
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, null);
}
} catch (final Exception e) {
if (tabletInsertionEvent instanceof EnrichedEvent) {
((EnrichedEvent) tabletInsertionEvent)
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
false);
}
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, e);
}
}
@@ -643,11 +649,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
tsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionAsyncSink.class.getName(), false);
} else {
- addFailureEventToRetryQueue(tsFileInsertionEvent);
+ addFailureEventToRetryQueue(tsFileInsertionEvent, null);
}
} catch (final Exception e) {
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
false);
- addFailureEventToRetryQueue(tsFileInsertionEvent);
+ addFailureEventToRetryQueue(tsFileInsertionEvent, e);
}
}
@@ -657,7 +663,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
* @param event {@link Event} to retry
*/
@SuppressWarnings("java:S899")
- public void addFailureEventToRetryQueue(final Event event) {
+ public void addFailureEventToRetryQueue(final Event event, final Exception
e) {
+ isConnectionException =
+ e instanceof PipeConnectionException ||
ThriftClient.isConnectionBroken(e);
if (event instanceof EnrichedEvent && ((EnrichedEvent)
event).isReleased()) {
return;
}
@@ -693,8 +701,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
*
* @param events {@link EnrichedEvent}s to retry
*/
- public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent>
events) {
- events.forEach(this::addFailureEventToRetryQueue);
+ public void addFailureEventsToRetryQueue(
+ final Iterable<EnrichedEvent> events, final Exception e) {
+ events.forEach(event -> addFailureEventToRetryQueue(event, e));
}
public boolean isEnableSendTsFileLimit() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 539764cfc64..110d3cb6450 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
events.size(),
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
} finally {
- connector.addFailureEventsToRetryQueue(events);
+ connector.addFailureEventsToRetryQueue(events, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index af79c87c253..66a1f4a013b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -109,7 +109,7 @@ public abstract class
PipeTransferTabletInsertionEventHandler extends PipeTransf
event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitIds() : null);
} finally {
- connector.addFailureEventToRetryQueue(event);
+ connector.addFailureEventToRetryQueue(event, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index c5ced29b735..1152710ec52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
readFileBufferSize =
(int)
Math.min(
- PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(),
+ PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
transferMod ? Math.max(tsFile.length(), modFile.length()) :
tsFile.length());
position = 0;
@@ -143,7 +143,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
memoryBlock =
PipeDataNodeResourceManager.memory()
.forceAllocateForTsFileWithRetry(
-
PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()
+
PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()
? readFileBufferSize
: 0);
readBuffer = new byte[readFileBufferSize];
@@ -407,7 +407,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
returnClientIfNecessary();
} finally {
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- connector.addFailureEventsToRetryQueue(events);
+ connector.addFailureEventsToRetryQueue(events, exception);
}
}
}
@@ -469,7 +469,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
* @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
*/
private void waitForResourceEnough4Slicing(final long timeoutMs) throws
InterruptedException {
- if
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled())
{
+ if
(!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) {
return;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 18f76a91889..3ff47a2c5e1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -280,15 +280,13 @@ public class ClientPoolFactory {
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
- .setRpcThriftCompressionEnabled(
- conf.isPipeConnectorRPCThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(
- conf.getPipeAsyncConnectorSelectorNumber())
+
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.build(),
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
+
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -307,16 +305,14 @@ public class ClientPoolFactory {
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
- .setRpcThriftCompressionEnabled(
- conf.isPipeConnectorRPCThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(
- conf.getPipeAsyncConnectorSelectorNumber())
+
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
.build(),
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a60c870959f..62c389df758 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -265,11 +265,12 @@ public class CommonConfig {
private long pipeSourceMatcherCacheSize = 1024;
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
- private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
- private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
- private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
- private long pipeConnectorRetryIntervalMs = 1000L;
- private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+ private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
+ private int pipeSinkReadFileBufferSize = 5242880; // 5MB
+ private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
+ private long pipeSinkRetryIntervalMs = 800L;
+ private boolean pipeSinkRetryLocallyForConnectionError = true;
+ private boolean pipeSinkRPCThriftCompressionEnabled = false;
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
@@ -1014,68 +1015,65 @@ public class CommonConfig {
}
}
- public int getPipeConnectorTransferTimeoutMs() {
- return pipeConnectorTransferTimeoutMs;
+ public int getPipeSinkTransferTimeoutMs() {
+ return pipeSinkTransferTimeoutMs;
}
- public void setPipeConnectorTransferTimeoutMs(long
pipeConnectorTransferTimeoutMs) {
- final int fPipeConnectorTransferTimeoutMs =
this.pipeConnectorTransferTimeoutMs;
+ public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
+ final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
try {
- this.pipeConnectorTransferTimeoutMs =
Math.toIntExact(pipeConnectorTransferTimeoutMs);
+ this.pipeSinkTransferTimeoutMs =
Math.toIntExact(pipeSinkTransferTimeoutMs);
} catch (ArithmeticException e) {
- this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+ this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
logger.warn(
"Given pipe connector transfer timeout is too large, set to {} ms.",
Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorTransferTimeoutMs !=
this.pipeConnectorTransferTimeoutMs) {
- logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeConnectorTransferTimeoutMs);
+ if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+ logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeSinkTransferTimeoutMs);
}
}
}
- public int getPipeConnectorReadFileBufferSize() {
- return pipeConnectorReadFileBufferSize;
+ public int getPipeSinkReadFileBufferSize() {
+ return pipeSinkReadFileBufferSize;
}
- public void setPipeConnectorReadFileBufferSize(int
pipeConnectorReadFileBufferSize) {
- if (this.pipeConnectorReadFileBufferSize ==
pipeConnectorReadFileBufferSize) {
+ public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) {
+ if (this.pipeSinkReadFileBufferSize == pipeSinkReadFileBufferSize) {
return;
}
- this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
- logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeConnectorReadFileBufferSize);
+ this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
+ logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeSinkReadFileBufferSize);
}
- public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
- return isPipeConnectorReadFileBufferMemoryControlEnabled;
+ public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+ return isPipeSinkReadFileBufferMemoryControlEnabled;
}
- public void setIsPipeConnectorReadFileBufferMemoryControlEnabled(
- boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
- if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
- == isPipeConnectorReadFileBufferMemoryControlEnabled) {
+ public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
+ boolean isPipeSinkReadFileBufferMemoryControlEnabled) {
+ if (this.isPipeSinkReadFileBufferMemoryControlEnabled
+ == isPipeSinkReadFileBufferMemoryControlEnabled) {
return;
}
- this.isPipeConnectorReadFileBufferMemoryControlEnabled =
- isPipeConnectorReadFileBufferMemoryControlEnabled;
+ this.isPipeSinkReadFileBufferMemoryControlEnabled =
+ isPipeSinkReadFileBufferMemoryControlEnabled;
logger.info(
- "isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.",
- isPipeConnectorReadFileBufferMemoryControlEnabled);
+ "isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.",
+ isPipeSinkReadFileBufferMemoryControlEnabled);
}
- public void setPipeConnectorRPCThriftCompressionEnabled(
- boolean pipeConnectorRPCThriftCompressionEnabled) {
- if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
- == pipeConnectorRPCThriftCompressionEnabled) {
+ public void setPipeSinkRPCThriftCompressionEnabled(boolean
pipeSinkRPCThriftCompressionEnabled) {
+ if (this.isPipeSinkReadFileBufferMemoryControlEnabled ==
pipeSinkRPCThriftCompressionEnabled) {
return;
}
- this.pipeConnectorRPCThriftCompressionEnabled =
pipeConnectorRPCThriftCompressionEnabled;
+ this.pipeSinkRPCThriftCompressionEnabled =
pipeSinkRPCThriftCompressionEnabled;
logger.info(
- "pipeConnectorRPCThriftCompressionEnabled is set to {}.",
- pipeConnectorRPCThriftCompressionEnabled);
+ "pipeSinkRPCThriftCompressionEnabled is set to {}.",
pipeSinkRPCThriftCompressionEnabled);
}
- public boolean isPipeConnectorRPCThriftCompressionEnabled() {
- return pipeConnectorRPCThriftCompressionEnabled;
+ public boolean isPipeSinkRPCThriftCompressionEnabled() {
+ return pipeSinkRPCThriftCompressionEnabled;
}
public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
@@ -1141,11 +1139,11 @@ public class CommonConfig {
pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
}
- public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+ public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
}
- public int getPipeAsyncConnectorSelectorNumber() {
+ public int getPipeAsyncSinkSelectorNumber() {
return pipeAsyncConnectorSelectorNumber;
}
@@ -1163,7 +1161,7 @@ public class CommonConfig {
logger.info("pipeAsyncConnectorSelectorNumber is set to {}.",
pipeAsyncConnectorSelectorNumber);
}
- public int getPipeAsyncConnectorMaxClientNumber() {
+ public int getPipeAsyncSinkMaxClientNumber() {
return pipeAsyncConnectorMaxClientNumber;
}
@@ -1182,7 +1180,7 @@ public class CommonConfig {
"pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxClientNumber);
}
- public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+ public int getPipeAsyncSinkMaxTsFileClientNumber() {
return pipeAsyncConnectorMaxTsFileClientNumber;
}
@@ -1299,16 +1297,31 @@ public class CommonConfig {
logger.info("pipeAutoRestartEnabled is set to {}.",
pipeAutoRestartEnabled);
}
- public long getPipeConnectorRetryIntervalMs() {
- return pipeConnectorRetryIntervalMs;
+ public long getPipeSinkRetryIntervalMs() {
+ return pipeSinkRetryIntervalMs;
}
- public void setPipeConnectorRetryIntervalMs(long
pipeConnectorRetryIntervalMs) {
- if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+ public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
+ if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
return;
}
- this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
- logger.info("pipeConnectorRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
+ logger.info("pipeSinkRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ }
+
+ public boolean isPipeSinkRetryLocallyForConnectionError() {
+ return pipeSinkRetryLocallyForConnectionError;
+ }
+
+ public void setPipeSinkRetryLocallyForConnectionError(
+ boolean pipeSinkRetryLocallyForConnectionError) {
+ if (this.pipeSinkRetryLocallyForConnectionError ==
pipeSinkRetryLocallyForConnectionError) {
+ return;
+ }
+ this.pipeSinkRetryLocallyForConnectionError =
pipeSinkRetryLocallyForConnectionError;
+ logger.info(
+ "pipeSinkRetryLocallyForConnectionError is set to {}",
+ pipeSinkRetryLocallyForConnectionError);
}
public int
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
@@ -2043,7 +2056,7 @@ public class CommonConfig {
"rateLimiterHotReloadCheckIntervalMs is set to {}",
rateLimiterHotReloadCheckIntervalMs);
}
- public int getPipeConnectorRequestSliceThresholdBytes() {
+ public int getPipeSinkRequestSliceThresholdBytes() {
return pipeConnectorRequestSliceThresholdBytes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 2c66adf7d91..fc02d253280 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -134,6 +134,12 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
// return if the pipe task should be stopped
return;
}
+ if
(PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
+ super.onFailure(
+ new PipeRuntimeSinkNonReportTimeConfigurableException(
+ throwable.getMessage(), Long.MAX_VALUE));
+ return;
+ }
}
// Handle exceptions if any available clients exist
@@ -144,9 +150,10 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
super.onFailure(throwable);
} else {
// Print stack trace for better debugging
- LOGGER.warn(
- "A non PipeRuntimeConnectorCriticalException occurred, will throw
a PipeRuntimeConnectorCriticalException.",
- throwable);
+ PipeLogger.log(
+ LOGGER::warn,
+ throwable,
+ "A non PipeRuntimeSinkCriticalException occurred, will throw a
PipeRuntimeSinkCriticalException.");
super.onFailure(new
PipeRuntimeSinkCriticalException(throwable.getMessage()));
}
}
@@ -179,8 +186,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
MAX_RETRY_TIMES,
e);
try {
- sleepIfNoHighPriorityTask(
- retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+ sleepIfNoHighPriorityTask(retry *
PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
} catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the
target system.",
@@ -192,7 +198,9 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
// Stop current pipe task directly if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
- if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+ if (retry == MAX_RETRY_TIMES
+ && lastEvent instanceof EnrichedEvent
+ &&
!PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
report(
(EnrichedEvent) lastEvent,
new PipeRuntimeSinkCriticalException(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 8555ca85f3a..251acfb7b12 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +51,11 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
return;
}
- if (lastEvent instanceof EnrichedEvent) {
- onEnrichedEventFailure(throwable);
+ if (lastEvent instanceof EnrichedEvent
+ && !(throwable instanceof
PipeRuntimeSinkNonReportTimeConfigurableException)) {
+ onReportEventFailure(throwable);
} else {
- onNonEnrichedEventFailure(throwable);
+ onNonReportEventFailure(throwable);
}
// Although the pipe task will be stopped, we still don't release the last
event here
@@ -61,7 +64,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
// is dropped or the process is running normally.
}
- private void onEnrichedEventFailure(final Throwable throwable) {
+ private void onReportEventFailure(final Throwable throwable) {
final int maxRetryTimes =
throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
? ((PipeRuntimeSinkRetryTimesConfigurableException)
throwable).getRetryTimes()
@@ -80,18 +83,19 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
retryCount.incrementAndGet();
if (retryCount.get() <= maxRetryTimes) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
+ throwable,
"Retry executing subtask {} (creation time: {}, simple class: {}),
retry count [{}/{}], last exception: {}",
taskID,
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
maxRetryTimes,
- throwable.getMessage(),
- throwable);
+ throwable.getMessage());
try {
sleepIfNoHighPriorityTask(
- retryCount.get() *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+ retryCount.get() *
PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time:
{}, simple class: {})",
@@ -136,7 +140,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
protected abstract void report(final EnrichedEvent event, final
PipeRuntimeException exception);
- private void onNonEnrichedEventFailure(final Throwable throwable) {
+ private void onNonReportEventFailure(final Throwable throwable) {
if (retryCount.get() == 0) {
LOGGER.warn(
"Failed to execute subtask {} (creation time: {}, simple class: {}),
"
@@ -149,7 +153,8 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
}
retryCount.incrementAndGet();
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Retry executing subtask {} (creation time: {}, simple class: {}),
retry count {}, last exception: {}",
taskID,
creationTime,
@@ -159,7 +164,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
throwable);
try {
sleepIfNoHighPriorityTask(
- retryCount.get() *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+ retryCount.get() *
PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {},
simple class: {})",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 733d7b258dd..fbf4cf191ac 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -103,7 +103,7 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMinimumReceiverMemory();
}
- /////////////////////////////// Subtask Connector
///////////////////////////////
+ /////////////////////////////// Subtask Sink ///////////////////////////////
public int getPipeRealTimeQueuePollTsFileThreshold() {
return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
@@ -173,30 +173,34 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
}
- /////////////////////////////// Connector ///////////////////////////////
+ /////////////////////////////// Sink ///////////////////////////////
public int getPipeSinkHandshakeTimeoutMs() {
return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
}
- public int getPipeConnectorTransferTimeoutMs() {
- return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
+ public int getPipeSinkTransferTimeoutMs() {
+ return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
}
- public int getPipeConnectorReadFileBufferSize() {
- return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+ public int getPipeSinkReadFileBufferSize() {
+ return COMMON_CONFIG.getPipeSinkReadFileBufferSize();
}
- public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
- return COMMON_CONFIG.isPipeConnectorReadFileBufferMemoryControlEnabled();
+ public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+ return COMMON_CONFIG.isPipeSinkReadFileBufferMemoryControlEnabled();
}
- public long getPipeConnectorRetryIntervalMs() {
- return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+ public long getPipeSinkRetryIntervalMs() {
+ return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
}
- public boolean isPipeConnectorRPCThriftCompressionEnabled() {
- return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
+ public boolean isPipeSinkRetryLocallyForConnectionError() {
+ return COMMON_CONFIG.isPipeSinkRetryLocallyForConnectionError();
+ }
+
+ public boolean isPipeSinkRPCThriftCompressionEnabled() {
+ return COMMON_CONFIG.isPipeSinkRPCThriftCompressionEnabled();
}
public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
@@ -211,27 +215,27 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
}
- public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+ public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall();
}
- public int getPipeAsyncConnectorSelectorNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+ public int getPipeAsyncSinkSelectorNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkSelectorNumber();
}
- public int getPipeAsyncConnectorMaxClientNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+ public int getPipeAsyncSinkMaxClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxClientNumber();
}
- public int getPipeAsyncConnectorMaxTsFileClientNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+ public int getPipeAsyncSinkMaxTsFileClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxTsFileClientNumber();
}
public double getPipeSendTsFileRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
}
- public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+ public double getPipeAllSinksRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
@@ -239,8 +243,8 @@ public class PipeConfig {
return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
}
- public int getPipeConnectorRequestSliceThresholdBytes() {
- return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+ public int getPipeSinkRequestSliceThresholdBytes() {
+ return COMMON_CONFIG.getPipeSinkRequestSliceThresholdBytes();
}
public long getPipeMaxReaderChunkSize() {
@@ -517,16 +521,14 @@ public class PipeConfig {
getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
LOGGER.info("PipeSourceMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
- LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeSinkHandshakeTimeoutMs());
- LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
- LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
- LOGGER.info(
- "PipeConnectorReadFileBufferMemoryControlEnabled: {}",
- isPipeConnectorReadFileBufferMemoryControlEnabled());
- LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
+ LOGGER.info("PipeSinkHandshakeTimeoutMs: {}",
getPipeSinkHandshakeTimeoutMs());
+ LOGGER.info("PipeSinkTransferTimeoutMs: {}",
getPipeSinkTransferTimeoutMs());
+ LOGGER.info("PipeSinkReadFileBufferSize: {}",
getPipeSinkReadFileBufferSize());
LOGGER.info(
- "PipeConnectorRPCThriftCompressionEnabled: {}",
- isPipeConnectorRPCThriftCompressionEnabled());
+ "PipeSinkReadFileBufferMemoryControlEnabled: {}",
+ isPipeSinkReadFileBufferMemoryControlEnabled());
+ LOGGER.info("PipeSinkRetryIntervalMs: {}", getPipeSinkRetryIntervalMs());
+ LOGGER.info("PipeSinkRPCThriftCompressionEnabled: {}",
isPipeSinkRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
LOGGER.info("PipeReaderChunkSize: {}", getPipeMaxReaderChunkSize());
@@ -573,25 +575,20 @@ public class PipeConfig {
"PipeAsyncSinkForcedRetryTotalEventQueueSize: {}",
getPipeAsyncSinkForcedRetryTotalEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
- getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
- LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
- LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
- LOGGER.info(
- "PipeAsyncConnectorMaxTsFileClientNumber: {}",
- getPipeAsyncConnectorMaxTsFileClientNumber());
+ "PipeAsyncSinkMaxRetryExecutionTimeMsPerCall: {}",
+ getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall());
+ LOGGER.info("PipeAsyncSinkSelectorNumber: {}",
getPipeAsyncSinkSelectorNumber());
+ LOGGER.info("PipeAsyncSinkMaxClientNumber: {}",
getPipeAsyncSinkMaxClientNumber());
+ LOGGER.info("PipeAsyncSinkMaxTsFileClientNumber: {}",
getPipeAsyncSinkMaxTsFileClientNumber());
LOGGER.info(
"PipeSendTsFileRateLimitBytesPerSecond: {}",
getPipeSendTsFileRateLimitBytesPerSecond());
LOGGER.info(
- "PipeAllConnectorsRateLimitBytesPerSecond: {}",
- getPipeAllConnectorsRateLimitBytesPerSecond());
+ "PipeAllSinksRateLimitBytesPerSecond: {}",
getPipeAllSinksRateLimitBytesPerSecond());
LOGGER.info(
"RateLimiterHotReloadCheckIntervalMs: {}",
getRateLimiterHotReloadCheckIntervalMs());
- LOGGER.info(
- "PipeConnectorRequestSliceThresholdBytes: {}",
- getPipeConnectorRequestSliceThresholdBytes());
+ LOGGER.info("PipeSinkRequestSliceThresholdBytes: {}",
getPipeSinkRequestSliceThresholdBytes());
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index ed7723b4972..5322bfcd7ba 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -351,35 +351,42 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
- config.setPipeConnectorReadFileBufferSize(
+ config.setPipeSinkReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_size",
-
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
- config.setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+
String.valueOf(config.getPipeSinkReadFileBufferSize())))));
+ config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_memory_control",
- String.valueOf(
-
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
- config.setPipeConnectorRetryIntervalMs(
+
String.valueOf(config.isPipeSinkReadFileBufferMemoryControlEnabled())))));
+ config.setPipeSinkRetryIntervalMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
.orElse(
properties.getProperty(
"pipe_connector_retry_interval_ms",
-
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
- config.setPipeConnectorRPCThriftCompressionEnabled(
+
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+ config.setPipeSinkRetryLocallyForConnectionError(
+ Boolean.parseBoolean(
+ Optional.ofNullable(
+
properties.getProperty("pipe_sink_retry_locally_for_connection_error"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_retry_locally_for_connection_error",
+
String.valueOf(config.isPipeSinkRetryLocallyForConnectionError())))));
+ config.setPipeSinkRPCThriftCompressionEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
.orElse(
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
-
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
Long.parseLong(
Optional.ofNullable(
@@ -387,8 +394,7 @@ public class PipeDescriptor {
.orElse(
properties.getProperty(
"pipe_async_connector_max_retry_execution_time_ms_per_call",
- String.valueOf(
-
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+
String.valueOf(config.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall())))));
config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
@@ -425,7 +431,7 @@ public class PipeDescriptor {
Integer.parseInt(
properties.getProperty(
"pipe_connector_request_slice_threshold_bytes",
-
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
Long.parseLong(
@@ -576,7 +582,7 @@ public class PipeDescriptor {
parserPipeConfig(
properties, "pipe_sink_timeout_ms", "pipe_connector_timeout_ms",
isHotModify);
if (value != null) {
- config.setPipeConnectorTransferTimeoutMs(Long.parseLong(value));
+ config.setPipeSinkTransferTimeoutMs(Long.parseLong(value));
}
value =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
index 118f2f26876..917d4a74ed9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
@@ -56,7 +56,7 @@ public abstract class IoTDBClientManager {
private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; //
1 day
private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000;
// 6 hours
protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
- new
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+ new
AtomicInteger(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
protected IoTDBClientManager(
List<TEndPoint> endPointList,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index a0bb65b5a7e..b7f42295e6c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -100,7 +100,7 @@ public class IoTDBSyncClient extends
IClientRPCService.Client
@Override
public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws
TException {
- final int bodySizeLimit =
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+ final int bodySizeLimit =
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
|| req.body.limit() < bodySizeLimit) {
return super.pipeTransfer(req);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index fa5f0d3383c..969f42bfcec 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -195,7 +195,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+ PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
index 9a6aba5b90b..e497656eab2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
@@ -28,6 +28,6 @@ public class GlobalRPCRateLimiter extends GlobalRateLimiter {
@Override
protected double getThroughputBytesPerSecond() {
- return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+ return CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index dda8818e04f..763bfa644ea 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -238,7 +238,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
} else {
supportModsIfIsDataNodeReceiver = true;
}
- socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+ socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
LOGGER.info("Handshake success. Socket: {}", socket);
}
@@ -265,7 +265,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
final AirGapSocket socket,
final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 0668f7cc99b..66f3eda6a1a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -378,7 +378,7 @@ public abstract class IoTDBSink implements PipeConnector {
nodeUrls.clear();
nodeUrls.addAll(parseNodeUrls(parameters));
- LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
+ LOGGER.info("IoTDBSink nodeUrls: {}", nodeUrls);
isTabletBatchModeEnabled =
parameters.getBooleanOrDefault(
@@ -390,7 +390,7 @@ public abstract class IoTDBSink implements PipeConnector {
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
- LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
+ LOGGER.info("IoTDBSink isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
final boolean shouldMarkAsGeneralWriteRequest =
parameters.getBooleanOrDefault(
@@ -406,7 +406,7 @@ public abstract class IoTDBSink implements PipeConnector {
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY,
SINK_MARK_AS_PIPE_REQUEST_KEY),
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
}
- LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}",
shouldMarkAsPipeRequest);
+ LOGGER.info("IoTDBSink shouldMarkAsPipeRequest: {}",
shouldMarkAsPipeRequest);
final String connectorSkipIfValue =
parameters
@@ -425,7 +425,7 @@ public abstract class IoTDBSink implements PipeConnector {
throw new PipeParameterNotValidException(
String.format("Parameters in set %s are not allowed in 'skipif'",
skipIfOptionSet));
}
- LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+ LOGGER.info("IoTDBSink skipIfNoPrivileges: {}", skipIfNoPrivileges);
receiverStatusHandler =
new PipeReceiverStatusHandler(
@@ -465,7 +465,7 @@ public abstract class IoTDBSink implements PipeConnector {
SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
LOGGER.info(
- "IoTDBConnector {} = {}",
+ "IoTDBSink {} = {}",
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
shouldReceiverConvertOnTypeMismatch);
isRealtimeFirst =
@@ -475,7 +475,7 @@ public abstract class IoTDBSink implements PipeConnector {
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
LOGGER.info(
- "IoTDBConnector {} = {}",
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst);
+ "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
isRealtimeFirst);
}
protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters
parameters)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index 080efd25846..96b7346d025 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -175,7 +175,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {