This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 75f35ff7f26 Pipe: Implement force forwarding feature where the Pipe
sender can force the receiver to forward the received event (#14847) (#14875)
75f35ff7f26 is described below
commit 75f35ff7f263499c1961b1a71c0ece445cc114e6
Author: nanxiang xia <[email protected]>
AuthorDate: Tue Feb 18 10:48:39 2025 +0800
Pipe: Implement force forwarding feature where the Pipe sender can force
the receiver to forward the received event (#14847) (#14875)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../connector/client/IoTDBConfigNodeSyncClientManager.java | 6 ++++--
.../connector/protocol/IoTDBConfigRegionAirGapConnector.java | 3 +++
.../pipe/connector/protocol/IoTDBConfigRegionConnector.java | 6 ++++--
.../connector/client/IoTDBDataNodeAsyncClientManager.java | 9 +++++++--
.../connector/client/IoTDBDataNodeSyncClientManager.java | 6 ++++--
.../protocol/airgap/IoTDBDataNodeAirGapConnector.java | 3 +++
.../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java | 3 ++-
.../protocol/thrift/sync/IoTDBDataNodeSyncConnector.java | 6 ++++--
.../pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 2 +-
.../commons/pipe/config/constant/PipeConnectorConstant.java | 4 ++++
.../commons/pipe/connector/client/IoTDBClientManager.java | 6 +++++-
.../pipe/connector/client/IoTDBSyncClientManager.java | 9 +++++++--
.../payload/thrift/common/PipeTransferHandshakeConstant.java | 1 +
.../commons/pipe/connector/protocol/IoTDBConnector.java | 12 +++++++++++-
.../pipe/connector/protocol/IoTDBSslSyncConnector.java | 6 ++++--
.../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 8 ++++++++
16 files changed, 72 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index ab678f98864..989ddc5985a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -43,7 +43,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
String loadBalanceStrategy,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy,
- boolean validateTsFile) {
+ boolean validateTsFile,
+ boolean shouldMarkAsPipeRequest) {
super(
endPoints,
username,
@@ -55,7 +56,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index de8b8fe5d98..c5fdc4765f9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -78,6 +78,9 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(loadTsFileValidation));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index c2a327fb853..3761f696875 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -68,7 +68,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final String loadBalanceStrategy,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
return new IoTDBConfigNodeSyncClientManager(
nodeUrls,
username,
@@ -79,7 +80,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 11ef6aec7f4..35de9369e31 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -85,7 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
super(
endPoints,
username,
@@ -93,7 +94,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
useLeaderCache,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
endPointSet = new HashSet<>(endPoints);
@@ -253,6 +255,9 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(validateTsFile));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 62b8b563075..3c20e0f1891 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -53,7 +53,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
final String loadBalanceStrategy,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
super(
endPoints,
username,
@@ -65,7 +66,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 537d06bf767..23183e9e14f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -110,6 +110,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(loadTsFileValidation));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 56c3f08d647..5981f9f7b7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -134,7 +134,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- loadTsFileValidation);
+ loadTsFileValidation,
+ shouldMarkAsPipeRequest);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 5e8624397e9..89e63e44a79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -92,7 +92,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
final String loadBalanceStrategy,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
clientManager =
new IoTDBDataNodeSyncClientManager(
nodeUrls,
@@ -105,7 +106,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
return clientManager;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 2817e8644ae..946afcc8217 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -726,7 +726,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return Coordinator.getInstance()
.executeForTreeModel(
- new PipeEnrichedStatement(statement),
+ shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement,
SessionManager.getInstance().requestQueryId(),
SESSION_MANAGER.getSessionInfo(clientSession),
"",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 1f7b7870303..d886f313ef3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -243,6 +243,10 @@ public class PipeConnectorConstant {
public static final String SINK_LOAD_TSFILE_VALIDATION_KEY =
"sink.load-tsfile-validation";
public static final boolean CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE =
true;
+ public static final String CONNECTOR_MARK_AS_PIPE_REQUEST_KEY = "connector.mark-as-pipe-request";
+ public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = "sink.mark-as-pipe-request";
+ public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = true;
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 18ef38ad47a..b1e0385deb0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -47,6 +47,8 @@ public abstract class IoTDBClientManager {
protected final boolean useLeaderCache;
+ protected final boolean shouldMarkAsPipeRequest;
+
// This flag indicates whether the receiver supports mods transferring if
// it is a DataNode receiver. The flag is useless for configNode receiver.
protected boolean supportModsIfIsDataNodeReceiver = true;
@@ -63,7 +65,8 @@ public abstract class IoTDBClientManager {
boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
boolean useLeaderCache,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
this.endPointList = endPointList;
this.username = username;
this.password = password;
@@ -71,6 +74,7 @@ public abstract class IoTDBClientManager {
this.loadTsFileStrategy = loadTsFileStrategy;
this.useLeaderCache = useLeaderCache;
this.validateTsFile = validateTsFile;
+ this.shouldMarkAsPipeRequest = shouldMarkAsPipeRequest;
}
public boolean supportModsIfIsDataNodeReceiver() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 13ee064fafb..00fc35653d8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -72,7 +72,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
String loadBalanceStrategy,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy,
- boolean validateTsFile) {
+ boolean validateTsFile,
+ boolean shouldMarkAsPipeRequest) {
super(
endPoints,
username,
@@ -80,7 +81,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
useLeaderCache,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
this.useSSL = useSSL;
this.trustStorePath = trustStorePath;
@@ -216,6 +218,9 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(validateTsFile));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
// Try to handshake by PipeTransferHandshakeV2Req.
TPipeTransferResp resp =
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 4291427a9bc..1add2377cd9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -28,6 +28,7 @@ public class PipeTransferHandshakeConstant {
public static final String HANDSHAKE_KEY_USERNAME = "username";
public static final String HANDSHAKE_KEY_PASSWORD = "password";
public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile";
+ public static final String HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST = "markAsPipeRequest";
private PipeTransferHandshakeConstant() {
// Utility class
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 22228f10140..9dc7bac704a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -93,6 +93,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -116,6 +118,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
public abstract class IoTDBConnector implements PipeConnector {
@@ -137,6 +140,8 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected String loadTsFileStrategy;
protected boolean loadTsFileValidation;
+ protected boolean shouldMarkAsPipeRequest;
+
private boolean isRpcCompressionEnabled;
private final List<PipeCompressor> compressors = new ArrayList<>();
@@ -342,9 +347,14 @@ public abstract class IoTDBConnector implements
PipeConnector {
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
-
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
+ shouldMarkAsPipeRequest =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
+ CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
+ LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest);
+
receiverStatusHandler =
new PipeReceiverStatusHandler(
parameters
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index e3a5dd21876..51e532d737d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -127,7 +127,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
loadBalanceStrategy,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- loadTsFileValidation);
+ loadTsFileValidation,
+ shouldMarkAsPipeRequest);
}
protected abstract IoTDBSyncClientManager constructClient(
@@ -141,7 +142,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
final String loadBalanceStrategy,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile);
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest);
@Override
public void handshake() throws Exception {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index ed4b85cfec6..60cff71fbfa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -86,6 +86,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new
AtomicBoolean(false);
protected final AtomicBoolean validateTsFile = new AtomicBoolean(true);
+ protected final AtomicBoolean shouldMarkAsPipeRequest = new AtomicBoolean(true);
+
@Override
public IoTDBConnectorRequestVersion getVersion() {
return IoTDBConnectorRequestVersion.VERSION_1;
@@ -289,6 +291,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
.getOrDefault(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, "true")));
+ shouldMarkAsPipeRequest.set(
+ Boolean.parseBoolean(
+ req.getParams()
+ .getOrDefault(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST, "true")));
+
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to valid from v1 validation
logic, though
// it may not require the actual type of the v1 request.