This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4c037e7df03 Pipe: Fixed the bug that mark-as-pipe-request may not be
of use in configNode events (#16581)
4c037e7df03 is described below
commit 4c037e7df038e3b5a9a821c5571e0dbab06ff250
Author: Caideyipi <[email protected]>
AuthorDate: Wed Oct 15 11:00:58 2025 +0800
Pipe: Fixed the bug that mark-as-pipe-request may not be of use in
configNode events (#16581)
---
.github/workflows/pipe-it.yml | 86 +++++++++++++++
.../manual/AbstractPipeTableModelDualManualIT.java | 1 -
.../manual/basic/IoTDBPipePermissionIT.java | 1 -
.../manual/basic/IoTDBPipeProtocolIT.java | 1 -
.../tablemodel/manual/basic/IoTDBPipeSourceIT.java | 1 -
.../manual/basic/IoTDBPipeWithLoadIT.java | 1 -
.../manual/enhanced/IoTDBPipeAutoConflictIT.java | 1 -
.../auto/AbstractPipeDualTreeModelAutoIT.java | 1 -
.../treemodel/auto/basic/IoTDBPipeProcessorIT.java | 1 -
.../treemodel/auto/basic/IoTDBPipeProtocolIT.java | 1 -
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 1 -
.../auto/enhanced/IoTDBPipeAutoConflictIT.java | 1 -
.../auto/enhanced/IoTDBPipeIdempotentIT.java | 1 -
.../auto/enhanced/IoTDBPipeWithLoadIT.java | 1 -
.../manual/AbstractPipeDualTreeModelManualIT.java | 1 -
.../manual/IoTDBPipeMetaHistoricalIT.java | 1 -
.../treemodel/manual/IoTDBPipePermissionIT.java | 1 -
.../AbstractPipeTripleManualIT.java} | 57 +++++-----
.../iotdb/pipe/it/triple/IoTDBPipeForwardIT.java | 117 +++++++++++++++++++++
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 72 ++++++++-----
20 files changed, 279 insertions(+), 69 deletions(-)
diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml
index 154cebf968c..9c2899f1fe8 100644
--- a/.github/workflows/pipe-it.yml
+++ b/.github/workflows/pipe-it.yml
@@ -890,3 +890,89 @@ jobs:
name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java
}}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
path: integration-test/target/cluster-logs
retention-days: 30
+ triple:
+ strategy:
+ fail-fast: false
+ max-parallel: 1
+ matrix:
+ java: [ 17 ]
+ cluster1: [ ScalableSingleNodeMode ]
+ cluster2: [ ScalableSingleNodeMode ]
+ cluster3: [ ScalableSingleNodeMode ]
+ os: [ ubuntu-latest ]
+ runs-on: ${{ matrix.os }}
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v4
+ with:
+ distribution: oracle
+ java-version: ${{ matrix.java }}
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ - name: Cache Maven packages
+ uses: actions/cache@v4
+ with:
+ path: ~/.m2
+ key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2-
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
+ - name: IT Test
+ shell: bash
+ # we do not compile client-cpp for saving time, it is tested in
client.yml
+ # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
+ run: |
+ retry() {
+ local -i max_attempts=3
+ local -i attempt=1
+ local -i retry_sleep=5
+ local test_output
+
+ while [ $attempt -le $max_attempts ]; do
+ mvn clean verify \
+ -P with-integration-tests \
+ -DskipUTs \
+ -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
+ -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }},${{ matrix.cluster3 }} \
+ -pl integration-test \
+ -am -PMultiClusterIT3 \
+ -ntp >> ~/run-tests-$attempt.log && return 0
+ test_output=$(cat ~/run-tests-$attempt.log)
+
+ echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
+ echo "$test_output"
+ echo "==================== END: ~/run-tests-$attempt.log
======================"
+
+ if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
+ echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
+ fi
+
+ if echo "$test_output" | grep -q "Could not transfer artifact";
then
+ if [ $attempt -lt $max_attempts ]; then
+ echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
+ sleep $retry_sleep
+ attempt=$((attempt + 1))
+ else
+ echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
+ echo "Treating this as a success because the issue is likely
transient."
+ return 0
+ fi
+ elif [ $? -ne 0 ]; then
+ echo "Test failed with a different error."
+ return 1
+ else
+ echo "Tests passed"
+ return 0
+ fi
+ done
+ }
+ retry
+ - name: Upload Artifact
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{
matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }}
+ path: integration-test/target/cluster-logs
+ retention-days: 30
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index 55c0170aacd..3b3fae80902 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -42,7 +42,6 @@ public abstract class AbstractPipeTableModelDualManualIT {
}
protected void setupConfig() {
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 0922b984457..958f9d40935 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -61,7 +61,6 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index d2c00c46213..3fce11bf51f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -67,7 +67,6 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor,
dataNodesNum);
dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor,
dataNodesNum);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index b9aee656b00..022d71d5487 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -61,7 +61,6 @@ public class IoTDBPipeSourceIT extends
AbstractPipeTableModelDualManualIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
index b24a6b6de9d..57b5846fc4b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
@@ -59,7 +59,6 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeTableModelDualManualIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
index 352d5b7fed8..de0f0c460ee 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeTableModelDualManualIT
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
index eb2b86e079f..a7fae02f6d1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
@@ -50,7 +50,6 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
}
protected void setupConfig() {
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
index 0447663e53b..0ffdac38ca7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
@@ -54,7 +54,6 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualTreeModelAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index cf9d108ee8d..0d6091c51ec 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -67,7 +67,6 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor,
dataNodesNum);
dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor,
dataNodesNum);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 4556cdb459c..be4b6906533 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -63,7 +63,6 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
index a220673645c..d8a191c3719 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualTreeModelAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
index 3de685b38f5..39bdd821647 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
@@ -55,7 +55,6 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualTreeModelAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
// All the schema operations must be under the same database to
// be in the same region, therefore a non-idempotent operation can block
the next one
// and fail the IT
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
index 1a4be3b5978..74c90088fea 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
@@ -52,7 +52,6 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeDualTreeModelAutoIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index 20a7171253f..11f70d944b6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -50,7 +50,6 @@ public abstract class AbstractPipeDualTreeModelManualIT {
}
protected void setupConfig() {
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
index 2daa6cda9e6..cb0c334f68b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipeMetaHistoricalIT extends
AbstractPipeDualTreeModelManualIT
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 085ed30e7f3..7fa5bdc0b37 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -53,7 +53,6 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
- // TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
similarity index 57%
copy from
integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
copy to
integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
index 55c0170aacd..f4e63e1d2f8 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.pipe.it.dual.tablemodel.manual;
+package org.apache.iotdb.pipe.it.triple;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.MultiEnvFactory;
@@ -26,54 +26,63 @@ import org.apache.iotdb.itbase.env.BaseEnv;
import org.junit.After;
import org.junit.Before;
-public abstract class AbstractPipeTableModelDualManualIT {
+abstract class AbstractPipeTripleManualIT {
- protected BaseEnv senderEnv;
- protected BaseEnv receiverEnv;
+ protected BaseEnv env1;
+ protected BaseEnv env2;
+ protected BaseEnv env3;
@Before
public void setUp() {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
+ MultiEnvFactory.createEnv(3);
+ env1 = MultiEnvFactory.getEnv(0);
+ env2 = MultiEnvFactory.getEnv(1);
+ env3 = MultiEnvFactory.getEnv(2);
setupConfig();
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
+ env1.initClusterEnvironment(1, 1);
+ env2.initClusterEnvironment(1, 1);
+ env3.initClusterEnvironment(1, 1);
}
protected void setupConfig() {
- // TODO: delete ratis configurations
- senderEnv
- .getConfig()
+ env1.getConfig()
.getCommonConfig()
- .setAutoCreateSchemaEnabled(true)
+ .setAutoCreateSchemaEnabled(false)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
- .setEnforceStrongPassword(false)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
- receiverEnv
- .getConfig()
+
env1.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
+ env2.getConfig()
.getCommonConfig()
- .setAutoCreateSchemaEnabled(true)
+ .setAutoCreateSchemaEnabled(false)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
- .setEnforceStrongPassword(false)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
- // 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ env3.getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(false)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false)
+ .setPipeAutoSplitFullEnabled(false);
-
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+ // 10 min, assert that the operations will not time out
+ env1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ env2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ env3.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
}
@After
public final void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
+ env1.cleanClusterEnvironment();
+ env2.cleanClusterEnvironment();
+ env3.cleanClusterEnvironment();
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
new file mode 100644
index 00000000000..25019a705f6
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/IoTDBPipeForwardIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.triple;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT3;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT3.class})
+public class IoTDBPipeForwardIT extends AbstractPipeTripleManualIT {
+ @Test
+ public void testForwardingPipeRequests() throws Exception {
+ final DataNodeWrapper env2DataNode = env2.getDataNodeWrapper(0);
+ final String env2Ip = env2DataNode.getIp();
+ final int env2Port = env2DataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) env1.getLeaderConfigNodeConnection()) {
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
+
+ sourceAttributes.put("inclusion", "all");
+
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("mark-as-pipe-request", "false");
+ sinkAttributes.put("ip", env2Ip);
+ sinkAttributes.put("port", Integer.toString(env2Port));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ }
+
+ final DataNodeWrapper env3DataNode = env3.getDataNodeWrapper(0);
+ final String env3Ip = env3DataNode.getIp();
+ final int env3Port = env3DataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) env2.getLeaderConfigNodeConnection()) {
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> sinkAttributes = new HashMap<>();
+
+ sourceAttributes.put("inclusion", "all");
+ sourceAttributes.put("forwarding-pipe-requests", "false");
+
+ sinkAttributes.put("sink", "iotdb-thrift-sink");
+ sinkAttributes.put("ip", env3Ip);
+ sinkAttributes.put("port", Integer.toString(env3Port));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ }
+
+ TestUtils.executeNonQueries(
+ env1,
+ Arrays.asList(
+ "create database root.sg",
+ "create timeseries root.sg.wf01.GPS.status0 with
datatype=BOOLEAN,encoding=PLAIN",
+ "insert into root.sg.wf01.GPS (time, status0) values (0, 1)"),
+ null);
+ TestUtils.assertDataEventuallyOnEnv(
+ env3,
+ "select status0 from root.sg.**",
+ "Time,root.sg.wf01.GPS.status0,",
+ Collections.singleton("0,true,"));
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 0e49b869e92..a8c90c6ced8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -595,51 +595,57 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
ConfigNodeDescriptor.getInstance().getConf().getDefaultDataRegionGroupNumPerDatabase());
schema.setMaxSchemaRegionGroupNum(schema.getMinSchemaRegionGroupNum());
schema.setMaxDataRegionGroupNum(schema.getMinDataRegionGroupNum());
- return
configManager.getClusterSchemaManager().setDatabase((DatabaseSchemaPlan) plan,
true);
+ return configManager
+ .getClusterSchemaManager()
+ .setDatabase((DatabaseSchemaPlan) plan,
shouldMarkAsPipeRequest.get());
case AlterDatabase:
return configManager
.getClusterSchemaManager()
- .alterDatabase((DatabaseSchemaPlan) plan, true);
+ .alterDatabase((DatabaseSchemaPlan) plan,
shouldMarkAsPipeRequest.get());
case DeleteDatabase:
return configManager.deleteDatabases(
new TDeleteDatabasesReq(
Collections.singletonList(((DeleteDatabasePlan)
plan).getName()))
- .setIsGeneratedByPipe(true)
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get())
.setIsTableModel(
PathUtils.isTableModelDatabase(((DeleteDatabasePlan)
plan).getName())));
case ExtendSchemaTemplate:
return configManager
.getClusterSchemaManager()
- .extendSchemaTemplate(((ExtendSchemaTemplatePlan)
plan).getTemplateExtendInfo(), true);
+ .extendSchemaTemplate(
+ ((ExtendSchemaTemplatePlan) plan).getTemplateExtendInfo(),
+ shouldMarkAsPipeRequest.get());
case CommitSetSchemaTemplate:
return configManager.setSchemaTemplate(
new TSetSchemaTemplateReq(
queryId,
((CommitSetSchemaTemplatePlan) plan).getName(),
((CommitSetSchemaTemplatePlan) plan).getPath())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeUnsetTemplate:
return configManager.unsetSchemaTemplate(
new TUnsetSchemaTemplateReq(
queryId,
((PipeUnsetSchemaTemplatePlan) plan).getName(),
((PipeUnsetSchemaTemplatePlan) plan).getPath())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeDeleteTimeSeries:
return configManager.deleteTimeSeries(
new TDeleteTimeSeriesReq(
queryId, ((PipeDeleteTimeSeriesPlan)
plan).getPatternTreeBytes())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeDeleteLogicalView:
return configManager.deleteLogicalView(
new TDeleteLogicalViewReq(
queryId, ((PipeDeleteLogicalViewPlan)
plan).getPatternTreeBytes())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case PipeDeactivateTemplate:
return configManager
.getProcedureManager()
.deactivateTemplate(
- queryId, ((PipeDeactivateTemplatePlan)
plan).getTemplateSetInfo(), true);
+ queryId,
+ ((PipeDeactivateTemplatePlan) plan).getTemplateSetInfo(),
+ shouldMarkAsPipeRequest.get());
case UpdateTriggerStateInTable:
// TODO: Record complete message in trigger
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -649,8 +655,12 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
.setIsGeneratedByPipe(true));
case SetTTL:
return ((SetTTLPlan) plan).getTTL() == TTLCache.NULL_TTL
- ? configManager.getTTLManager().unsetTTL((SetTTLPlan) plan, true)
- : configManager.getTTLManager().setTTL((SetTTLPlan) plan, true);
+ ? configManager
+ .getTTLManager()
+ .unsetTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get())
+ : configManager
+ .getTTLManager()
+ .setTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get());
case PipeCreateTableOrView:
return executeIdempotentCreateTableOrView((PipeCreateTableOrViewPlan)
plan, queryId);
case AddTableColumn:
@@ -667,7 +677,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((AddTableColumnPlan) plan).getTableName(),
queryId,
((AddTableColumnPlan) plan).getColumnSchemaList(),
- true));
+ shouldMarkAsPipeRequest.get()));
case AddViewColumn:
return configManager
.getProcedureManager()
@@ -682,7 +692,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((AddTableViewColumnPlan) plan).getTableName(),
queryId,
((AddTableViewColumnPlan) plan).getColumnSchemaList(),
- true));
+ shouldMarkAsPipeRequest.get()));
case SetTableProperties:
return configManager
.getProcedureManager()
@@ -697,7 +707,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((SetTablePropertiesPlan) plan).getTableName(),
queryId,
((SetTablePropertiesPlan) plan).getProperties(),
- true));
+ shouldMarkAsPipeRequest.get()));
case SetViewProperties:
return configManager
.getProcedureManager()
@@ -712,7 +722,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((SetViewPropertiesPlan) plan).getTableName(),
queryId,
((SetViewPropertiesPlan) plan).getProperties(),
- true));
+ shouldMarkAsPipeRequest.get()));
case CommitDeleteColumn:
return configManager
.getProcedureManager()
@@ -727,7 +737,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((CommitDeleteColumnPlan) plan).getTableName(),
queryId,
((CommitDeleteColumnPlan) plan).getColumnName(),
- true));
+ shouldMarkAsPipeRequest.get()));
case CommitDeleteViewColumn:
return configManager
.getProcedureManager()
@@ -742,7 +752,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((CommitDeleteViewColumnPlan) plan).getTableName(),
queryId,
((CommitDeleteViewColumnPlan) plan).getColumnName(),
- true));
+ shouldMarkAsPipeRequest.get()));
case RenameTableColumn:
return configManager
.getProcedureManager()
@@ -758,7 +768,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
queryId,
((RenameTableColumnPlan) plan).getOldName(),
((RenameTableColumnPlan) plan).getNewName(),
- true));
+ shouldMarkAsPipeRequest.get()));
case RenameViewColumn:
return configManager
.getProcedureManager()
@@ -774,7 +784,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
queryId,
((RenameViewColumnPlan) plan).getOldName(),
((RenameViewColumnPlan) plan).getNewName(),
- true));
+ shouldMarkAsPipeRequest.get()));
case CommitDeleteTable:
return configManager
.getProcedureManager()
@@ -788,7 +798,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((CommitDeleteTablePlan) plan).getDatabase(),
((CommitDeleteTablePlan) plan).getTableName(),
queryId,
- true));
+ shouldMarkAsPipeRequest.get()));
case CommitDeleteView:
return configManager
.getProcedureManager()
@@ -802,7 +812,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((CommitDeleteViewPlan) plan).getDatabase(),
((CommitDeleteViewPlan) plan).getTableName(),
queryId,
- true));
+ shouldMarkAsPipeRequest.get()));
case SetTableComment:
return configManager
.getClusterSchemaManager()
@@ -811,7 +821,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((SetTableCommentPlan) plan).getTableName(),
((SetTableCommentPlan) plan).getComment(),
false,
- true);
+ shouldMarkAsPipeRequest.get());
case SetViewComment:
return configManager
.getClusterSchemaManager()
@@ -820,7 +830,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((SetViewCommentPlan) plan).getTableName(),
((SetViewCommentPlan) plan).getComment(),
true,
- true);
+ shouldMarkAsPipeRequest.get());
case SetTableColumnComment:
return configManager
.getClusterSchemaManager()
@@ -829,7 +839,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((SetTableColumnCommentPlan) plan).getTableName(),
((SetTableColumnCommentPlan) plan).getColumnName(),
((SetTableColumnCommentPlan) plan).getComment(),
- true);
+ shouldMarkAsPipeRequest.get());
case PipeDeleteDevices:
return configManager
.getProcedureManager()
@@ -841,7 +851,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
ByteBuffer.wrap(((PipeDeleteDevicesPlan)
plan).getPatternBytes()),
ByteBuffer.wrap(((PipeDeleteDevicesPlan)
plan).getFilterBytes()),
ByteBuffer.wrap(((PipeDeleteDevicesPlan)
plan).getModBytes())),
- true)
+ shouldMarkAsPipeRequest.get())
.getStatus();
case RenameTable:
return configManager
@@ -858,7 +868,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((RenameTablePlan) plan).getTableName(),
queryId,
((RenameTablePlan) plan).getNewName(),
- true));
+ shouldMarkAsPipeRequest.get()));
case RenameView:
return configManager
.getProcedureManager()
@@ -873,7 +883,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
((RenameViewPlan) plan).getTableName(),
queryId,
((RenameViewPlan) plan).getNewName(),
- true));
+ shouldMarkAsPipeRequest.get()));
case CreateUser:
case CreateUserWithRawPassword:
case CreateRole:
@@ -917,10 +927,14 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
case RRevokeUserSysPri:
case RGrantUserRole:
case RRevokeUserRole:
- return
configManager.getPermissionManager().operatePermission((AuthorPlan) plan, true);
+ return configManager
+ .getPermissionManager()
+ .operatePermission((AuthorPlan) plan,
shouldMarkAsPipeRequest.get());
case CreateSchemaTemplate:
default:
- return configManager.getConsensusManager().write(new
PipeEnrichedPlan(plan));
+ return configManager
+ .getConsensusManager()
+ .write(shouldMarkAsPipeRequest.get() ? new PipeEnrichedPlan(plan)
: plan);
}
}