This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c037e7df03 Pipe: Fixed the bug that mark-as-pipe-request may not be of use in configNode events (#16581)
4c037e7df03 is described below

commit 4c037e7df038e3b5a9a821c5571e0dbab06ff250
Author: Caideyipi <[email protected]>
AuthorDate: Wed Oct 15 11:00:58 2025 +0800

    Pipe: Fixed the bug that mark-as-pipe-request may not be of use in configNode events (#16581)
---
 .github/workflows/pipe-it.yml                      |  86 +++++++++++++++
 .../manual/AbstractPipeTableModelDualManualIT.java |   1 -
 .../manual/basic/IoTDBPipePermissionIT.java        |   1 -
 .../manual/basic/IoTDBPipeProtocolIT.java          |   1 -
 .../tablemodel/manual/basic/IoTDBPipeSourceIT.java |   1 -
 .../manual/basic/IoTDBPipeWithLoadIT.java          |   1 -
 .../manual/enhanced/IoTDBPipeAutoConflictIT.java   |   1 -
 .../auto/AbstractPipeDualTreeModelAutoIT.java      |   1 -
 .../treemodel/auto/basic/IoTDBPipeProcessorIT.java |   1 -
 .../treemodel/auto/basic/IoTDBPipeProtocolIT.java  |   1 -
 .../treemodel/auto/basic/IoTDBPipeSourceIT.java    |   1 -
 .../auto/enhanced/IoTDBPipeAutoConflictIT.java     |   1 -
 .../auto/enhanced/IoTDBPipeIdempotentIT.java       |   1 -
 .../auto/enhanced/IoTDBPipeWithLoadIT.java         |   1 -
 .../manual/AbstractPipeDualTreeModelManualIT.java  |   1 -
 .../manual/IoTDBPipeMetaHistoricalIT.java          |   1 -
 .../treemodel/manual/IoTDBPipePermissionIT.java    |   1 -
 .../AbstractPipeTripleManualIT.java}               |  57 +++++-----
 .../iotdb/pipe/it/triple/IoTDBPipeForwardIT.java   | 117 +++++++++++++++++++++
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  72 ++++++++-----
 20 files changed, 279 insertions(+), 69 deletions(-)

diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml
index 154cebf968c..9c2899f1fe8 100644
--- a/.github/workflows/pipe-it.yml
+++ b/.github/workflows/pipe-it.yml
@@ -890,3 +890,89 @@ jobs:
           name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java 
}}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
           path: integration-test/target/cluster-logs
           retention-days: 30
+  triple:
+    strategy:
+      fail-fast: false
+      max-parallel: 1
+      matrix:
+        java: [ 17 ]
+        cluster1: [ ScalableSingleNodeMode ]
+        cluster2: [ ScalableSingleNodeMode ]
+        cluster3: [ ScalableSingleNodeMode ]
+        os: [ ubuntu-latest ]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: oracle
+          java-version: ${{ matrix.java }}
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+      - name: Cache Maven packages
+        uses: actions/cache@v4
+        with:
+          path: ~/.m2
+          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-m2-
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
+      - name: IT Test
+        shell: bash
+        # we do not compile client-cpp for saving time, it is tested in client.yml
+        # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml
+        run: |
+          retry() {
+            local -i max_attempts=3
+            local -i attempt=1
+            local -i retry_sleep=5
+            local test_output
+
+            while [ $attempt -le $max_attempts ]; do
+              mvn clean verify \
+              -P with-integration-tests \
+              -DskipUTs \
+              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
+              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }},${{ matrix.cluster3 }} \
+              -pl integration-test \
+              -am -PMultiClusterIT3 \
+              -ntp >> ~/run-tests-$attempt.log && return 0
+              test_output=$(cat ~/run-tests-$attempt.log) 
+
+              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
+              echo "$test_output"
+              echo "==================== END: ~/run-tests-$attempt.log 
======================"
+
+              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
+                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
+              fi
+
+              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
+                if [ $attempt -lt $max_attempts ]; then
+                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
+                  sleep $retry_sleep
+                  attempt=$((attempt + 1))
+                else
+                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
+                  echo "Treating this as a success because the issue is likely 
transient."
+                  return 0
+                fi
+              elif [ $? -ne 0 ]; then
+                echo "Test failed with a different error."
+                return 1
+              else
+                echo "Tests passed"
+                return 0
+              fi
+            done
+          }
+          retry
+      - name: Upload Artifact
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{ 
matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }}
+          path: integration-test/target/cluster-logs
+          retention-days: 30
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index 55c0170aacd..3b3fae80902 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -42,7 +42,6 @@ public abstract class AbstractPipeTableModelDualManualIT {
   }
 
   protected void setupConfig() {
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 0922b984457..958f9d40935 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -61,7 +61,6 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index d2c00c46213..3fce11bf51f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -67,7 +67,6 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
     schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor, 
dataNodesNum);
     dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor, 
dataNodesNum);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index b9aee656b00..022d71d5487 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -61,7 +61,6 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeTableModelDualManualIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
index b24a6b6de9d..57b5846fc4b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
@@ -59,7 +59,6 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeTableModelDualManualIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
index 352d5b7fed8..de0f0c460ee 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeTableModelDualManualIT
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
index eb2b86e079f..a7fae02f6d1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
@@ -50,7 +50,6 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
   }
 
   protected void setupConfig() {
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
index 0447663e53b..0ffdac38ca7 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
@@ -54,7 +54,6 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualTreeModelAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index cf9d108ee8d..0d6091c51ec 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -67,7 +67,6 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
     schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor, 
dataNodesNum);
     dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor, 
dataNodesNum);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 4556cdb459c..be4b6906533 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -63,7 +63,6 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
index a220673645c..d8a191c3719 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeDualTreeModelAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
index 3de685b38f5..39bdd821647 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipeIdempotentIT extends 
AbstractPipeDualTreeModelAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     // All the schema operations must be under the same database to
     // be in the same region, therefore a non-idempotent operation can block the next one
     // and fail the IT
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
index 1a4be3b5978..74c90088fea 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
@@ -52,7 +52,6 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeDualTreeModelAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index 20a7171253f..11f70d944b6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -50,7 +50,6 @@ public abstract class AbstractPipeDualTreeModelManualIT {
   }
 
   protected void setupConfig() {
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
index 2daa6cda9e6..cb0c334f68b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipeMetaHistoricalIT extends 
AbstractPipeDualTreeModelManualIT
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 085ed30e7f3..7fa5bdc0b37 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
similarity index 57%
copy from 
integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
copy to 
integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
index 55c0170aacd..f4e63e1d2f8 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.it.dual.tablemodel.manual;
+package org.apache.iotdb.pipe.it.triple;
 
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.it.env.MultiEnvFactory;
@@ -26,54 +26,63 @@ import org.apache.iotdb.itbase.env.BaseEnv;
 import org.junit.After;
 import org.junit.Before;
 
-public abstract class AbstractPipeTableModelDualManualIT {
+abstract class AbstractPipeTripleManualIT {
 
-  protected BaseEnv senderEnv;
-  protected BaseEnv receiverEnv;
+  protected BaseEnv env1;
+  protected BaseEnv env2;
+  protected BaseEnv env3;
 
   @Before
   public void setUp() {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
+    MultiEnvFactory.createEnv(3);
+    env1 = MultiEnvFactory.getEnv(0);
+    env2 = MultiEnvFactory.getEnv(1);
+    env3 = MultiEnvFactory.getEnv(2);
     setupConfig();
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
+    env1.initClusterEnvironment(1, 1);
+    env2.initClusterEnvironment(1, 1);
+    env3.initClusterEnvironment(1, 1);
   }
 
   protected void setupConfig() {
-    // TODO: delete ratis configurations
-    senderEnv
-        .getConfig()
+    env1.getConfig()
         .getCommonConfig()
-        .setAutoCreateSchemaEnabled(true)
+        .setAutoCreateSchemaEnabled(false)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        .setEnforceStrongPassword(false)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
-    receiverEnv
-        .getConfig()
+    
env1.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
+    env2.getConfig()
         .getCommonConfig()
-        .setAutoCreateSchemaEnabled(true)
+        .setAutoCreateSchemaEnabled(false)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        .setEnforceStrongPassword(false)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
 
-    // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    env3.getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(false)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
-    
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+    // 10 min, assert that the operations will not time out
+    env1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    env2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    env3.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
   }
 
   @After
   public final void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
+    env1.cleanClusterEnvironment();
+    env2.cleanClusterEnvironment();
+    env3.cleanClusterEnvironment();
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
new file mode 100644
index 00000000000..25019a705f6
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.triple;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT3;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT3.class})
+public class IoTDBPipeForwardIT extends AbstractPipeTripleManualIT {
+  @Test
+  public void testForwardingPipeRequests() throws Exception {
+    final DataNodeWrapper env2DataNode = env2.getDataNodeWrapper(0);
+    final String env2Ip = env2DataNode.getIp();
+    final int env2Port = env2DataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) env1.getLeaderConfigNodeConnection()) {
+      final Map<String, String> sourceAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
+
+      sourceAttributes.put("inclusion", "all");
+
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("mark-as-pipe-request", "false");
+      sinkAttributes.put("ip", env2Ip);
+      sinkAttributes.put("port", Integer.toString(env2Port));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+    }
+
+    final DataNodeWrapper env3DataNode = env3.getDataNodeWrapper(0);
+    final String env3Ip = env3DataNode.getIp();
+    final int env3Port = env3DataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) env2.getLeaderConfigNodeConnection()) {
+      final Map<String, String> sourceAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
+
+      sourceAttributes.put("inclusion", "all");
+      sourceAttributes.put("forwarding-pipe-requests", "false");
+
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("ip", env3Ip);
+      sinkAttributes.put("port", Integer.toString(env3Port));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+    }
+
+    TestUtils.executeNonQueries(
+        env1,
+        Arrays.asList(
+            "create database root.sg",
+            "create timeseries root.sg.wf01.GPS.status0 with 
datatype=BOOLEAN,encoding=PLAIN",
+            "insert into root.sg.wf01.GPS (time, status0) values (0, 1)"),
+        null);
+    TestUtils.assertDataEventuallyOnEnv(
+        env3,
+        "select status0 from root.sg.**",
+        "Time,root.sg.wf01.GPS.status0,",
+        Collections.singleton("0,true,"));
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 0e49b869e92..a8c90c6ced8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -595,51 +595,57 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
             
ConfigNodeDescriptor.getInstance().getConf().getDefaultDataRegionGroupNumPerDatabase());
         schema.setMaxSchemaRegionGroupNum(schema.getMinSchemaRegionGroupNum());
         schema.setMaxDataRegionGroupNum(schema.getMinDataRegionGroupNum());
-        return 
configManager.getClusterSchemaManager().setDatabase((DatabaseSchemaPlan) plan, 
true);
+        return configManager
+            .getClusterSchemaManager()
+            .setDatabase((DatabaseSchemaPlan) plan, 
shouldMarkAsPipeRequest.get());
       case AlterDatabase:
         return configManager
             .getClusterSchemaManager()
-            .alterDatabase((DatabaseSchemaPlan) plan, true);
+            .alterDatabase((DatabaseSchemaPlan) plan, 
shouldMarkAsPipeRequest.get());
       case DeleteDatabase:
         return configManager.deleteDatabases(
             new TDeleteDatabasesReq(
                     Collections.singletonList(((DeleteDatabasePlan) 
plan).getName()))
-                .setIsGeneratedByPipe(true)
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get())
                 .setIsTableModel(
                     PathUtils.isTableModelDatabase(((DeleteDatabasePlan) 
plan).getName())));
       case ExtendSchemaTemplate:
         return configManager
             .getClusterSchemaManager()
-            .extendSchemaTemplate(((ExtendSchemaTemplatePlan) 
plan).getTemplateExtendInfo(), true);
+            .extendSchemaTemplate(
+                ((ExtendSchemaTemplatePlan) plan).getTemplateExtendInfo(),
+                shouldMarkAsPipeRequest.get());
       case CommitSetSchemaTemplate:
         return configManager.setSchemaTemplate(
             new TSetSchemaTemplateReq(
                     queryId,
                     ((CommitSetSchemaTemplatePlan) plan).getName(),
                     ((CommitSetSchemaTemplatePlan) plan).getPath())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeUnsetTemplate:
         return configManager.unsetSchemaTemplate(
             new TUnsetSchemaTemplateReq(
                     queryId,
                     ((PipeUnsetSchemaTemplatePlan) plan).getName(),
                     ((PipeUnsetSchemaTemplatePlan) plan).getPath())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeDeleteTimeSeries:
         return configManager.deleteTimeSeries(
             new TDeleteTimeSeriesReq(
                     queryId, ((PipeDeleteTimeSeriesPlan) 
plan).getPatternTreeBytes())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeDeleteLogicalView:
         return configManager.deleteLogicalView(
             new TDeleteLogicalViewReq(
                     queryId, ((PipeDeleteLogicalViewPlan) 
plan).getPatternTreeBytes())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeDeactivateTemplate:
         return configManager
             .getProcedureManager()
             .deactivateTemplate(
-                queryId, ((PipeDeactivateTemplatePlan) 
plan).getTemplateSetInfo(), true);
+                queryId,
+                ((PipeDeactivateTemplatePlan) plan).getTemplateSetInfo(),
+                shouldMarkAsPipeRequest.get());
       case UpdateTriggerStateInTable:
         // TODO: Record complete message in trigger
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -649,8 +655,12 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                 .setIsGeneratedByPipe(true));
       case SetTTL:
         return ((SetTTLPlan) plan).getTTL() == TTLCache.NULL_TTL
-            ? configManager.getTTLManager().unsetTTL((SetTTLPlan) plan, true)
-            : configManager.getTTLManager().setTTL((SetTTLPlan) plan, true);
+            ? configManager
+                .getTTLManager()
+                .unsetTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get())
+            : configManager
+                .getTTLManager()
+                .setTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get());
       case PipeCreateTableOrView:
         return executeIdempotentCreateTableOrView((PipeCreateTableOrViewPlan) 
plan, queryId);
       case AddTableColumn:
@@ -667,7 +677,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((AddTableColumnPlan) plan).getTableName(),
                     queryId,
                     ((AddTableColumnPlan) plan).getColumnSchemaList(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case AddViewColumn:
         return configManager
             .getProcedureManager()
@@ -682,7 +692,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((AddTableViewColumnPlan) plan).getTableName(),
                     queryId,
                     ((AddTableViewColumnPlan) plan).getColumnSchemaList(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case SetTableProperties:
         return configManager
             .getProcedureManager()
@@ -697,7 +707,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((SetTablePropertiesPlan) plan).getTableName(),
                     queryId,
                     ((SetTablePropertiesPlan) plan).getProperties(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case SetViewProperties:
         return configManager
             .getProcedureManager()
@@ -712,7 +722,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((SetViewPropertiesPlan) plan).getTableName(),
                     queryId,
                     ((SetViewPropertiesPlan) plan).getProperties(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case CommitDeleteColumn:
         return configManager
             .getProcedureManager()
@@ -727,7 +737,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((CommitDeleteColumnPlan) plan).getTableName(),
                     queryId,
                     ((CommitDeleteColumnPlan) plan).getColumnName(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case CommitDeleteViewColumn:
         return configManager
             .getProcedureManager()
@@ -742,7 +752,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((CommitDeleteViewColumnPlan) plan).getTableName(),
                     queryId,
                     ((CommitDeleteViewColumnPlan) plan).getColumnName(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case RenameTableColumn:
         return configManager
             .getProcedureManager()
@@ -758,7 +768,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     queryId,
                     ((RenameTableColumnPlan) plan).getOldName(),
                     ((RenameTableColumnPlan) plan).getNewName(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case RenameViewColumn:
         return configManager
             .getProcedureManager()
@@ -774,7 +784,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     queryId,
                     ((RenameViewColumnPlan) plan).getOldName(),
                     ((RenameViewColumnPlan) plan).getNewName(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case CommitDeleteTable:
         return configManager
             .getProcedureManager()
@@ -788,7 +798,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((CommitDeleteTablePlan) plan).getDatabase(),
                     ((CommitDeleteTablePlan) plan).getTableName(),
                     queryId,
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case CommitDeleteView:
         return configManager
             .getProcedureManager()
@@ -802,7 +812,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((CommitDeleteViewPlan) plan).getDatabase(),
                     ((CommitDeleteViewPlan) plan).getTableName(),
                     queryId,
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case SetTableComment:
         return configManager
             .getClusterSchemaManager()
@@ -811,7 +821,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                 ((SetTableCommentPlan) plan).getTableName(),
                 ((SetTableCommentPlan) plan).getComment(),
                 false,
-                true);
+                shouldMarkAsPipeRequest.get());
       case SetViewComment:
         return configManager
             .getClusterSchemaManager()
@@ -820,7 +830,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                 ((SetViewCommentPlan) plan).getTableName(),
                 ((SetViewCommentPlan) plan).getComment(),
                 true,
-                true);
+                shouldMarkAsPipeRequest.get());
       case SetTableColumnComment:
         return configManager
             .getClusterSchemaManager()
@@ -829,7 +839,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                 ((SetTableColumnCommentPlan) plan).getTableName(),
                 ((SetTableColumnCommentPlan) plan).getColumnName(),
                 ((SetTableColumnCommentPlan) plan).getComment(),
-                true);
+                shouldMarkAsPipeRequest.get());
       case PipeDeleteDevices:
         return configManager
             .getProcedureManager()
@@ -841,7 +851,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ByteBuffer.wrap(((PipeDeleteDevicesPlan) 
plan).getPatternBytes()),
                     ByteBuffer.wrap(((PipeDeleteDevicesPlan) 
plan).getFilterBytes()),
                     ByteBuffer.wrap(((PipeDeleteDevicesPlan) 
plan).getModBytes())),
-                true)
+                shouldMarkAsPipeRequest.get())
             .getStatus();
       case RenameTable:
         return configManager
@@ -858,7 +868,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((RenameTablePlan) plan).getTableName(),
                     queryId,
                     ((RenameTablePlan) plan).getNewName(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case RenameView:
         return configManager
             .getProcedureManager()
@@ -873,7 +883,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ((RenameViewPlan) plan).getTableName(),
                     queryId,
                     ((RenameViewPlan) plan).getNewName(),
-                    true));
+                    shouldMarkAsPipeRequest.get()));
       case CreateUser:
       case CreateUserWithRawPassword:
       case CreateRole:
@@ -917,10 +927,14 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
       case RRevokeUserSysPri:
       case RGrantUserRole:
       case RRevokeUserRole:
-        return 
configManager.getPermissionManager().operatePermission((AuthorPlan) plan, true);
+        return configManager
+            .getPermissionManager()
+            .operatePermission((AuthorPlan) plan, 
shouldMarkAsPipeRequest.get());
       case CreateSchemaTemplate:
       default:
-        return configManager.getConsensusManager().write(new 
PipeEnrichedPlan(plan));
+        return configManager
+            .getConsensusManager()
+            .write(shouldMarkAsPipeRequest.get() ? new PipeEnrichedPlan(plan) 
: plan);
     }
   }
 


Reply via email to