This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 75f35ff7f26 Pipe: Implement force forwarding feature where the Pipe 
sender can force the receiver to forward the received event (#14847) (#14875)
75f35ff7f26 is described below

commit 75f35ff7f263499c1961b1a71c0ece445cc114e6
Author: nanxiang xia <[email protected]>
AuthorDate: Tue Feb 18 10:48:39 2025 +0800

    Pipe: Implement force forwarding feature where the Pipe sender can force 
the receiver to forward the received event (#14847) (#14875)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../connector/client/IoTDBConfigNodeSyncClientManager.java   |  6 ++++--
 .../connector/protocol/IoTDBConfigRegionAirGapConnector.java |  3 +++
 .../pipe/connector/protocol/IoTDBConfigRegionConnector.java  |  6 ++++--
 .../connector/client/IoTDBDataNodeAsyncClientManager.java    |  9 +++++++--
 .../connector/client/IoTDBDataNodeSyncClientManager.java     |  6 ++++--
 .../protocol/airgap/IoTDBDataNodeAirGapConnector.java        |  3 +++
 .../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java |  3 ++-
 .../protocol/thrift/sync/IoTDBDataNodeSyncConnector.java     |  6 ++++--
 .../pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java |  2 +-
 .../commons/pipe/config/constant/PipeConnectorConstant.java  |  4 ++++
 .../commons/pipe/connector/client/IoTDBClientManager.java    |  6 +++++-
 .../pipe/connector/client/IoTDBSyncClientManager.java        |  9 +++++++--
 .../payload/thrift/common/PipeTransferHandshakeConstant.java |  1 +
 .../commons/pipe/connector/protocol/IoTDBConnector.java      | 12 +++++++++++-
 .../pipe/connector/protocol/IoTDBSslSyncConnector.java       |  6 ++++--
 .../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java       |  8 ++++++++
 16 files changed, 72 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index ab678f98864..989ddc5985a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -43,7 +43,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
       String loadBalanceStrategy,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy,
-      boolean validateTsFile) {
+      boolean validateTsFile,
+      boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         username,
@@ -55,7 +56,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
         loadBalanceStrategy,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index de8b8fe5d98..c5fdc4765f9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -78,6 +78,9 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
         Boolean.toString(loadTsFileValidation));
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+        Boolean.toString(shouldMarkAsPipeRequest));
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index c2a327fb853..3761f696875 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -68,7 +68,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     return new IoTDBConfigNodeSyncClientManager(
         nodeUrls,
         username,
@@ -79,7 +80,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
         loadBalanceStrategy,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 11ef6aec7f4..35de9369e31 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -85,7 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         username,
@@ -93,7 +94,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
         useLeaderCache,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
 
     endPointSet = new HashSet<>(endPoints);
 
@@ -253,6 +255,9 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
           Boolean.toString(validateTsFile));
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+          Boolean.toString(shouldMarkAsPipeRequest));
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index 62b8b563075..3c20e0f1891 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -53,7 +53,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         username,
@@ -65,7 +66,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
         loadBalanceStrategy,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 537d06bf767..23183e9e14f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -110,6 +110,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
         Boolean.toString(loadTsFileValidation));
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+        Boolean.toString(shouldMarkAsPipeRequest));
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 56c3f08d647..5981f9f7b7b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -134,7 +134,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             password,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
-            loadTsFileValidation);
+            loadTsFileValidation,
+            shouldMarkAsPipeRequest);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 5e8624397e9..89e63e44a79 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -92,7 +92,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
             nodeUrls,
@@ -105,7 +106,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
             loadBalanceStrategy,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
-            validateTsFile);
+            validateTsFile,
+            shouldMarkAsPipeRequest);
     return clientManager;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 2817e8644ae..946afcc8217 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -726,7 +726,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
     return Coordinator.getInstance()
         .executeForTreeModel(
-            new PipeEnrichedStatement(statement),
+            shouldMarkAsPipeRequest.get() ? new 
PipeEnrichedStatement(statement) : statement,
             SessionManager.getInstance().requestQueryId(),
             SESSION_MANAGER.getSessionInfo(clientSession),
             "",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 1f7b7870303..d886f313ef3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -243,6 +243,10 @@ public class PipeConnectorConstant {
   public static final String SINK_LOAD_TSFILE_VALIDATION_KEY = 
"sink.load-tsfile-validation";
   public static final boolean CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE = 
true;
 
+  public static final String CONNECTOR_MARK_AS_PIPE_REQUEST_KEY = 
"connector.mark-as-pipe-request";
+  public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = 
"sink.mark-as-pipe-request";
+  public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = 
true;
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 18ef38ad47a..b1e0385deb0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -47,6 +47,8 @@ public abstract class IoTDBClientManager {
 
   protected final boolean useLeaderCache;
 
+  protected final boolean shouldMarkAsPipeRequest;
+
   // This flag indicates whether the receiver supports mods transferring if
   // it is a DataNode receiver. The flag is useless for configNode receiver.
   protected boolean supportModsIfIsDataNodeReceiver = true;
@@ -63,7 +65,8 @@ public abstract class IoTDBClientManager {
       boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
       boolean useLeaderCache,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     this.endPointList = endPointList;
     this.username = username;
     this.password = password;
@@ -71,6 +74,7 @@ public abstract class IoTDBClientManager {
     this.loadTsFileStrategy = loadTsFileStrategy;
     this.useLeaderCache = useLeaderCache;
     this.validateTsFile = validateTsFile;
+    this.shouldMarkAsPipeRequest = shouldMarkAsPipeRequest;
   }
 
   public boolean supportModsIfIsDataNodeReceiver() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 13ee064fafb..00fc35653d8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -72,7 +72,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       String loadBalanceStrategy,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy,
-      boolean validateTsFile) {
+      boolean validateTsFile,
+      boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         username,
@@ -80,7 +81,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
         useLeaderCache,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
 
     this.useSSL = useSSL;
     this.trustStorePath = trustStorePath;
@@ -216,6 +218,9 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
           Boolean.toString(validateTsFile));
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+          Boolean.toString(shouldMarkAsPipeRequest));
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 4291427a9bc..1add2377cd9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -28,6 +28,7 @@ public class PipeTransferHandshakeConstant {
   public static final String HANDSHAKE_KEY_USERNAME = "username";
   public static final String HANDSHAKE_KEY_PASSWORD = "password";
   public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile";
+  public static final String HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST = 
"markAsPipeRequest";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 22228f10140..9dc7bac704a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -93,6 +93,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -116,6 +118,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
@@ -137,6 +140,8 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   protected String loadTsFileStrategy;
   protected boolean loadTsFileValidation;
 
+  protected boolean shouldMarkAsPipeRequest;
+
   private boolean isRpcCompressionEnabled;
   private final List<PipeCompressor> compressors = new ArrayList<>();
 
@@ -342,9 +347,14 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
                 .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
-
     LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
 
+    shouldMarkAsPipeRequest =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, 
SINK_MARK_AS_PIPE_REQUEST_KEY),
+            CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
+    LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
+
     receiverStatusHandler =
         new PipeReceiverStatusHandler(
             parameters
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index e3a5dd21876..51e532d737d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -127,7 +127,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
             loadBalanceStrategy,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
-            loadTsFileValidation);
+            loadTsFileValidation,
+            shouldMarkAsPipeRequest);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -141,7 +142,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
       final String loadBalanceStrategy,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile);
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest);
 
   @Override
   public void handshake() throws Exception {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index ed4b85cfec6..60cff71fbfa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -86,6 +86,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new 
AtomicBoolean(false);
   protected final AtomicBoolean validateTsFile = new AtomicBoolean(true);
 
+  protected final AtomicBoolean shouldMarkAsPipeRequest = new 
AtomicBoolean(true);
+
   @Override
   public IoTDBConnectorRequestVersion getVersion() {
     return IoTDBConnectorRequestVersion.VERSION_1;
@@ -289,6 +291,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                 .getOrDefault(
                     
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, "true")));
 
+    shouldMarkAsPipeRequest.set(
+        Boolean.parseBoolean(
+            req.getParams()
+                .getOrDefault(
+                    
PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST, "true")));
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.

Reply via email to