This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 330a683242d Pipe: Implement force forwarding feature where the Pipe 
sender can force the receiver to forward the received event (#14847)
330a683242d is described below

commit 330a683242d10013559a4e60cf7a85b6effcd0e7
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Feb 17 18:55:12 2025 +0800

    Pipe: Implement force forwarding feature where the Pipe sender can force 
the receiver to forward the received event (#14847)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../connector/client/IoTDBConfigNodeSyncClientManager.java   |  6 ++++--
 .../connector/protocol/IoTDBConfigRegionAirGapConnector.java |  3 +++
 .../pipe/connector/protocol/IoTDBConfigRegionConnector.java  |  6 ++++--
 .../connector/client/IoTDBDataNodeAsyncClientManager.java    |  9 +++++++--
 .../connector/client/IoTDBDataNodeSyncClientManager.java     |  6 ++++--
 .../protocol/airgap/IoTDBDataNodeAirGapConnector.java        |  3 +++
 .../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java |  3 ++-
 .../protocol/thrift/sync/IoTDBDataNodeSyncConnector.java     |  6 ++++--
 .../pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java |  8 ++++----
 .../commons/pipe/config/constant/PipeConnectorConstant.java  |  4 ++++
 .../commons/pipe/connector/client/IoTDBClientManager.java    |  6 +++++-
 .../pipe/connector/client/IoTDBSyncClientManager.java        |  9 +++++++--
 .../payload/thrift/common/PipeTransferHandshakeConstant.java |  1 +
 .../commons/pipe/connector/protocol/IoTDBConnector.java      | 12 +++++++++++-
 .../pipe/connector/protocol/IoTDBSslSyncConnector.java       |  6 ++++--
 .../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java       |  8 ++++++++
 16 files changed, 75 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index 33013ff6f39..a1bc26252b5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -45,7 +45,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
       String password,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy,
-      boolean validateTsFile) {
+      boolean validateTsFile,
+      boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         useSSL,
@@ -57,7 +58,8 @@ public class IoTDBConfigNodeSyncClientManager extends 
IoTDBSyncClientManager {
         password,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index ce74675dc44..ab3f2707544 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -82,6 +82,9 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
         Boolean.toString(loadTsFileValidation));
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+        Boolean.toString(shouldMarkAsPipeRequest));
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 34d99517d30..4453cf4e063 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -74,7 +74,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     return new IoTDBConfigNodeSyncClientManager(
         nodeUrls,
         useSSL,
@@ -85,7 +86,8 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
         password,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index cd6e3e48c52..39a03169c30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -85,7 +85,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         useLeaderCache,
@@ -93,7 +94,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
         password,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
 
     endPointSet = new HashSet<>(endPoints);
 
@@ -253,6 +255,9 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
           Boolean.toString(validateTsFile));
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+          Boolean.toString(shouldMarkAsPipeRequest));
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index e89268b2a23..39cbb7902d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -55,7 +55,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         useSSL,
@@ -67,7 +68,8 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
         password,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 3bb2348e75d..009d9666812 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -109,6 +109,9 @@ public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
         Boolean.toString(loadTsFileValidation));
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+        Boolean.toString(shouldMarkAsPipeRequest));
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 242564ea9bb..e9759fe9d88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -138,7 +138,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             password,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
-            loadTsFileValidation);
+            loadTsFileValidation,
+            shouldMarkAsPipeRequest);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 02c98f2958a..dbd96fd7d1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -88,7 +88,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     clientManager =
         new IoTDBDataNodeSyncClientManager(
             nodeUrls,
@@ -101,7 +102,8 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
             password,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
-            validateTsFile);
+            validateTsFile,
+            shouldMarkAsPipeRequest);
     return clientManager;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index d72cbc030dc..c63751b4325 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -915,7 +915,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
       return Coordinator.getInstance()
           .executeForTableModel(
-              new PipeEnrichedStatement(statement),
+              shouldMarkAsPipeRequest.get() ? new 
PipeEnrichedStatement(statement) : statement,
               relationalSqlParser,
               SESSION_MANAGER.getCurrSession(),
               SESSION_MANAGER.requestQueryId(),
@@ -939,7 +939,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         // Retry after creating the database
         return Coordinator.getInstance()
             .executeForTableModel(
-                new PipeEnrichedStatement(statement),
+                shouldMarkAsPipeRequest.get() ? new 
PipeEnrichedStatement(statement) : statement,
                 relationalSqlParser,
                 SESSION_MANAGER.getCurrSession(),
                 SESSION_MANAGER.requestQueryId(),
@@ -990,7 +990,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus executeStatementForTreeModel(final Statement statement) {
     return Coordinator.getInstance()
         .executeForTreeModel(
-            new PipeEnrichedStatement(statement),
+            shouldMarkAsPipeRequest.get() ? new 
PipeEnrichedStatement(statement) : statement,
             SESSION_MANAGER.requestQueryId(),
             SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
             "",
@@ -1021,7 +1021,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       final TSStatus result =
           Coordinator.getInstance()
               .executeForTableModel(
-                  new PipeEnriched(statement),
+                  shouldMarkAsPipeRequest.get() ? new PipeEnriched(statement) 
: statement,
                   relationalSqlParser,
                   SESSION_MANAGER.getCurrSession(),
                   SESSION_MANAGER.requestQueryId(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index c66e6c9c0c6..6af7b7b32ac 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -247,6 +247,10 @@ public class PipeConnectorConstant {
   public static final String SINK_LOAD_TSFILE_VALIDATION_KEY = 
"sink.load-tsfile-validation";
   public static final boolean CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE = 
true;
 
+  public static final String CONNECTOR_MARK_AS_PIPE_REQUEST_KEY = 
"connector.mark-as-pipe-request";
+  public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = 
"sink.mark-as-pipe-request";
+  public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = 
true;
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index 2c4921712e4..91a65283a73 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -47,6 +47,8 @@ public abstract class IoTDBClientManager {
   protected final boolean shouldReceiverConvertOnTypeMismatch;
   protected final String loadTsFileStrategy;
 
+  protected final boolean shouldMarkAsPipeRequest;
+
   // This flag indicates whether the receiver supports mods transferring if
   // it is a DataNode receiver. The flag is useless for configNode receiver.
   protected boolean supportModsIfIsDataNodeReceiver = true;
@@ -65,7 +67,8 @@ public abstract class IoTDBClientManager {
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile) {
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest) {
     this.endPointList = endPointList;
 
     this.useLeaderCache = useLeaderCache;
@@ -75,6 +78,7 @@ public abstract class IoTDBClientManager {
     this.shouldReceiverConvertOnTypeMismatch = 
shouldReceiverConvertOnTypeMismatch;
     this.loadTsFileStrategy = loadTsFileStrategy;
     this.validateTsFile = validateTsFile;
+    this.shouldMarkAsPipeRequest = shouldMarkAsPipeRequest;
   }
 
   public boolean supportModsIfIsDataNodeReceiver() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 2c9830212dc..17a48e1dd9d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -74,7 +74,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       String password,
       boolean shouldReceiverConvertOnTypeMismatch,
       String loadTsFileStrategy,
-      boolean validateTsFile) {
+      boolean validateTsFile,
+      boolean shouldMarkAsPipeRequest) {
     super(
         endPoints,
         useLeaderCache,
@@ -82,7 +83,8 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
         password,
         shouldReceiverConvertOnTypeMismatch,
         loadTsFileStrategy,
-        validateTsFile);
+        validateTsFile,
+        shouldMarkAsPipeRequest);
 
     this.useSSL = useSSL;
     this.trustStorePath = trustStorePath;
@@ -218,6 +220,9 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
           Boolean.toString(validateTsFile));
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
+          Boolean.toString(shouldMarkAsPipeRequest));
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index 4291427a9bc..1add2377cd9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -28,6 +28,7 @@ public class PipeTransferHandshakeConstant {
   public static final String HANDSHAKE_KEY_USERNAME = "username";
   public static final String HANDSHAKE_KEY_PASSWORD = "password";
   public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile";
+  public static final String HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST = 
"markAsPipeRequest";
 
   private PipeTransferHandshakeConstant() {
     // Utility class
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index e61f22ee9f5..aed4afd4544 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -95,6 +95,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -118,6 +120,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
 
 @TreeModel
@@ -141,6 +144,8 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   protected String loadTsFileStrategy;
   protected boolean loadTsFileValidation;
 
+  protected boolean shouldMarkAsPipeRequest;
+
   private boolean isRpcCompressionEnabled;
   private final List<PipeCompressor> compressors = new ArrayList<>();
 
@@ -346,9 +351,14 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
                 .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
-
     LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
 
+    shouldMarkAsPipeRequest =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, 
SINK_MARK_AS_PIPE_REQUEST_KEY),
+            CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
+    LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
+
     receiverStatusHandler =
         new PipeReceiverStatusHandler(
             parameters
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 6909a3b1384..578ae675db4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -131,7 +131,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
             password,
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
-            loadTsFileValidation);
+            loadTsFileValidation,
+            shouldMarkAsPipeRequest);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
@@ -147,7 +148,8 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
       final String password,
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
-      final boolean validateTsFile);
+      final boolean validateTsFile,
+      final boolean shouldMarkAsPipeRequest);
 
   @Override
   public void handshake() throws Exception {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index a626b7e9f3b..1e0f6305f2d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -86,6 +86,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new 
AtomicBoolean(false);
   protected final AtomicBoolean validateTsFile = new AtomicBoolean(true);
 
+  protected final AtomicBoolean shouldMarkAsPipeRequest = new 
AtomicBoolean(true);
+
   @Override
   public IoTDBConnectorRequestVersion getVersion() {
     return IoTDBConnectorRequestVersion.VERSION_1;
@@ -289,6 +291,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                 .getOrDefault(
                     
PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, "true")));
 
+    shouldMarkAsPipeRequest.set(
+        Boolean.parseBoolean(
+            req.getParams()
+                .getOrDefault(
+                    
PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST, "true")));
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.

Reply via email to