This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1b0a9d76883 Pipe: Enabled locally retry for PipeConnectionException
(#17182)
1b0a9d76883 is described below
commit 1b0a9d7688362fddd80c03cf822e19983cefeea7
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:28:17 2026 +0800
Pipe: Enabled locally retry for PipeConnectionException (#17182)
* fix
* may-fix
* fix
* fix
* try-complete
* fix-part
* Update IoTDBDataRegionAsyncSink.java
* fix
* fix
---
.../iotdb/tool/tsfile/ImportTsFileRemotely.java | 9 +-
.../exchange/sender/TwoStageAggregateSender.java | 5 +-
.../protocol/airgap/IoTDBAirGapReceiver.java | 2 +-
.../client/IoTDBDataNodeAsyncClientManager.java | 4 +-
.../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 2 +-
.../pipeconsensus/PipeConsensusSyncSink.java | 2 +-
.../PipeConsensusTsFileInsertionEventHandler.java | 2 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 37 +++---
.../PipeTransferTabletBatchEventHandler.java | 2 +-
.../PipeTransferTabletInsertionEventHandler.java | 2 +-
.../async/handler/PipeTransferTsFileHandler.java | 8 +-
.../GeneralRegionAttributeSecurityService.java | 4 +-
.../iotdb/commons/client/ClientPoolFactory.java | 20 ++--
.../apache/iotdb/commons/conf/CommonConfig.java | 129 ++++++++++++---------
.../task/subtask/PipeAbstractSinkSubtask.java | 20 +++-
.../agent/task/subtask/PipeReportableSubtask.java | 23 ++--
.../iotdb/commons/pipe/config/PipeConfig.java | 95 ++++++++-------
.../iotdb/commons/pipe/config/PipeDescriptor.java | 36 +++---
.../pipe/sink/client/IoTDBClientManager.java | 2 +-
.../commons/pipe/sink/client/IoTDBSyncClient.java | 2 +-
.../pipe/sink/client/IoTDBSyncClientManager.java | 4 +-
.../pipe/sink/limiter/GlobalRPCRateLimiter.java | 2 +-
.../pipe/sink/protocol/IoTDBAirGapSink.java | 4 +-
.../commons/pipe/sink/protocol/IoTDBSink.java | 12 +-
.../pipe/sink/protocol/IoTDBSslSyncSink.java | 2 +-
25 files changed, 230 insertions(+), 200 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index efb7f2abdbf..aca33ba4dbf 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -164,7 +164,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
"Handshake error with target server ip: %s, port: %s, because:
%s.",
client.getIpAddress(), client.getPort(), resp.getStatus()));
} else {
-
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
IOT_PRINTER.println(
String.format(
"Handshake success. Target server ip: %s, port: %s",
@@ -232,7 +232,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
private void transferFilePieces(final File file, final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
@@ -299,10 +299,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase
{
this.client =
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs(
-
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
-
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+
PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled())
.build(),
getEndPoint().getIp(),
getEndPoint().getPort(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index bac357368c0..3c36559a300 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,9 +208,8 @@ public class TwoStageAggregateSender implements
AutoCloseable {
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws
TTransportException {
return new IoTDBSyncClient(
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
- .setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
+
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..610b9e5fe1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -70,7 +70,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
@Override
public void runMayThrow() throws Throwable {
-
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
socket.setKeepAlive(true);
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId,
socket);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5ee7f22bc07..73e2213eea6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -312,7 +312,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF,
Boolean.toString(skipIfNoPrivileges));
-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
waitHandshakeFinished(isHandshakeFinished);
@@ -331,7 +331,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
resp.set(null);
exception.set(null);
-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
client.pipeTransfer(
PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 332ca6bab7c..e296a7e0faa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -389,7 +389,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
long position = 0;
// Try small piece to rebase the file position.
- final byte[] buffer = new
byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+ final byte[] buffer = new
byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()];
try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"r")) {
while (true) {
final int dataLength = randomAccessFile.read(buffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index bb0dcf2dc09..50972ee4454 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -446,7 +446,7 @@ public class PipeConsensusSyncSink extends IoTDBSink {
final TCommitId tCommitId,
final TConsensusGroupId tConsensusGroupId)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 485096173c1..630262d4485 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -104,7 +104,7 @@ public class PipeConsensusTsFileInsertionEventHandler
transferMod = event.isWithMod();
currentFile = transferMod ? modFile : tsFile;
- readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
readBuffer = new byte[readFileBufferSize];
position = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index e6e368a5280..f8d0b104096 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -59,6 +60,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -127,6 +129,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
new ConcurrentHashMap<>();
private boolean enableSendTsFileLimit;
+ private volatile boolean isConnectionException;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -261,10 +264,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
false,
sealedFile.left));
}
- } catch (final Throwable t) {
- LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, t);
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, e);
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- addFailureEventsToRetryQueue(events);
+ addFailureEventsToRetryQueue(events, e);
}
}
} else {
@@ -599,7 +602,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
// Stop retrying if the execution time exceeds the threshold for better
realtime performance
if (System.currentTimeMillis() - retryStartTime
- >
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())
{
+ >
PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) {
if (retryEventQueueEventCounter.getTabletInsertionEventCount()
<
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
@@ -610,14 +613,17 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
}
if (remainingEvents <= retryEventQueue.size() +
retryTsFileQueue.size()) {
- throw new PipeException(
+ final String message =
"Failed to retry transferring events in the retry queue.
Remaining events: "
+ (retryEventQueue.size() + retryTsFileQueue.size())
+ " (tablet events: "
+ retryEventQueueEventCounter.getTabletInsertionEventCount()
+ ", tsfile events: "
+ retryEventQueueEventCounter.getTsFileInsertionEventCount()
- + ").");
+ + ").";
+ throw isConnectionException
+ ? new PipeConnectionException(message)
+ : new PipeException(message);
}
}
}
@@ -633,7 +639,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
}
} catch (final Exception e) {
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, e);
}
return;
}
@@ -646,14 +652,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
}
} else {
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, null);
}
} catch (final Exception e) {
if (tabletInsertionEvent instanceof EnrichedEvent) {
((EnrichedEvent) tabletInsertionEvent)
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
false);
}
- addFailureEventToRetryQueue(tabletInsertionEvent);
+ addFailureEventToRetryQueue(tabletInsertionEvent, e);
}
}
@@ -663,11 +669,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
tsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionAsyncSink.class.getName(), false);
} else {
- addFailureEventToRetryQueue(tsFileInsertionEvent);
+ addFailureEventToRetryQueue(tsFileInsertionEvent, null);
}
} catch (final Exception e) {
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
false);
- addFailureEventToRetryQueue(tsFileInsertionEvent);
+ addFailureEventToRetryQueue(tsFileInsertionEvent, e);
}
}
@@ -677,7 +683,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
* @param event {@link Event} to retry
*/
@SuppressWarnings("java:S899")
- public void addFailureEventToRetryQueue(final Event event) {
+ public void addFailureEventToRetryQueue(final Event event, final Exception
e) {
+ isConnectionException =
+ e instanceof PipeConnectionException ||
ThriftClient.isConnectionBroken(e);
if (event instanceof EnrichedEvent && ((EnrichedEvent)
event).isReleased()) {
return;
}
@@ -713,8 +721,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
*
* @param events {@link EnrichedEvent}s to retry
*/
- public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent>
events) {
- events.forEach(this::addFailureEventToRetryQueue);
+ public void addFailureEventsToRetryQueue(
+ final Iterable<EnrichedEvent> events, final Exception e) {
+ events.forEach(event -> addFailureEventToRetryQueue(event, e));
}
public boolean isEnableSendTsFileLimit() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 89baaa02794..d95513a2228 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
events.size(),
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
} finally {
- connector.addFailureEventsToRetryQueue(events);
+ connector.addFailureEventsToRetryQueue(events, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index df26325a5ca..ca92af92b19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -98,7 +98,7 @@ public abstract class PipeTransferTabletInsertionEventHandler
extends PipeTransf
event.getCommitterKey(),
event.getCommitId());
} finally {
- connector.addFailureEventToRetryQueue(event);
+ connector.addFailureEventToRetryQueue(event, exception);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3ef46034554..6726c9a6701 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -126,7 +126,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
readFileBufferSize =
(int)
Math.min(
- PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(),
+ PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
transferMod ? Math.max(tsFile.length(), modFile.length()) :
tsFile.length());
position = 0;
@@ -146,7 +146,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
memoryBlock =
PipeDataNodeResourceManager.memory()
.forceAllocateForTsFileWithRetry(
-
PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()
+
PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()
? readFileBufferSize
: 0);
readBuffer = new byte[readFileBufferSize];
@@ -415,7 +415,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
returnClientIfNecessary();
} finally {
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
- connector.addFailureEventsToRetryQueue(events);
+ connector.addFailureEventsToRetryQueue(events, exception);
}
}
}
@@ -477,7 +477,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
* @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
*/
private void waitForResourceEnough4Slicing(final long timeoutMs) throws
InterruptedException {
- if
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled())
{
+ if
(!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
index 1bff56500ea..408cc121590 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
@@ -102,9 +102,7 @@ public class GeneralRegionAttributeSecurityService extends
AbstractPeriodicalSer
// UpdateClearContainer and version / TEndPoint are not calculated
final AtomicInteger limit =
new AtomicInteger(
- CommonDescriptor.getInstance()
- .getConfig()
- .getPipeConnectorRequestSliceThresholdBytes());
+
CommonDescriptor.getInstance().getConfig().getPipeSinkRequestSliceThresholdBytes());
final AtomicBoolean hasRemaining = new AtomicBoolean(false);
final Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>>
attributeUpdateCommitMap =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 7ed34359b21..9e41b585ec4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -281,15 +281,13 @@ public class ClientPoolFactory {
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
- .setRpcThriftCompressionEnabled(
- conf.isPipeConnectorRPCThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(
- conf.getPipeAsyncConnectorSelectorNumber())
+
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.build(),
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
+
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -308,16 +306,14 @@ public class ClientPoolFactory {
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
- .setRpcThriftCompressionEnabled(
- conf.isPipeConnectorRPCThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(
- conf.getPipeAsyncConnectorSelectorNumber())
+
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
.build(),
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9990c6a958b..0e44d0ab1a2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -268,12 +268,13 @@ public class CommonConfig {
private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
private long pipeSourceMatcherCacheSize = 1024;
- private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
- private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
- private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
- private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
- private long pipeConnectorRetryIntervalMs = 1000L;
- private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+ private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+ private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
+ private int pipeSinkReadFileBufferSize = 5242880; // 5MB
+ private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
+ private long pipeSinkRetryIntervalMs = 800L;
+ private boolean pipeSinkRetryLocallyForConnectionError = true;
+ private boolean pipeSinkRPCThriftCompressionEnabled = false;
private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
@@ -1052,88 +1053,85 @@ public class CommonConfig {
logger.info("pipeSourceMatcherCacheSize is set to {}.",
pipeSourceMatcherCacheSize);
}
- public int getPipeConnectorHandshakeTimeoutMs() {
- return pipeConnectorHandshakeTimeoutMs;
+ public int getPipeSinkHandshakeTimeoutMs() {
+ return pipeSinkHandshakeTimeoutMs;
}
- public void setPipeConnectorHandshakeTimeoutMs(long
pipeConnectorHandshakeTimeoutMs) {
- final int fPipeConnectorHandshakeTimeoutMs =
this.pipeConnectorHandshakeTimeoutMs;
+ public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
+ final int fPipeConnectorHandshakeTimeoutMs =
this.pipeSinkHandshakeTimeoutMs;
try {
- this.pipeConnectorHandshakeTimeoutMs =
Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+ this.pipeSinkHandshakeTimeoutMs =
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
} catch (ArithmeticException e) {
- this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+ this.pipeSinkHandshakeTimeoutMs = Integer.MAX_VALUE;
logger.warn(
"Given pipe connector handshake timeout is too large, set to {}
ms.", Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorHandshakeTimeoutMs !=
this.pipeConnectorHandshakeTimeoutMs) {
+ if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs)
{
logger.info(
- "pipeConnectorHandshakeTimeoutMs is set to {}.",
this.pipeConnectorHandshakeTimeoutMs);
+ "pipeConnectorHandshakeTimeoutMs is set to {}.",
this.pipeSinkHandshakeTimeoutMs);
}
}
}
- public int getPipeConnectorTransferTimeoutMs() {
- return pipeConnectorTransferTimeoutMs;
+ public int getPipeSinkTransferTimeoutMs() {
+ return pipeSinkTransferTimeoutMs;
}
- public void setPipeConnectorTransferTimeoutMs(long
pipeConnectorTransferTimeoutMs) {
- final int fPipeConnectorTransferTimeoutMs =
this.pipeConnectorTransferTimeoutMs;
+ public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
+ final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
try {
- this.pipeConnectorTransferTimeoutMs =
Math.toIntExact(pipeConnectorTransferTimeoutMs);
+ this.pipeSinkTransferTimeoutMs =
Math.toIntExact(pipeSinkTransferTimeoutMs);
} catch (ArithmeticException e) {
- this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+ this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
logger.warn(
"Given pipe connector transfer timeout is too large, set to {} ms.",
Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorTransferTimeoutMs !=
this.pipeConnectorTransferTimeoutMs) {
- logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeConnectorTransferTimeoutMs);
+ if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+ logger.info("pipeConnectorTransferTimeoutMs is set to {}.",
pipeSinkTransferTimeoutMs);
}
}
}
- public int getPipeConnectorReadFileBufferSize() {
- return pipeConnectorReadFileBufferSize;
+ public int getPipeSinkReadFileBufferSize() {
+ return pipeSinkReadFileBufferSize;
}
- public void setPipeConnectorReadFileBufferSize(int
pipeConnectorReadFileBufferSize) {
- if (this.pipeConnectorReadFileBufferSize ==
pipeConnectorReadFileBufferSize) {
+ public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) {
+ if (this.pipeSinkReadFileBufferSize == pipeSinkReadFileBufferSize) {
return;
}
- this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
- logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeConnectorReadFileBufferSize);
+ this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
+ logger.info("pipeConnectorReadFileBufferSize is set to {}.",
pipeSinkReadFileBufferSize);
}
- public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
- return isPipeConnectorReadFileBufferMemoryControlEnabled;
+ public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+ return isPipeSinkReadFileBufferMemoryControlEnabled;
}
- public void setIsPipeConnectorReadFileBufferMemoryControlEnabled(
- boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
- if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
- == isPipeConnectorReadFileBufferMemoryControlEnabled) {
+ public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
+ boolean isPipeSinkReadFileBufferMemoryControlEnabled) {
+ if (this.isPipeSinkReadFileBufferMemoryControlEnabled
+ == isPipeSinkReadFileBufferMemoryControlEnabled) {
return;
}
- this.isPipeConnectorReadFileBufferMemoryControlEnabled =
- isPipeConnectorReadFileBufferMemoryControlEnabled;
+ this.isPipeSinkReadFileBufferMemoryControlEnabled =
+ isPipeSinkReadFileBufferMemoryControlEnabled;
logger.info(
- "isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.",
- isPipeConnectorReadFileBufferMemoryControlEnabled);
+ "isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.",
+ isPipeSinkReadFileBufferMemoryControlEnabled);
}
- public void setPipeConnectorRPCThriftCompressionEnabled(
- boolean pipeConnectorRPCThriftCompressionEnabled) {
- if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
- == pipeConnectorRPCThriftCompressionEnabled) {
+ public void setPipeSinkRPCThriftCompressionEnabled(boolean
pipeSinkRPCThriftCompressionEnabled) {
+ if (this.isPipeSinkReadFileBufferMemoryControlEnabled ==
pipeSinkRPCThriftCompressionEnabled) {
return;
}
- this.pipeConnectorRPCThriftCompressionEnabled =
pipeConnectorRPCThriftCompressionEnabled;
+ this.pipeSinkRPCThriftCompressionEnabled =
pipeSinkRPCThriftCompressionEnabled;
logger.info(
- "pipeConnectorRPCThriftCompressionEnabled is set to {}.",
- pipeConnectorRPCThriftCompressionEnabled);
+ "pipeSinkRPCThriftCompressionEnabled is set to {}.",
pipeSinkRPCThriftCompressionEnabled);
}
- public boolean isPipeConnectorRPCThriftCompressionEnabled() {
- return pipeConnectorRPCThriftCompressionEnabled;
+ public boolean isPipeSinkRPCThriftCompressionEnabled() {
+ return pipeSinkRPCThriftCompressionEnabled;
}
public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
@@ -1199,11 +1197,11 @@ public class CommonConfig {
pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
}
- public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+ public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
}
- public int getPipeAsyncConnectorSelectorNumber() {
+ public int getPipeAsyncSinkSelectorNumber() {
return pipeAsyncConnectorSelectorNumber;
}
@@ -1221,7 +1219,7 @@ public class CommonConfig {
logger.info("pipeAsyncConnectorSelectorNumber is set to {}.",
pipeAsyncConnectorSelectorNumber);
}
- public int getPipeAsyncConnectorMaxClientNumber() {
+ public int getPipeAsyncSinkMaxClientNumber() {
return pipeAsyncConnectorMaxClientNumber;
}
@@ -1240,7 +1238,7 @@ public class CommonConfig {
"pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxClientNumber);
}
- public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+ public int getPipeAsyncSinkMaxTsFileClientNumber() {
return pipeAsyncConnectorMaxTsFileClientNumber;
}
@@ -1357,16 +1355,31 @@ public class CommonConfig {
logger.info("pipeAutoRestartEnabled is set to {}.",
pipeAutoRestartEnabled);
}
- public long getPipeConnectorRetryIntervalMs() {
- return pipeConnectorRetryIntervalMs;
+ public long getPipeSinkRetryIntervalMs() {
+ return pipeSinkRetryIntervalMs;
}
- public void setPipeConnectorRetryIntervalMs(long
pipeConnectorRetryIntervalMs) {
- if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+ public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
+ if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
return;
}
- this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
- logger.info("pipeConnectorRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
+ logger.info("pipeSinkRetryIntervalMs is set to {}",
pipeConnectorRetryIntervalMs);
+ }
+
+ public boolean isPipeSinkRetryLocallyForConnectionError() {
+ return pipeSinkRetryLocallyForConnectionError;
+ }
+
+ public void setPipeSinkRetryLocallyForConnectionError(
+ boolean pipeSinkRetryLocallyForConnectionError) {
+ if (this.pipeSinkRetryLocallyForConnectionError ==
pipeSinkRetryLocallyForConnectionError) {
+ return;
+ }
+ this.pipeSinkRetryLocallyForConnectionError =
pipeSinkRetryLocallyForConnectionError;
+ logger.info(
+ "pipeSinkRetryLocallyForConnectionError is set to {}",
+ pipeSinkRetryLocallyForConnectionError);
}
public int
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
@@ -2101,7 +2114,7 @@ public class CommonConfig {
"rateLimiterHotReloadCheckIntervalMs is set to {}",
rateLimiterHotReloadCheckIntervalMs);
}
- public int getPipeConnectorRequestSliceThresholdBytes() {
+ public int getPipeSinkRequestSliceThresholdBytes() {
return pipeConnectorRequestSliceThresholdBytes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 5f7156f6324..ccf16bfa753 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -134,6 +134,12 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
// return if the pipe task should be stopped
return;
}
+ if
(PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
+ super.onFailure(
+ new PipeRuntimeSinkNonReportTimeConfigurableException(
+ throwable.getMessage(), Long.MAX_VALUE));
+ return;
+ }
}
// Handle exceptions if any available clients exist
@@ -144,9 +150,10 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
super.onFailure(throwable);
} else {
// Print stack trace for better debugging
- LOGGER.warn(
- "A non PipeRuntimeConnectorCriticalException occurred, will throw
a PipeRuntimeConnectorCriticalException.",
- throwable);
+ PipeLogger.log(
+ LOGGER::warn,
+ throwable,
+ "A non PipeRuntimeSinkCriticalException occurred, will throw a
PipeRuntimeSinkCriticalException.");
super.onFailure(new
PipeRuntimeSinkCriticalException(throwable.getMessage()));
}
}
@@ -179,8 +186,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
MAX_RETRY_TIMES,
e);
try {
- sleepIfNoHighPriorityTask(
- retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+ sleepIfNoHighPriorityTask(retry *
PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
} catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the
target system.",
@@ -192,7 +198,9 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
// Stop current pipe task directly if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
- if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+ if (retry == MAX_RETRY_TIMES
+ && lastEvent instanceof EnrichedEvent
+ &&
!PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
report(
(EnrichedEvent) lastEvent,
new PipeRuntimeSinkCriticalException(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index f290f8c4965..3c1a63064af 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask;
import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +51,11 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
return;
}
- if (lastEvent instanceof EnrichedEvent) {
- onEnrichedEventFailure(throwable);
+ if (lastEvent instanceof EnrichedEvent
+ && !(throwable instanceof
PipeRuntimeSinkNonReportTimeConfigurableException)) {
+ onReportEventFailure(throwable);
} else {
- onNonEnrichedEventFailure(throwable);
+ onNonReportEventFailure(throwable);
}
// Although the pipe task will be stopped, we still don't release the last
event here
@@ -75,7 +78,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
return sleepInterval;
}
- private void onEnrichedEventFailure(final Throwable throwable) {
+ private void onReportEventFailure(final Throwable throwable) {
final int maxRetryTimes =
throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
? ((PipeRuntimeSinkRetryTimesConfigurableException)
throwable).getRetryTimes()
@@ -94,15 +97,16 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
retryCount.incrementAndGet();
if (retryCount.get() <= maxRetryTimes) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
+ throwable,
"Retry executing subtask {} (creation time: {}, simple class: {}),
retry count [{}/{}], last exception: {}",
taskID,
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
maxRetryTimes,
- throwable.getMessage(),
- throwable);
+ throwable.getMessage());
try {
sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable));
} catch (final InterruptedException e) {
@@ -149,7 +153,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
protected abstract void report(final EnrichedEvent event, final
PipeRuntimeException exception);
- private void onNonEnrichedEventFailure(final Throwable throwable) {
+ private void onNonReportEventFailure(final Throwable throwable) {
if (retryCount.get() == 0) {
LOGGER.warn(
"Failed to execute subtask {} (creation time: {}, simple class: {}),
"
@@ -162,7 +166,8 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
}
retryCount.incrementAndGet();
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Retry executing subtask {} (creation time: {}, simple class: {}),
retry count {}, last exception: {}",
taskID,
creationTime,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 15ae89ee620..73cd8d8a4bf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -103,7 +103,7 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMinimumReceiverMemory();
}
- /////////////////////////////// Subtask Connector
///////////////////////////////
+ /////////////////////////////// Subtask Sink ///////////////////////////////
public int getPipeRealTimeQueuePollTsFileThreshold() {
return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
@@ -173,30 +173,34 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
}
- /////////////////////////////// Connector ///////////////////////////////
+ /////////////////////////////// Sink ///////////////////////////////
- public int getPipeConnectorHandshakeTimeoutMs() {
- return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
+ public int getPipeSinkHandshakeTimeoutMs() {
+ return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
}
- public int getPipeConnectorTransferTimeoutMs() {
- return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
+ public int getPipeSinkTransferTimeoutMs() {
+ return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
}
- public int getPipeConnectorReadFileBufferSize() {
- return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+ public int getPipeSinkReadFileBufferSize() {
+ return COMMON_CONFIG.getPipeSinkReadFileBufferSize();
}
- public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
- return COMMON_CONFIG.isPipeConnectorReadFileBufferMemoryControlEnabled();
+ public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+ return COMMON_CONFIG.isPipeSinkReadFileBufferMemoryControlEnabled();
}
- public long getPipeConnectorRetryIntervalMs() {
- return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+ public long getPipeSinkRetryIntervalMs() {
+ return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
}
- public boolean isPipeConnectorRPCThriftCompressionEnabled() {
- return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
+ public boolean isPipeSinkRetryLocallyForConnectionError() {
+ return COMMON_CONFIG.isPipeSinkRetryLocallyForConnectionError();
+ }
+
+ public boolean isPipeSinkRPCThriftCompressionEnabled() {
+ return COMMON_CONFIG.isPipeSinkRPCThriftCompressionEnabled();
}
public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
@@ -211,27 +215,27 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
}
- public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+ public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall();
}
- public int getPipeAsyncConnectorSelectorNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+ public int getPipeAsyncSinkSelectorNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkSelectorNumber();
}
- public int getPipeAsyncConnectorMaxClientNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+ public int getPipeAsyncSinkMaxClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxClientNumber();
}
- public int getPipeAsyncConnectorMaxTsFileClientNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+ public int getPipeAsyncSinkMaxTsFileClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncSinkMaxTsFileClientNumber();
}
public double getPipeSendTsFileRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
}
- public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+ public double getPipeAllSinksRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
@@ -239,8 +243,8 @@ public class PipeConfig {
return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
}
- public int getPipeConnectorRequestSliceThresholdBytes() {
- return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+ public int getPipeSinkRequestSliceThresholdBytes() {
+ return COMMON_CONFIG.getPipeSinkRequestSliceThresholdBytes();
}
public long getPipeMaxReaderChunkSize() {
@@ -517,16 +521,14 @@ public class PipeConfig {
getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
LOGGER.info("PipeSourceMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
- LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeConnectorHandshakeTimeoutMs());
- LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
- LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
- LOGGER.info(
- "PipeConnectorReadFileBufferMemoryControlEnabled: {}",
- isPipeConnectorReadFileBufferMemoryControlEnabled());
- LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
+ LOGGER.info("PipeSinkHandshakeTimeoutMs: {}",
getPipeSinkHandshakeTimeoutMs());
+ LOGGER.info("PipeSinkTransferTimeoutMs: {}",
getPipeSinkTransferTimeoutMs());
+ LOGGER.info("PipeSinkReadFileBufferSize: {}",
getPipeSinkReadFileBufferSize());
LOGGER.info(
- "PipeConnectorRPCThriftCompressionEnabled: {}",
- isPipeConnectorRPCThriftCompressionEnabled());
+ "PipeSinkReadFileBufferMemoryControlEnabled: {}",
+ isPipeSinkReadFileBufferMemoryControlEnabled());
+ LOGGER.info("PipeSinkRetryIntervalMs: {}", getPipeSinkRetryIntervalMs());
+ LOGGER.info("PipeSinkRPCThriftCompressionEnabled: {}",
isPipeSinkRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
LOGGER.info("PipeMaxAlignedSeriesChunkSizeInOneBatch: {}",
getPipeMaxReaderChunkSize());
@@ -564,34 +566,29 @@ public class PipeConfig {
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
+ "PipeAsyncSinkForcedRetryTsFileEventQueueSizeThreshold: {}",
getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
+ "PipeAsyncSinkForcedRetryTabletEventQueueSizeThreshold: {}",
getPipeAsyncSinkForcedRetryTabletEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
+ "PipeAsyncSinkForcedRetryTotalEventQueueSizeThreshold: {}",
getPipeAsyncSinkForcedRetryTotalEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
- getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
- LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
- LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
- LOGGER.info(
- "PipeAsyncConnectorMaxTsFileClientNumber: {}",
- getPipeAsyncConnectorMaxTsFileClientNumber());
+ "PipeAsyncSinkMaxRetryExecutionTimeMsPerCall: {}",
+ getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall());
+ LOGGER.info("PipeAsyncSinkSelectorNumber: {}",
getPipeAsyncSinkSelectorNumber());
+ LOGGER.info("PipeAsyncSinkMaxClientNumber: {}",
getPipeAsyncSinkMaxClientNumber());
+ LOGGER.info("PipeAsyncSinkMaxTsFileClientNumber: {}",
getPipeAsyncSinkMaxTsFileClientNumber());
LOGGER.info(
"PipeSendTsFileRateLimitBytesPerSecond: {}",
getPipeSendTsFileRateLimitBytesPerSecond());
LOGGER.info(
- "PipeAllConnectorsRateLimitBytesPerSecond: {}",
- getPipeAllConnectorsRateLimitBytesPerSecond());
+ "PipeAllSinksRateLimitBytesPerSecond: {}",
getPipeAllSinksRateLimitBytesPerSecond());
LOGGER.info(
"RateLimiterHotReloadCheckIntervalMs: {}",
getRateLimiterHotReloadCheckIntervalMs());
- LOGGER.info(
- "PipeConnectorRequestSliceThresholdBytes: {}",
- getPipeConnectorRequestSliceThresholdBytes());
+ LOGGER.info("PipeSinkRequestSliceThresholdBytes: {}",
getPipeSinkRequestSliceThresholdBytes());
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 05592809bbb..28c2465bb0f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -344,42 +344,49 @@ public class PipeDescriptor {
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
- config.setPipeConnectorHandshakeTimeoutMs(
+ config.setPipeSinkHandshakeTimeoutMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
.orElse(
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
-
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
- config.setPipeConnectorReadFileBufferSize(
+
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+ config.setPipeSinkReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_size",
-
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
- config.setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+
String.valueOf(config.getPipeSinkReadFileBufferSize())))));
+ config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_memory_control",
- String.valueOf(
-
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
- config.setPipeConnectorRetryIntervalMs(
+
String.valueOf(config.isPipeSinkReadFileBufferMemoryControlEnabled())))));
+ config.setPipeSinkRetryIntervalMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
.orElse(
properties.getProperty(
"pipe_connector_retry_interval_ms",
-
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
- config.setPipeConnectorRPCThriftCompressionEnabled(
+
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+ config.setPipeSinkRetryLocallyForConnectionError(
+ Boolean.parseBoolean(
+ Optional.ofNullable(
+
properties.getProperty("pipe_sink_retry_locally_for_connection_error"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_retry_locally_for_connection_error",
+
String.valueOf(config.isPipeSinkRetryLocallyForConnectionError())))));
+ config.setPipeSinkRPCThriftCompressionEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
.orElse(
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
-
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
Long.parseLong(
Optional.ofNullable(
@@ -387,8 +394,7 @@ public class PipeDescriptor {
.orElse(
properties.getProperty(
"pipe_async_connector_max_retry_execution_time_ms_per_call",
- String.valueOf(
-
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+
String.valueOf(config.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall())))));
config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
@@ -425,7 +431,7 @@ public class PipeDescriptor {
Integer.parseInt(
properties.getProperty(
"pipe_connector_request_slice_threshold_bytes",
-
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
Long.parseLong(
@@ -575,7 +581,7 @@ public class PipeDescriptor {
parserPipeConfig(
properties, "pipe_sink_timeout_ms", "pipe_connector_timeout_ms",
isHotModify);
if (value != null) {
- config.setPipeConnectorTransferTimeoutMs(Long.parseLong(value));
+ config.setPipeSinkTransferTimeoutMs(Long.parseLong(value));
}
value =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
index 3e009b279d7..1f76f5d2453 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
@@ -58,7 +58,7 @@ public abstract class IoTDBClientManager {
private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; //
1 day
private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000;
// 6 hours
protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
- new
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+ new
AtomicInteger(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
protected IoTDBClientManager(
final List<TEndPoint> endPointList,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index a0bb65b5a7e..b7f42295e6c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -100,7 +100,7 @@ public class IoTDBSyncClient extends
IClientRPCService.Client
@Override
public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws
TException {
- final int bodySizeLimit =
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+ final int bodySizeLimit =
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
|| req.body.limit() < bodySizeLimit) {
return super.pipeTransfer(req);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 4c5d51f83c5..76c145d0dab 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -198,9 +198,9 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
clientAndStatus.setLeft(
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
- PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+ PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
index 9a6aba5b90b..e497656eab2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
@@ -28,6 +28,6 @@ public class GlobalRPCRateLimiter extends GlobalRateLimiter {
@Override
protected double getThroughputBytesPerSecond() {
- return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+ return CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index aba6c7e1d82..7d84e3bb98f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -242,7 +242,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
} else {
supportModsIfIsDataNodeReceiver = true;
}
- socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+ socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
LOGGER.info("Handshake success. Socket: {}", socket);
}
@@ -269,7 +269,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
final AirGapSocket socket,
final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 9dda99c22d3..88a8b71775f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -398,7 +398,7 @@ public abstract class IoTDBSink implements PipeConnector {
nodeUrls.clear();
nodeUrls.addAll(parseNodeUrls(parameters));
- LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
+ LOGGER.info("IoTDBSink nodeUrls: {}", nodeUrls);
isTabletBatchModeEnabled =
parameters.getBooleanOrDefault(
@@ -410,7 +410,7 @@ public abstract class IoTDBSink implements PipeConnector {
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
- LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
+ LOGGER.info("IoTDBSink isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
final boolean shouldMarkAsGeneralWriteRequest =
parameters.getBooleanOrDefault(
@@ -426,7 +426,7 @@ public abstract class IoTDBSink implements PipeConnector {
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY,
SINK_MARK_AS_PIPE_REQUEST_KEY),
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
}
- LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}",
shouldMarkAsPipeRequest);
+ LOGGER.info("IoTDBSink shouldMarkAsPipeRequest: {}",
shouldMarkAsPipeRequest);
final String connectorSkipIfValue =
parameters
@@ -445,7 +445,7 @@ public abstract class IoTDBSink implements PipeConnector {
throw new PipeParameterNotValidException(
String.format("Parameters in set %s are not allowed in 'skipif'",
skipIfOptionSet));
}
- LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+ LOGGER.info("IoTDBSink skipIfNoPrivileges: {}", skipIfNoPrivileges);
receiverStatusHandler =
new PipeReceiverStatusHandler(
@@ -485,7 +485,7 @@ public abstract class IoTDBSink implements PipeConnector {
SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
LOGGER.info(
- "IoTDBConnector {} = {}",
+ "IoTDBSink {} = {}",
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
shouldReceiverConvertOnTypeMismatch);
isRealtimeFirst =
@@ -495,7 +495,7 @@ public abstract class IoTDBSink implements PipeConnector {
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
LOGGER.info(
- "IoTDBConnector {} = {}",
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst);
+ "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
isRealtimeFirst);
}
protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters
parameters)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index b6b8e52f1fa..75a4607a23b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -184,7 +184,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
final boolean isMultiFile)
throws PipeException, IOException {
- final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {