This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 330a683242d Pipe: Implement force forwarding feature where the Pipe
sender can force the receiver to forward the received event (#14847)
330a683242d is described below
commit 330a683242d10013559a4e60cf7a85b6effcd0e7
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Feb 17 18:55:12 2025 +0800
Pipe: Implement force forwarding feature where the Pipe sender can force
the receiver to forward the received event (#14847)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../connector/client/IoTDBConfigNodeSyncClientManager.java | 6 ++++--
.../connector/protocol/IoTDBConfigRegionAirGapConnector.java | 3 +++
.../pipe/connector/protocol/IoTDBConfigRegionConnector.java | 6 ++++--
.../connector/client/IoTDBDataNodeAsyncClientManager.java | 9 +++++++--
.../connector/client/IoTDBDataNodeSyncClientManager.java | 6 ++++--
.../protocol/airgap/IoTDBDataNodeAirGapConnector.java | 3 +++
.../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java | 3 ++-
.../protocol/thrift/sync/IoTDBDataNodeSyncConnector.java | 6 ++++--
.../pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 8 ++++----
.../commons/pipe/config/constant/PipeConnectorConstant.java | 4 ++++
.../commons/pipe/connector/client/IoTDBClientManager.java | 6 +++++-
.../pipe/connector/client/IoTDBSyncClientManager.java | 9 +++++++--
.../payload/thrift/common/PipeTransferHandshakeConstant.java | 1 +
.../commons/pipe/connector/protocol/IoTDBConnector.java | 12 +++++++++++-
.../pipe/connector/protocol/IoTDBSslSyncConnector.java | 6 ++++--
.../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 8 ++++++++
16 files changed, 75 insertions(+), 21 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index 33013ff6f39..a1bc26252b5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -45,7 +45,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
String password,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy,
- boolean validateTsFile) {
+ boolean validateTsFile,
+ boolean shouldMarkAsPipeRequest) {
super(
endPoints,
useSSL,
@@ -57,7 +58,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index ce74675dc44..ab3f2707544 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -82,6 +82,9 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(loadTsFileValidation));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 34d99517d30..4453cf4e063 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -74,7 +74,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
return new IoTDBConfigNodeSyncClientManager(
nodeUrls,
useSSL,
@@ -85,7 +86,8 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index cd6e3e48c52..39a03169c30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -85,7 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
super(
endPoints,
useLeaderCache,
@@ -93,7 +94,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
endPointSet = new HashSet<>(endPoints);
@@ -253,6 +255,9 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(validateTsFile));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index e89268b2a23..39cbb7902d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -55,7 +55,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
super(
endPoints,
useSSL,
@@ -67,7 +68,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 3bb2348e75d..009d9666812 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -109,6 +109,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(loadTsFileValidation));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 242564ea9bb..e9759fe9d88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -138,7 +138,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- loadTsFileValidation);
+ loadTsFileValidation,
+ shouldMarkAsPipeRequest);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 02c98f2958a..dbd96fd7d1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -88,7 +88,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
clientManager =
new IoTDBDataNodeSyncClientManager(
nodeUrls,
@@ -101,7 +102,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
return clientManager;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index d72cbc030dc..c63751b4325 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -915,7 +915,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return Coordinator.getInstance()
.executeForTableModel(
- new PipeEnrichedStatement(statement),
+ shouldMarkAsPipeRequest.get() ? new
PipeEnrichedStatement(statement) : statement,
relationalSqlParser,
SESSION_MANAGER.getCurrSession(),
SESSION_MANAGER.requestQueryId(),
@@ -939,7 +939,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
// Retry after creating the database
return Coordinator.getInstance()
.executeForTableModel(
- new PipeEnrichedStatement(statement),
+ shouldMarkAsPipeRequest.get() ? new
PipeEnrichedStatement(statement) : statement,
relationalSqlParser,
SESSION_MANAGER.getCurrSession(),
SESSION_MANAGER.requestQueryId(),
@@ -990,7 +990,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus executeStatementForTreeModel(final Statement statement) {
return Coordinator.getInstance()
.executeForTreeModel(
- new PipeEnrichedStatement(statement),
+ shouldMarkAsPipeRequest.get() ? new
PipeEnrichedStatement(statement) : statement,
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
@@ -1021,7 +1021,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final TSStatus result =
Coordinator.getInstance()
.executeForTableModel(
- new PipeEnriched(statement),
+ shouldMarkAsPipeRequest.get() ? new PipeEnriched(statement)
: statement,
relationalSqlParser,
SESSION_MANAGER.getCurrSession(),
SESSION_MANAGER.requestQueryId(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index c66e6c9c0c6..6af7b7b32ac 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -247,6 +247,10 @@ public class PipeConnectorConstant {
public static final String SINK_LOAD_TSFILE_VALIDATION_KEY =
"sink.load-tsfile-validation";
public static final boolean CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE =
true;
+ public static final String CONNECTOR_MARK_AS_PIPE_REQUEST_KEY =
"connector.mark-as-pipe-request";
+ public static final String SINK_MARK_AS_PIPE_REQUEST_KEY =
"sink.mark-as-pipe-request";
+ public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE =
true;
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 2c4921712e4..91a65283a73 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -47,6 +47,8 @@ public abstract class IoTDBClientManager {
protected final boolean shouldReceiverConvertOnTypeMismatch;
protected final String loadTsFileStrategy;
+ protected final boolean shouldMarkAsPipeRequest;
+
// This flag indicates whether the receiver supports mods transferring if
// it is a DataNode receiver. The flag is useless for configNode receiver.
protected boolean supportModsIfIsDataNodeReceiver = true;
@@ -65,7 +67,8 @@ public abstract class IoTDBClientManager {
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile) {
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
this.endPointList = endPointList;
this.useLeaderCache = useLeaderCache;
@@ -75,6 +78,7 @@ public abstract class IoTDBClientManager {
this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
this.loadTsFileStrategy = loadTsFileStrategy;
this.validateTsFile = validateTsFile;
+ this.shouldMarkAsPipeRequest = shouldMarkAsPipeRequest;
}
public boolean supportModsIfIsDataNodeReceiver() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 2c9830212dc..17a48e1dd9d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -74,7 +74,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
String password,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy,
- boolean validateTsFile) {
+ boolean validateTsFile,
+ boolean shouldMarkAsPipeRequest) {
super(
endPoints,
useLeaderCache,
@@ -82,7 +83,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- validateTsFile);
+ validateTsFile,
+ shouldMarkAsPipeRequest);
this.useSSL = useSSL;
this.trustStorePath = trustStorePath;
@@ -218,6 +220,9 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.toString(validateTsFile));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+ Boolean.toString(shouldMarkAsPipeRequest));
// Try to handshake by PipeTransferHandshakeV2Req.
TPipeTransferResp resp =
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 4291427a9bc..1add2377cd9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -28,6 +28,7 @@ public class PipeTransferHandshakeConstant {
public static final String HANDSHAKE_KEY_USERNAME = "username";
public static final String HANDSHAKE_KEY_PASSWORD = "password";
public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile";
+ public static final String HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST =
"markAsPipeRequest";
private PipeTransferHandshakeConstant() {
// Utility class
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index e61f22ee9f5..aed4afd4544 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -95,6 +95,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -118,6 +120,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
@TreeModel
@@ -141,6 +144,8 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected String loadTsFileStrategy;
protected boolean loadTsFileValidation;
+ protected boolean shouldMarkAsPipeRequest;
+
private boolean isRpcCompressionEnabled;
private final List<PipeCompressor> compressors = new ArrayList<>();
@@ -346,9 +351,14 @@ public abstract class IoTDBConnector implements
PipeConnector {
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
-
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
+ shouldMarkAsPipeRequest =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY,
SINK_MARK_AS_PIPE_REQUEST_KEY),
+ CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
+ LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}",
shouldMarkAsPipeRequest);
+
receiverStatusHandler =
new PipeReceiverStatusHandler(
parameters
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 6909a3b1384..578ae675db4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -131,7 +131,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
- loadTsFileValidation);
+ loadTsFileValidation,
+ shouldMarkAsPipeRequest);
}
protected abstract IoTDBSyncClientManager constructClient(
@@ -147,7 +148,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
- final boolean validateTsFile);
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest);
@Override
public void handshake() throws Exception {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index a626b7e9f3b..1e0f6305f2d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -86,6 +86,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new
AtomicBoolean(false);
protected final AtomicBoolean validateTsFile = new AtomicBoolean(true);
+ protected final AtomicBoolean shouldMarkAsPipeRequest = new
AtomicBoolean(true);
+
@Override
public IoTDBConnectorRequestVersion getVersion() {
return IoTDBConnectorRequestVersion.VERSION_1;
@@ -289,6 +291,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
.getOrDefault(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, "true")));
+ shouldMarkAsPipeRequest.set(
+ Boolean.parseBoolean(
+ req.getParams()
+ .getOrDefault(
+
PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST, "true")));
+
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to valid from v1 validation
logic, though
// it may not require the actual type of the v1 request.