This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 039a7f982ab [To dev/1.3] Pipe: Fixed the bug that mark-as-pipe-request may not be of use in configNode events (#16581) (#16583)
039a7f982ab is described below

commit 039a7f982abc422a71fd58d985c02ae3ea25075b
Author: Caideyipi <[email protected]>
AuthorDate: Wed Oct 15 19:53:53 2025 +0800

    [To dev/1.3] Pipe: Fixed the bug that mark-as-pipe-request may not be of use in configNode events (#16581) (#16583)
---
 .github/workflows/pipe-it-2cluster.yml             |  82 +++++++++++++++
 .../pipe/it/autocreate/AbstractPipeDualAutoIT.java |   1 -
 .../it/autocreate/IoTDBPipeAutoConflictIT.java     |   1 -
 .../pipe/it/autocreate/IoTDBPipeIdempotentIT.java  |   1 -
 .../pipe/it/autocreate/IoTDBPipeProcessorIT.java   |   1 -
 .../pipe/it/autocreate/IoTDBPipeProtocolIT.java    |   1 -
 .../pipe/it/autocreate/IoTDBPipeSourceIT.java      |   1 -
 .../pipe/it/autocreate/IoTDBPipeWithLoadIT.java    |   1 -
 .../pipe/it/manual/AbstractPipeDualManualIT.java   |   1 -
 .../pipe/it/manual/IoTDBPipeMetaHistoricalIT.java  |   1 -
 .../pipe/it/manual/IoTDBPipePermissionIT.java      |   1 -
 .../AbstractPipeTripleManualIT.java}               |  48 +++++----
 .../iotdb/pipe/it/triple/IoTDBPipeForwardIT.java   | 116 +++++++++++++++++++++
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  38 ++++---
 14 files changed, 253 insertions(+), 41 deletions(-)

diff --git a/.github/workflows/pipe-it-2cluster.yml 
b/.github/workflows/pipe-it-2cluster.yml
index 2f999b0fd9e..446c2eda05a 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -455,3 +455,85 @@ jobs:
           name: cluster-log-subscription-regression-misc-java${{ matrix.java 
}}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
           path: integration-test/target/cluster-logs
           retention-days: 30
+  triple:
+    strategy:
+      fail-fast: false
+      max-parallel: 1
+      matrix:
+        java: [ 17 ]
+        cluster1: [ ScalableSingleNodeMode ]
+        cluster2: [ ScalableSingleNodeMode ]
+        cluster3: [ ScalableSingleNodeMode ]
+        os: [ ubuntu-latest ]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: oracle
+          java-version: ${{ matrix.java }}
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+      - name: Cache Maven packages
+        uses: actions/cache@v4
+        with:
+          path: ~/.m2
+          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-m2-
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
+      - name: IT Test
+        shell: bash
+        # we do not compile client-cpp for saving time, it is tested in 
client.yml
+        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
+        run: |
+          retry() {
+            local -i max_attempts=3
+            local -i attempt=1
+            local -i retry_sleep=5
+            local test_output
+            while [ $attempt -le $max_attempts ]; do
+              mvn clean verify \
+              -P with-integration-tests \
+              -DskipUTs \
+              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
+              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }},${{ matrix.cluster3 }} \
+              -pl integration-test \
+              -am -PMultiClusterIT3 \
+              -ntp >> ~/run-tests-$attempt.log && return 0
+              test_output=$(cat ~/run-tests-$attempt.log) 
+              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
+              echo "$test_output"
+              echo "==================== END: ~/run-tests-$attempt.log 
======================"
+              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
+                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
+              fi
+              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
+                if [ $attempt -lt $max_attempts ]; then
+                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
+                  sleep $retry_sleep
+                  attempt=$((attempt + 1))
+                else
+                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
+                  echo "Treating this as a success because the issue is likely 
transient."
+                  return 0
+                fi
+              elif [ $? -ne 0 ]; then
+                echo "Test failed with a different error."
+                return 1
+              else
+                echo "Tests passed"
+                return 0
+              fi
+            done
+          }
+          retry
+      - name: Upload Artifact
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{ 
matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }}
+          path: integration-test/target/cluster-logs
+          retention-days: 30
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
index 2c7f5719e9b..7914c7cb4ff 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
@@ -42,7 +42,6 @@ abstract class AbstractPipeDualAutoIT {
   }
 
   protected void setupConfig() {
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
index b89483840cb..30a0ff853a9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
@@ -52,7 +52,6 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeDualAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
index 4e1270577d8..c5001f9e469 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipeIdempotentIT extends 
AbstractPipeDualAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     // All the schema operations must be under the same database to
     // be in the same region, therefore a non-idempotent operation can block 
the next one
     // and fail the IT
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
index bbf4f206b59..dcd0df7d5dc 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
@@ -52,7 +52,6 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
index ed68cc23eba..c245786db84 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
@@ -65,7 +65,6 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualAutoIT {
     schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor, 
dataNodesNum);
     dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor, 
dataNodesNum);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
index 1ad6a4f37dc..9ee5b6ae15d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
@@ -61,7 +61,6 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualAutoIT 
{
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
index d87aa3b5fae..6793ecc8d98 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
@@ -51,7 +51,6 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeDualAutoIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
index 299d1b55c5b..c31ee76c6ad 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
@@ -42,7 +42,6 @@ abstract class AbstractPipeDualManualIT {
   }
 
   protected void setupConfig() {
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
index 94f629d846c..513d112d763 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipeMetaHistoricalIT extends 
AbstractPipeDualManualIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
index e43332d1270..c96f640ff2d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualManualIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
-    // TODO: delete ratis configurations
     senderEnv
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
similarity index 61%
copy from 
integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
copy to 
integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
index 299d1b55c5b..c0cbec7f29c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.it.manual;
+package org.apache.iotdb.pipe.it.triple;
 
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.it.env.MultiEnvFactory;
@@ -26,25 +26,26 @@ import org.apache.iotdb.itbase.env.BaseEnv;
 import org.junit.After;
 import org.junit.Before;
 
-abstract class AbstractPipeDualManualIT {
+abstract class AbstractPipeTripleManualIT {
 
-  protected BaseEnv senderEnv;
-  protected BaseEnv receiverEnv;
+  protected BaseEnv env1;
+  protected BaseEnv env2;
+  protected BaseEnv env3;
 
   @Before
   public void setUp() {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
+    MultiEnvFactory.createEnv(3);
+    env1 = MultiEnvFactory.getEnv(0);
+    env2 = MultiEnvFactory.getEnv(1);
+    env3 = MultiEnvFactory.getEnv(2);
     setupConfig();
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
+    env1.initClusterEnvironment(1, 1);
+    env2.initClusterEnvironment(1, 1);
+    env3.initClusterEnvironment(1, 1);
   }
 
   protected void setupConfig() {
-    // TODO: delete ratis configurations
-    senderEnv
-        .getConfig()
+    env1.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -52,9 +53,18 @@ abstract class AbstractPipeDualManualIT {
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setPipeAutoSplitFullEnabled(false);
+    env1.getConfig().getDataNodeConfig();
 
-    receiverEnv
-        .getConfig()
+    env2.getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(false)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
+
+    env3.getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -64,13 +74,15 @@ abstract class AbstractPipeDualManualIT {
         .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    env1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    env2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    env3.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
   }
 
   @After
   public final void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
+    env1.cleanClusterEnvironment();
+    env2.cleanClusterEnvironment();
+    env3.cleanClusterEnvironment();
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
new file mode 100644
index 00000000000..25bd0cf1366
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.triple;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT3;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT3.class})
+public class IoTDBPipeForwardIT extends AbstractPipeTripleManualIT {
+  @Test
+  public void testForwardingPipeRequests() throws Exception {
+    final DataNodeWrapper env2DataNode = env2.getDataNodeWrapper(0);
+    final String env2Ip = env2DataNode.getIp();
+    final int env2Port = env2DataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) env1.getLeaderConfigNodeConnection()) {
+      final Map<String, String> sourceAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
+
+      sourceAttributes.put("inclusion", "all");
+
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("mark-as-pipe-request", "false");
+      sinkAttributes.put("ip", env2Ip);
+      sinkAttributes.put("port", Integer.toString(env2Port));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+    }
+
+    final DataNodeWrapper env3DataNode = env3.getDataNodeWrapper(0);
+    final String env3Ip = env3DataNode.getIp();
+    final int env3Port = env3DataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) env2.getLeaderConfigNodeConnection()) {
+      final Map<String, String> sourceAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
+
+      sourceAttributes.put("inclusion", "all");
+      sourceAttributes.put("forwarding-pipe-requests", "false");
+
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("ip", env3Ip);
+      sinkAttributes.put("port", Integer.toString(env3Port));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+    }
+
+    TestUtils.tryExecuteNonQueriesWithRetry(
+        env1,
+        Arrays.asList(
+            "create database root.sg",
+            "create timeseries root.sg.wf01.GPS.status0 with 
datatype=BOOLEAN,encoding=PLAIN",
+            "insert into root.sg.wf01.GPS (time, status0) values (0, 1)"));
+    TestUtils.assertDataEventuallyOnEnv(
+        env3,
+        "select status0 from root.sg.**",
+        "Time,root.sg.wf01.GPS.status0,",
+        Collections.singleton("0,true,"));
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index d06762d7f88..439e16dc554 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -377,53 +377,57 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
             
ConfigNodeDescriptor.getInstance().getConf().getDefaultDataRegionGroupNumPerDatabase());
         schema.setMaxSchemaRegionGroupNum(schema.getMinSchemaRegionGroupNum());
         schema.setMaxDataRegionGroupNum(schema.getMinDataRegionGroupNum());
-        return 
configManager.getClusterSchemaManager().setDatabase((DatabaseSchemaPlan) plan, 
true);
+        return configManager
+            .getClusterSchemaManager()
+            .setDatabase((DatabaseSchemaPlan) plan, 
shouldMarkAsPipeRequest.get());
       case AlterDatabase:
         return configManager
             .getClusterSchemaManager()
-            .alterDatabase((DatabaseSchemaPlan) plan, true);
+            .alterDatabase((DatabaseSchemaPlan) plan, 
shouldMarkAsPipeRequest.get());
       case DeleteDatabase:
         return configManager.deleteDatabases(
             new TDeleteDatabasesReq(
                     Collections.singletonList(((DeleteDatabasePlan) 
plan).getName()))
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case ExtendSchemaTemplate:
         return configManager
             .getClusterSchemaManager()
-            .extendSchemaTemplate(((ExtendSchemaTemplatePlan) 
plan).getTemplateExtendInfo(), true);
+            .extendSchemaTemplate(
+                ((ExtendSchemaTemplatePlan) plan).getTemplateExtendInfo(),
+                shouldMarkAsPipeRequest.get());
       case CommitSetSchemaTemplate:
         return configManager.setSchemaTemplate(
             new TSetSchemaTemplateReq(
                     generatePseudoQueryId(),
                     ((CommitSetSchemaTemplatePlan) plan).getName(),
                     ((CommitSetSchemaTemplatePlan) plan).getPath())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeUnsetTemplate:
         return configManager.unsetSchemaTemplate(
             new TUnsetSchemaTemplateReq(
                     generatePseudoQueryId(),
                     ((PipeUnsetSchemaTemplatePlan) plan).getName(),
                     ((PipeUnsetSchemaTemplatePlan) plan).getPath())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeDeleteTimeSeries:
         return configManager.deleteTimeSeries(
             new TDeleteTimeSeriesReq(
                     generatePseudoQueryId(),
                     ((PipeDeleteTimeSeriesPlan) plan).getPatternTreeBytes())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeDeleteLogicalView:
         return configManager.deleteLogicalView(
             new TDeleteLogicalViewReq(
                     generatePseudoQueryId(),
                     ((PipeDeleteLogicalViewPlan) plan).getPatternTreeBytes())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case PipeDeactivateTemplate:
         return configManager
             .getProcedureManager()
             .deactivateTemplate(
                 generatePseudoQueryId(),
                 ((PipeDeactivateTemplatePlan) plan).getTemplateSetInfo(),
-                true);
+                shouldMarkAsPipeRequest.get());
       case UpdateTriggerStateInTable:
         // TODO: Record complete message in trigger
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -433,8 +437,12 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                 .setIsGeneratedByPipe(true));
       case SetTTL:
         return ((SetTTLPlan) plan).getTTL() == TTLCache.NULL_TTL
-            ? configManager.getTTLManager().unsetTTL((SetTTLPlan) plan, true)
-            : configManager.getTTLManager().setTTL((SetTTLPlan) plan, true);
+            ? configManager
+                .getTTLManager()
+                .unsetTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get())
+            : configManager
+                .getTTLManager()
+                .setTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get());
       case DropUser:
       case DropRole:
       case GrantRole:
@@ -444,13 +452,17 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
       case RevokeRole:
       case RevokeRoleFromUser:
       case UpdateUser:
-        return 
configManager.getPermissionManager().operatePermission((AuthorPlan) plan, true);
+        return configManager
+            .getPermissionManager()
+            .operatePermission((AuthorPlan) plan, 
shouldMarkAsPipeRequest.get());
       case CreateSchemaTemplate:
       case CreateUser:
       case CreateRole:
       case CreateUserWithRawPassword:
       default:
-        return configManager.getConsensusManager().write(new 
PipeEnrichedPlan(plan));
+        return configManager
+            .getConsensusManager()
+            .write(shouldMarkAsPipeRequest.get() ? new PipeEnrichedPlan(plan) 
: plan);
     }
   }
 

Reply via email to