This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 039a7f982ab [To dev/1.3] Pipe: Fixed the bug that mark-as-pipe-request
may not be of use in configNode events (#16581) (#16583)
039a7f982ab is described below
commit 039a7f982abc422a71fd58d985c02ae3ea25075b
Author: Caideyipi <[email protected]>
AuthorDate: Wed Oct 15 19:53:53 2025 +0800
[To dev/1.3] Pipe: Fixed the bug that mark-as-pipe-request may not be of
use in configNode events (#16581) (#16583)
---
.github/workflows/pipe-it-2cluster.yml | 82 +++++++++++++++
.../pipe/it/autocreate/AbstractPipeDualAutoIT.java | 1 -
.../it/autocreate/IoTDBPipeAutoConflictIT.java | 1 -
.../pipe/it/autocreate/IoTDBPipeIdempotentIT.java | 1 -
.../pipe/it/autocreate/IoTDBPipeProcessorIT.java | 1 -
.../pipe/it/autocreate/IoTDBPipeProtocolIT.java | 1 -
.../pipe/it/autocreate/IoTDBPipeSourceIT.java | 1 -
.../pipe/it/autocreate/IoTDBPipeWithLoadIT.java | 1 -
.../pipe/it/manual/AbstractPipeDualManualIT.java | 1 -
.../pipe/it/manual/IoTDBPipeMetaHistoricalIT.java | 1 -
.../pipe/it/manual/IoTDBPipePermissionIT.java | 1 -
.../AbstractPipeTripleManualIT.java} | 48 +++++----
.../iotdb/pipe/it/triple/IoTDBPipeForwardIT.java | 116 +++++++++++++++++++++
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 38 ++++---
14 files changed, 253 insertions(+), 41 deletions(-)
diff --git a/.github/workflows/pipe-it-2cluster.yml
b/.github/workflows/pipe-it-2cluster.yml
index 2f999b0fd9e..446c2eda05a 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -455,3 +455,85 @@ jobs:
name: cluster-log-subscription-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
path: integration-test/target/cluster-logs
retention-days: 30
+ triple:
+ strategy:
+ fail-fast: false
+ max-parallel: 1
+ matrix:
+ java: [ 17 ]
+ cluster1: [ ScalableSingleNodeMode ]
+ cluster2: [ ScalableSingleNodeMode ]
+ cluster3: [ ScalableSingleNodeMode ]
+ os: [ ubuntu-latest ]
+ runs-on: ${{ matrix.os }}
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v4
+ with:
+ distribution: oracle
+ java-version: ${{ matrix.java }}
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ - name: Cache Maven packages
+ uses: actions/cache@v4
+ with:
+ path: ~/.m2
+ key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2-
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
+ - name: IT Test
+ shell: bash
+ # we do not compile client-cpp for saving time, it is tested in client.yml
+ # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml
+ run: |
+ retry() {
+ local -i max_attempts=3
+ local -i attempt=1
+ local -i retry_sleep=5
+ local test_output
+ while [ $attempt -le $max_attempts ]; do
+ mvn clean verify \
+ -P with-integration-tests \
+ -DskipUTs \
+ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
+ -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }},${{ matrix.cluster3 }} \
+ -pl integration-test \
+ -am -PMultiClusterIT3 \
+ -ntp >> ~/run-tests-$attempt.log && return 0
+ test_output=$(cat ~/run-tests-$attempt.log)
+ echo "==================== BEGIN: ~/run-tests-$attempt.log ===================="
+ echo "$test_output"
+ echo "==================== END: ~/run-tests-$attempt.log ======================"
+ if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then
+ echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..."
+ fi
+ if echo "$test_output" | grep -q "Could not transfer artifact"; then
+ if [ $attempt -lt $max_attempts ]; then
+ echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..."
+ sleep $retry_sleep
+ attempt=$((attempt + 1))
+ else
+ echo "Test failed after $max_attempts attempts due to artifact transfer issue."
+ echo "Treating this as a success because the issue is likely transient."
+ return 0
+ fi
+ elif [ $? -ne 0 ]; then
+ echo "Test failed with a different error."
+ return 1
+ else
+ echo "Tests passed"
+ return 0
+ fi
+ done
+ }
+ retry
+ - name: Upload Artifact
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }}
+ path: integration-test/target/cluster-logs
+ retention-days: 30
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
index 2c7f5719e9b..7914c7cb4ff 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
@@ -42,7 +42,6 @@ abstract class AbstractPipeDualAutoIT {
}
protected void setupConfig() {
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
index b89483840cb..30a0ff853a9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
@@ -52,7 +52,6 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
index 4e1270577d8..c5001f9e469 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
// All the schema operations must be under the same database to
// be in the same region, therefore a non-idempotent operation can block
the next one
// and fail the IT
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
index bbf4f206b59..dcd0df7d5dc 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
@@ -52,7 +52,6 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
index ed68cc23eba..c245786db84 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java
@@ -65,7 +65,6 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualAutoIT {
schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor,
dataNodesNum);
dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor,
dataNodesNum);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
index 1ad6a4f37dc..9ee5b6ae15d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
@@ -61,7 +61,6 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualAutoIT
{
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
index d87aa3b5fae..6793ecc8d98 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java
@@ -51,7 +51,6 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeDualAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
index 299d1b55c5b..c31ee76c6ad 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
@@ -42,7 +42,6 @@ abstract class AbstractPipeDualManualIT {
}
protected void setupConfig() {
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
index 94f629d846c..513d112d763 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipeMetaHistoricalIT extends
AbstractPipeDualManualIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
index e43332d1270..c96f640ff2d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualManualIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
similarity index 61%
copy from
integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
copy to
integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
index 299d1b55c5b..c0cbec7f29c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.pipe.it.manual;
+package org.apache.iotdb.pipe.it.triple;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.MultiEnvFactory;
@@ -26,25 +26,26 @@ import org.apache.iotdb.itbase.env.BaseEnv;
import org.junit.After;
import org.junit.Before;
-abstract class AbstractPipeDualManualIT {
+abstract class AbstractPipeTripleManualIT {
- protected BaseEnv senderEnv;
- protected BaseEnv receiverEnv;
+ protected BaseEnv env1;
+ protected BaseEnv env2;
+ protected BaseEnv env3;
@Before
public void setUp() {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
+ MultiEnvFactory.createEnv(3);
+ env1 = MultiEnvFactory.getEnv(0);
+ env2 = MultiEnvFactory.getEnv(1);
+ env3 = MultiEnvFactory.getEnv(2);
setupConfig();
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
+ env1.initClusterEnvironment(1, 1);
+ env2.initClusterEnvironment(1, 1);
+ env3.initClusterEnvironment(1, 1);
}
protected void setupConfig() {
- // TODO: delete ratis configurations
- senderEnv
- .getConfig()
+ env1.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -52,9 +53,18 @@ abstract class AbstractPipeDualManualIT {
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
+ env1.getConfig().getDataNodeConfig();
- receiverEnv
- .getConfig()
+ env2.getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(false)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
+
+ env3.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -64,13 +74,15 @@ abstract class AbstractPipeDualManualIT {
.setPipeAutoSplitFullEnabled(false);
// 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ env1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ env2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ env3.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
}
@After
public final void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
+ env1.cleanClusterEnvironment();
+ env2.cleanClusterEnvironment();
+ env3.cleanClusterEnvironment();
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
new file mode 100644
index 00000000000..25bd0cf1366
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.triple;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT3;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT3.class})
+public class IoTDBPipeForwardIT extends AbstractPipeTripleManualIT {
+ @Test
+ public void testForwardingPipeRequests() throws Exception {
+ final DataNodeWrapper env2DataNode = env2.getDataNodeWrapper(0);
+ final String env2Ip = env2DataNode.getIp();
+ final int env2Port = env2DataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) env1.getLeaderConfigNodeConnection()) {
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
+
+ sourceAttributes.put("inclusion", "all");
+
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("mark-as-pipe-request", "false");
+ sinkAttributes.put("ip", env2Ip);
+ sinkAttributes.put("port", Integer.toString(env2Port));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ }
+
+ final DataNodeWrapper env3DataNode = env3.getDataNodeWrapper(0);
+ final String env3Ip = env3DataNode.getIp();
+ final int env3Port = env3DataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) env2.getLeaderConfigNodeConnection()) {
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
+
+ sourceAttributes.put("inclusion", "all");
+ sourceAttributes.put("forwarding-pipe-requests", "false");
+
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("ip", env3Ip);
+ sinkAttributes.put("port", Integer.toString(env3Port));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ }
+
+ TestUtils.tryExecuteNonQueriesWithRetry(
+ env1,
+ Arrays.asList(
+ "create database root.sg",
+ "create timeseries root.sg.wf01.GPS.status0 with datatype=BOOLEAN,encoding=PLAIN",
+ "insert into root.sg.wf01.GPS (time, status0) values (0, 1)"));
+ TestUtils.assertDataEventuallyOnEnv(
+ env3,
+ "select status0 from root.sg.**",
+ "Time,root.sg.wf01.GPS.status0,",
+ Collections.singleton("0,true,"));
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index d06762d7f88..439e16dc554 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -377,53 +377,57 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
ConfigNodeDescriptor.getInstance().getConf().getDefaultDataRegionGroupNumPerDatabase());
schema.setMaxSchemaRegionGroupNum(schema.getMinSchemaRegionGroupNum());
schema.setMaxDataRegionGroupNum(schema.getMinDataRegionGroupNum());
- return configManager.getClusterSchemaManager().setDatabase((DatabaseSchemaPlan) plan, true);
+ return configManager
+ .getClusterSchemaManager()
+ .setDatabase((DatabaseSchemaPlan) plan, shouldMarkAsPipeRequest.get());
case AlterDatabase:
return configManager
.getClusterSchemaManager()
- .alterDatabase((DatabaseSchemaPlan) plan, true);
+ .alterDatabase((DatabaseSchemaPlan) plan,
shouldMarkAsPipeRequest.get());
case DeleteDatabase:
return configManager.deleteDatabases(
new TDeleteDatabasesReq(
Collections.singletonList(((DeleteDatabasePlan)
plan).getName()))
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case ExtendSchemaTemplate:
return configManager
.getClusterSchemaManager()
- .extendSchemaTemplate(((ExtendSchemaTemplatePlan)
plan).getTemplateExtendInfo(), true);
+ .extendSchemaTemplate(
+ ((ExtendSchemaTemplatePlan) plan).getTemplateExtendInfo(),
+ shouldMarkAsPipeRequest.get());
case CommitSetSchemaTemplate:
return configManager.setSchemaTemplate(
new TSetSchemaTemplateReq(
generatePseudoQueryId(),
((CommitSetSchemaTemplatePlan) plan).getName(),
((CommitSetSchemaTemplatePlan) plan).getPath())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeUnsetTemplate:
return configManager.unsetSchemaTemplate(
new TUnsetSchemaTemplateReq(
generatePseudoQueryId(),
((PipeUnsetSchemaTemplatePlan) plan).getName(),
((PipeUnsetSchemaTemplatePlan) plan).getPath())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeDeleteTimeSeries:
return configManager.deleteTimeSeries(
new TDeleteTimeSeriesReq(
generatePseudoQueryId(),
((PipeDeleteTimeSeriesPlan) plan).getPatternTreeBytes())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeDeleteLogicalView:
return configManager.deleteLogicalView(
new TDeleteLogicalViewReq(
generatePseudoQueryId(),
((PipeDeleteLogicalViewPlan) plan).getPatternTreeBytes())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeDeactivateTemplate:
return configManager
.getProcedureManager()
.deactivateTemplate(
generatePseudoQueryId(),
((PipeDeactivateTemplatePlan) plan).getTemplateSetInfo(),
- true);
+ shouldMarkAsPipeRequest.get());
case UpdateTriggerStateInTable:
// TODO: Record complete message in trigger
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -433,8 +437,12 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
.setIsGeneratedByPipe(true));
case SetTTL:
return ((SetTTLPlan) plan).getTTL() == TTLCache.NULL_TTL
- ? configManager.getTTLManager().unsetTTL((SetTTLPlan) plan, true)
- : configManager.getTTLManager().setTTL((SetTTLPlan) plan, true);
+ ? configManager
+ .getTTLManager()
+ .unsetTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get())
+ : configManager
+ .getTTLManager()
+ .setTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get());
case DropUser:
case DropRole:
case GrantRole:
@@ -444,13 +452,17 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
case RevokeRole:
case RevokeRoleFromUser:
case UpdateUser:
- return configManager.getPermissionManager().operatePermission((AuthorPlan) plan, true);
+ return configManager
+ .getPermissionManager()
+ .operatePermission((AuthorPlan) plan, shouldMarkAsPipeRequest.get());
case CreateSchemaTemplate:
case CreateUser:
case CreateRole:
case CreateUserWithRawPassword:
default:
- return configManager.getConsensusManager().write(new PipeEnrichedPlan(plan));
+ return configManager
+ .getConsensusManager()
+ .write(shouldMarkAsPipeRequest.get() ? new PipeEnrichedPlan(plan) : plan);
}
}