This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch to--207
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/to--207 by this push:
new a15eaea748d Pipe: Improved the total performance by disable some
useless logic
a15eaea748d is described below
commit a15eaea748dcb5ae2051fd950ebbb8f02d2e2307
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 15 14:21:01 2025 +0800
Pipe: Improved the total performance by disable some useless logic
---
.github/workflows/daily-it.yml | 994 +--------------------
.github/workflows/pipe-it.yml | 978 --------------------
.../manual/basic/IoTDBPipeProtocolIT.java | 16 +-
.../treemodel/auto/basic/IoTDBPipeProtocolIT.java | 16 +-
.../plugin/PipeConfigRegionSinkConstructor.java | 6 -
.../PipeDataRegionProcessorConstructor.java | 30 -
.../dataregion/PipeDataRegionSinkConstructor.java | 12 -
.../PipeSchemaRegionSinkConstructor.java | 6 -
.../source/dataregion/IoTDBDataRegionSource.java | 49 +-
.../realtime/PipeRealtimeDataRegionSource.java | 19 +-
.../agent/plugin/builtin/BuiltinPipePlugin.java | 40 -
.../iotdb/commons/pipe/source/IoTDBSource.java | 19 +-
12 files changed, 30 insertions(+), 2155 deletions(-)
diff --git a/.github/workflows/daily-it.yml b/.github/workflows/daily-it.yml
index 5f7d8cdcfd6..08889c8685c 100644
--- a/.github/workflows/daily-it.yml
+++ b/.github/workflows/daily-it.yml
@@ -84,996 +84,4 @@ jobs:
with:
name: table-standalone-log-java${{ matrix.java }}-${{ runner.os }}
path: integration-test/target/cluster-logs
- retention-days: 3
- PipeSingle:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- cluster2:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- ]
- os: [ubuntu-latest]
- exclude:
- - cluster1: LightWeightStandaloneMode
- cluster2: LightWeightStandaloneMode
- - cluster1: LightWeightStandaloneMode
- cluster2: ScalableSingleNodeMode
- - cluster1: ScalableSingleNodeMode
- cluster2: LightWeightStandaloneMode
- - cluster1: ScalableSingleNodeMode
- cluster2: HighPerformanceMode
- - cluster1: HighPerformanceMode
- cluster2: LightWeightStandaloneMode
- - cluster1: HighPerformanceMode
- cluster2: HighPerformanceMode
- - cluster1: PipeConsensusBatchMode
- cluster2: LightWeightStandaloneMode
- - cluster1: PipeConsensusBatchMode
- cluster2: HighPerformanceMode
- - cluster1: PipeConsensusStreamMode
- cluster2: LightWeightStandaloneMode
- - cluster1: PipeConsensusStreamMode
- cluster2: HighPerformanceMode
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT1 \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{
matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- PipeDualTreeAutoBasic:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster
}} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTreeAutoBasic \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- PipeDualTreeAutoEnhanced:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- cluster2:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- ]
- os: [ubuntu-latest]
- exclude:
- - cluster1: LightWeightStandaloneMode
- cluster2: LightWeightStandaloneMode
- - cluster1: LightWeightStandaloneMode
- cluster2: ScalableSingleNodeMode
- - cluster1: ScalableSingleNodeMode
- cluster2: LightWeightStandaloneMode
- - cluster1: ScalableSingleNodeMode
- cluster2: HighPerformanceMode
- - cluster1: HighPerformanceMode
- cluster2: LightWeightStandaloneMode
- - cluster1: HighPerformanceMode
- cluster2: HighPerformanceMode
- - cluster1: PipeConsensusBatchMode
- cluster2: LightWeightStandaloneMode
- - cluster1: PipeConsensusBatchMode
- cluster2: HighPerformanceMode
- - cluster1: PipeConsensusStreamMode
- cluster2: LightWeightStandaloneMode
- - cluster1: PipeConsensusStreamMode
- cluster2: HighPerformanceMode
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTreeAutoEnhanced \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- PipeDualTreeManual:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- cluster2:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- ]
- os: [ubuntu-latest]
- exclude:
- - cluster1: LightWeightStandaloneMode
- cluster2: LightWeightStandaloneMode
- - cluster1: LightWeightStandaloneMode
- cluster2: ScalableSingleNodeMode
- - cluster1: ScalableSingleNodeMode
- cluster2: LightWeightStandaloneMode
- - cluster1: ScalableSingleNodeMode
- cluster2: HighPerformanceMode
- - cluster1: HighPerformanceMode
- cluster2: LightWeightStandaloneMode
- - cluster1: HighPerformanceMode
- cluster2: HighPerformanceMode
- - cluster1: PipeConsensusBatchMode
- cluster2: LightWeightStandaloneMode
- - cluster1: PipeConsensusBatchMode
- cluster2: HighPerformanceMode
- - cluster1: PipeConsensusStreamMode
- cluster2: LightWeightStandaloneMode
- - cluster1: PipeConsensusStreamMode
- cluster2: HighPerformanceMode
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTreeManual \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- SubscriptionTreeArchVerification:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1:
- [
- ScalableSingleNodeMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTreeArchVerification \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-tree-arch-verification-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- SubscriptionTableArchVerification:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1: [ScalableSingleNodeMode]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTableArchVerification \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-table-arch-verification-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- SubscriptionTreeRegressionConsumer:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # do not use HighPerformanceMode here, otherwise some tests will cause
the GH runner to receive a shutdown signal
- cluster1:
- [
- ScalableSingleNodeMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-tree-regression-consumer-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- SubscriptionTreeRegressionMisc:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # do not use HighPerformanceMode here, otherwise some tests will cause
the GH runner to receive a shutdown signal
- cluster1:
- [
- ScalableSingleNodeMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-tree-regression-misc-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- PipeDualTableManualBasic:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster
}} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTableManualBasic \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- PipeDualTableManualEnhanced:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster:
- [
- LightWeightStandaloneMode,
- ScalableSingleNodeMode,
- HighPerformanceMode,
- PipeConsensusBatchMode,
- PipeConsensusStreamMode,
- ]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster
}} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTableManualEnhanced \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java
}}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
- path: integration-test/target/cluster-logs
- retention-days: 30
+ retention-days: 3
\ No newline at end of file
diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml
deleted file mode 100644
index b1c2b75fa9f..00000000000
--- a/.github/workflows/pipe-it.yml
+++ /dev/null
@@ -1,978 +0,0 @@
-name: Multi-Cluster IT
-
-on:
- push:
- branches:
- - master
- - "rel/*"
- - "rc/*"
- paths-ignore:
- - "docs/**"
- - "site/**"
- - "iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**"
#queryengine
- pull_request:
- branches:
- - master
- - "rel/*"
- - "rc/*"
- - "force_ci/**"
- paths-ignore:
- - "docs/**"
- - "site/**"
- - "iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**"
#queryengine
- # allow manually run the action:
- workflow_dispatch:
-
-concurrency:
- group: ${{ github.workflow }}-${{ github.ref }}
- cancel-in-progress: true
-
-env:
- MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
-Dmaven.wagon.http.retryHandler.class=standard
-Dmaven.wagon.http.retryHandler.count=3
- MAVEN_ARGS: --batch-mode --no-transfer-progress
- DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
-
-jobs:
- single:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1: [HighPerformanceMode]
- cluster2: [HighPerformanceMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT1 \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{
matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- dual-tree-auto-basic:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster: [HighPerformanceMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster
}} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTreeAutoBasic \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- dual-tree-auto-enhanced:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1: [HighPerformanceMode]
- cluster2: [HighPerformanceMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTreeAutoEnhanced \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- dual-tree-manual:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1: [HighPerformanceMode]
- cluster2: [HighPerformanceMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTreeManual \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- subscription-tree-arch-verification:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1: [ScalableSingleNodeMode]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTreeArchVerification \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-tree-arch-verification-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- subscription-table-arch-verification:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster1: [ScalableSingleNodeMode]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTableArchVerification \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-table-arch-verification-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- subscription-tree-regression-consumer:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # do not use HighPerformanceMode here, otherwise some tests will cause
the GH runner to receive a shutdown signal
- cluster1: [ScalableSingleNodeMode]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-tree-regression-consumer-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- subscription-tree-regression-misc:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # do not use HighPerformanceMode here, otherwise some tests will cause
the GH runner to receive a shutdown signal
- cluster1: [ScalableSingleNodeMode]
- cluster2: [ScalableSingleNodeMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
- -pl integration-test \
- -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-subscription-tree-regression-misc-java${{
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- dual-table-manual-basic:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster: [HighPerformanceMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster
}} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTableManualBasic \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- dual-table-manual-enhanced:
- strategy:
- fail-fast: false
- max-parallel: 15
- matrix:
- java: [17]
- # StrongConsistencyClusterMode is ignored now because RatisConsensus
has not been supported yet.
- cluster: [HighPerformanceMode]
- os: [ubuntu-latest]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster
}} \
- -pl integration-test \
- -am -PMultiClusterIT2DualTableManualEnhanced \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java
}}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
- path: integration-test/target/cluster-logs
- retention-days: 30
- triple:
- strategy:
- fail-fast: false
- max-parallel: 1
- matrix:
- java: [ 17 ]
- cluster1: [ ScalableSingleNodeMode ]
- cluster2: [ ScalableSingleNodeMode ]
- cluster3: [ ScalableSingleNodeMode ]
- os: [ ubuntu-latest ]
- runs-on: ${{ matrix.os }}
- steps:
- - uses: actions/checkout@v4
- - name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v4
- with:
- distribution: corretto
- java-version: ${{ matrix.java }}
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Cache Maven packages
- uses: actions/cache@v4
- with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2-
- - name: Sleep for a random duration between 0 and 10000 milliseconds
- run: |
- sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- - name: IT Test
- shell: bash
- # we do not compile client-cpp for saving time, it is tested in
client.yml
- # we can skip influxdb-protocol because it has been tested separately
in influxdb-protocol.yml
- run: |
- retry() {
- local -i max_attempts=3
- local -i attempt=1
- local -i retry_sleep=5
- local test_output
-
- while [ $attempt -le $max_attempts ]; do
- mvn clean verify \
- -P with-integration-tests \
- -DskipUTs \
- -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
- -DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }},${{ matrix.cluster3 }} \
- -pl integration-test \
- -am -PMultiClusterIT3 \
- -ntp >> ~/run-tests-$attempt.log && return 0
- test_output=$(cat ~/run-tests-$attempt.log)
-
- echo "==================== BEGIN: ~/run-tests-$attempt.log
===================="
- echo "$test_output"
- echo "==================== END: ~/run-tests-$attempt.log
======================"
-
- if ! mv ~/run-tests-$attempt.log
integration-test/target/cluster-logs/ 2>/dev/null; then
- echo "Failed to move log file ~/run-tests-$attempt.log to
integration-test/target/cluster-logs/. Skipping..."
- fi
-
- if echo "$test_output" | grep -q "Could not transfer artifact";
then
- if [ $attempt -lt $max_attempts ]; then
- echo "Test failed with artifact transfer issue, attempt
$attempt. Retrying in $retry_sleep seconds..."
- sleep $retry_sleep
- attempt=$((attempt + 1))
- else
- echo "Test failed after $max_attempts attempts due to
artifact transfer issue."
- echo "Treating this as a success because the issue is likely
transient."
- return 0
- fi
- elif [ $? -ne 0 ]; then
- echo "Test failed with a different error."
- return 1
- else
- echo "Tests passed"
- return 0
- fi
- done
- }
- retry
- - name: Upload Artifact
- if: failure()
- uses: actions/upload-artifact@v4
- with:
- name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{
matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }}
- path: integration-test/target/cluster-logs
- retention-days: 30
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index 3fce11bf51f..8e8c61dca24 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -351,11 +351,6 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
}
- @Test
- public void testAirGapConnectorUseNodeUrls() throws Exception {
-
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
- }
-
private void doTestUseNodeUrls(String connectorName) throws Exception {
senderEnv
.getConfig()
@@ -398,16 +393,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
};
for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList())
{
- if
(connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()))
{
- // Use default port for convenience
- nodeUrlsBuilder
- .append(wrapper.getIp())
- .append(":")
- .append(wrapper.getPipeAirGapReceiverPort())
- .append(",");
- } else {
- nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
- }
+ nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
}
try (final SyncConfigNodeIServiceClient client =
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index 0d6091c51ec..6c1efc88515 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -339,11 +339,6 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
}
- @Test
- public void testAirGapConnectorUseNodeUrls() throws Exception {
-
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
- }
-
private void doTestUseNodeUrls(String connectorName) throws Exception {
senderEnv
.getConfig()
@@ -378,16 +373,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
final StringBuilder nodeUrlsBuilder = new StringBuilder();
for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList())
{
- if
(connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()))
{
- // Use default port for convenience
- nodeUrlsBuilder
- .append(wrapper.getIp())
- .append(":")
- .append(wrapper.getPipeAirGapReceiverPort())
- .append(",");
- } else {
- nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
- }
+ nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
}
try (final SyncConfigNodeIServiceClient client =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
index c7222e9a1bc..82c394c7fb5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
-import
org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionAirGapSink;
import
org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionSink;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -41,9 +40,6 @@ class PipeConfigRegionSinkConstructor extends
PipeSinkConstructor {
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
IoTDBConfigRegionSink::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
- IoTDBConfigRegionAirGapSink::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(),
DoNothingSink::new);
@@ -55,8 +51,6 @@ class PipeConfigRegionSinkConstructor extends
PipeSinkConstructor {
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(),
IoTDBConfigRegionSink::new);
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(),
IoTDBConfigRegionSink::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
IoTDBConfigRegionAirGapSink::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(),
DoNothingSink::new);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 31cc8250ebf..33143bdc27e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -21,18 +21,10 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor;
import
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor;
-import
org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor;
-import
org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
-import
org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor;
-import
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
-import
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor;
-import
org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
@@ -44,28 +36,6 @@ class PipeDataRegionProcessorConstructor extends
PipeProcessorConstructor {
protected void initConstructors() {
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(),
DoNothingProcessor::new);
- pluginConstructors.put(
- BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(),
- TumblingTimeSamplingProcessor::new);
- pluginConstructors.put(
- BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
- SwingingDoorTrendingSamplingProcessor::new);
- pluginConstructors.put(
-
BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(),
- ChangingValueSamplingProcessor::new);
- pluginConstructors.put(
- BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
- ThrowingExceptionProcessor::new);
- pluginConstructors.put(
- BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(),
AggregateProcessor::new);
- pluginConstructors.put(
- BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(),
- StandardStatisticsOperatorProcessor::new);
- pluginConstructors.put(
- BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
- TumblingWindowingProcessor::new);
- pluginConstructors.put(
- BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(),
TwoStageCountProcessor::new);
pluginConstructors.put(
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
PipeConsensusProcessor::new);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
index 536cf71cdb8..09773d0cad5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
@@ -23,10 +23,7 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
import
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink;
import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
-import org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaSink;
-import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
import
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
@@ -59,13 +56,8 @@ class PipeDataRegionSinkConstructor extends
PipeSinkConstructor {
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
IoTDBLegacyPipeSink::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
- IoTDBDataRegionAirGapSink::new);
pluginConstructors.put(
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(),
WebSocketSink::new);
-
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
OpcUaSink::new);
-
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(),
OpcDaSink::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(),
DoNothingSink::new);
pluginConstructors.put(
@@ -82,12 +74,8 @@ class PipeDataRegionSinkConstructor extends
PipeSinkConstructor {
IoTDBDataRegionAsyncSink::new);
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(),
IoTDBLegacyPipeSink::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
IoTDBDataRegionAirGapSink::new);
pluginConstructors.put(
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(),
WebSocketSink::new);
- pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
OpcUaSink::new);
- pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(),
OpcDaSink::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(),
DoNothingSink::new);
pluginConstructors.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
index 160ecf54c01..a7752994ee2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.agent.plugin.schemaregion;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
-import
org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -41,9 +40,6 @@ class PipeSchemaRegionSinkConstructor extends
PipeSinkConstructor {
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
IoTDBSchemaRegionSink::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
- IoTDBSchemaRegionAirGapSink::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(),
DoNothingSink::new);
@@ -55,8 +51,6 @@ class PipeSchemaRegionSinkConstructor extends
PipeSinkConstructor {
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(),
IoTDBSchemaRegionSink::new);
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(),
IoTDBSchemaRegionSink::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
IoTDBSchemaRegionAirGapSink::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(),
DoNothingSink::new);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 9d2b629a67d..c862d7c2244 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.source.dataregion;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
@@ -141,6 +142,21 @@ public class IoTDBDataRegionSource extends IoTDBSource {
public void validate(final PipeParameterValidator validator) throws
Exception {
super.validate(validator);
+ final boolean forwardingPipeRequests =
+ validator
+ .getParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(
+ PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ if (!forwardingPipeRequests) {
+ throw new PipeParameterNotValidException(
+ String.format(
+ "The parameter %s cannot be set to false.",
+ PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY));
+ }
+
final boolean isTreeDialect =
validator
.getParameters()
@@ -262,32 +278,6 @@ public class IoTDBDataRegionSource extends IoTDBSource {
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
- // Validate extractor.realtime.mode
- if (validator
- .getParameters()
- .getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
- EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
- || validator
- .getParameters()
- .hasAnyAttributes(
- SOURCE_START_TIME_KEY,
- EXTRACTOR_START_TIME_KEY,
- SOURCE_END_TIME_KEY,
- EXTRACTOR_END_TIME_KEY)) {
- validator.validateAttributeValueRange(
- validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
- ? EXTRACTOR_REALTIME_MODE_KEY
- : SOURCE_REALTIME_MODE_KEY,
- true,
- EXTRACTOR_REALTIME_MODE_FILE_VALUE,
- EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
- EXTRACTOR_REALTIME_MODE_LOG_VALUE,
- EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
- EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
- EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
- }
-
checkInvalidParameters(validator);
constructHistoricalExtractor();
@@ -450,6 +440,13 @@ public class IoTDBDataRegionSource extends IoTDBSource {
return;
}
+ if (!(pipeName != null
+ && (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX)
+ || pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) {
+ realtimeExtractor = new PipeRealtimeDataRegionTsFileSource();
+ return;
+ }
+
// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY,
SOURCE_MODE_STREAMING_KEY)
&& !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 08cc91b232d..665bb299d2f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -115,7 +115,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
private final AtomicReference<Pair<Long, Long>>
dataRegionTimePartitionIdBound =
new AtomicReference<>();
- protected boolean isForwardingPipeRequests;
+ protected boolean isForwardingPipeRequests = true;
private boolean shouldTransferModFile; // Whether to transfer mods
@@ -251,22 +251,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
?
TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime)
:
TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1;
- final boolean isDoubleLiving =
- parameters.getBooleanOrDefault(
- Arrays.asList(
- PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
- PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
- PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
- if (isDoubleLiving) {
- isForwardingPipeRequests = false;
- } else {
- isForwardingPipeRequests =
- parameters.getBooleanOrDefault(
- Arrays.asList(
- PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
- PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
- }
+ isForwardingPipeRequests = true;
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
shouldTransferModFile =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 74bc0d9815a..f9f9c0fb6e3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -19,27 +19,16 @@
package org.apache.iotdb.commons.pipe.agent.plugin.builtin;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.airgap.IoTDBAirGapSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.consensus.PipeConsensusAsyncSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBLegacyPipeSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftAsyncSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSslSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSyncSink;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda.OpcDaSink;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcua.OpcUaSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSocketSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource;
@@ -61,18 +50,8 @@ public enum BuiltinPipePlugin {
// processors
DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
- TUMBLING_TIME_SAMPLING_PROCESSOR(
- "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
- SDT_SAMPLING_PROCESSOR("sdt-sampling-processor",
SwingingDoorTrendingSamplingProcessor.class),
- CHANGING_VALUE_SAMPLING_PROCESSOR(
- "changing-value-sampling-processor",
ChangingValueSamplingProcessor.class),
- THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor",
ThrowingExceptionProcessor.class),
- AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
- COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
// Hidden-processors, which are plugins of the processors
- STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor",
StandardStatisticsProcessor.class),
- TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor",
TumblingWindowingProcessor.class),
PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor",
PipeConsensusProcessor.class),
RENAME_DATABASE_PROCESSOR("rename-database-processor",
RenameDatabaseProcessor.class),
@@ -83,12 +62,9 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector",
IoTDBThriftSyncSink.class),
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector",
IoTDBThriftAsyncSink.class),
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector",
IoTDBLegacyPipeSink.class),
- IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class),
PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector",
PipeConsensusAsyncSink.class),
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
- OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class),
- OPC_DA_CONNECTOR("opc-da-connector", OpcDaSink.class),
WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class),
DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class),
@@ -97,10 +73,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncSink.class),
IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink",
IoTDBThriftAsyncSink.class),
IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeSink.class),
- IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapSink.class),
WEBSOCKET_SINK("websocket-sink", WebSocketSink.class),
- OPC_UA_SINK("opc-ua-sink", OpcUaSink.class),
- OPC_DA_SINK("opc-da-sink", OpcDaSink.class),
WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink",
PipeConsensusAsyncSink.class),
@@ -148,14 +121,6 @@ public enum BuiltinPipePlugin {
// Sources
DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(),
// Processors
-
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
- SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
-
CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
-
THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(),
- AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(),
- COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(),
-
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
-
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
// Connectors
@@ -165,10 +130,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
- IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
- OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
- OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
// Sinks
@@ -176,8 +138,6 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
- OPC_UA_SINK.getPipePluginName().toUpperCase(),
- OPC_DA_SINK.getPipePluginName().toUpperCase(),
SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
index 6640fde7f4a..d98f7572924 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
@@ -62,7 +62,7 @@ public abstract class IoTDBSource implements PipeExtractor {
protected int regionId;
protected PipeTaskMeta pipeTaskMeta;
- protected boolean isForwardingPipeRequests;
+ protected boolean isForwardingPipeRequests = true;
// The value is always true after the first start even the extractor is
closed
protected final AtomicBoolean hasBeenStarted = new AtomicBoolean(false);
@@ -161,22 +161,7 @@ public abstract class IoTDBSource implements PipeExtractor
{
taskID = pipeName + "_" + regionId + "_" + creationTime;
pipeTaskMeta = environment.getPipeTaskMeta();
- final boolean isDoubleLiving =
- parameters.getBooleanOrDefault(
- Arrays.asList(
- PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
- PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
- PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
- if (isDoubleLiving) {
- isForwardingPipeRequests = false;
- } else {
- isForwardingPipeRequests =
- parameters.getBooleanOrDefault(
- Arrays.asList(
- PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
- PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
- }
+ isForwardingPipeRequests = true;
userId =
parameters.getStringOrDefault(