This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.6 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 09d526ced1ad675308506b623ca24cc7c771f536 Author: Caideyipi <[email protected]> AuthorDate: Fri Aug 15 14:21:01 2025 +0800 Pipe: Improved the total performance by disable some useless logic --- .github/workflows/daily-it.yml | 900 --------------------- .github/workflows/pipe-it.yml | 872 -------------------- .../manual/basic/IoTDBPipeProtocolIT.java | 16 +- .../treemodel/auto/basic/IoTDBPipeProtocolIT.java | 16 +- .../plugin/PipeConfigRegionSinkConstructor.java | 6 - .../PipeDataRegionProcessorConstructor.java | 30 - .../dataregion/PipeDataRegionSinkConstructor.java | 12 - .../PipeSchemaRegionSinkConstructor.java | 6 - .../source/dataregion/IoTDBDataRegionSource.java | 49 +- .../realtime/PipeRealtimeDataRegionSource.java | 19 +- .../agent/plugin/builtin/BuiltinPipePlugin.java | 40 - .../iotdb/commons/pipe/source/IoTDBSource.java | 19 +- 12 files changed, 29 insertions(+), 1956 deletions(-) diff --git a/.github/workflows/daily-it.yml b/.github/workflows/daily-it.yml index 8f28c54e56a..1ab0d3bb857 100644 --- a/.github/workflows/daily-it.yml +++ b/.github/workflows/daily-it.yml @@ -81,903 +81,3 @@ jobs: name: table-standalone-log-java${{ matrix.java }}-${{ runner.os }} path: integration-test/target/cluster-logs retention-days: 3 - PipeSingle: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT1 \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTreeAutoBasic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTreeAutoEnhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTreeManual: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeManual \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTreeArchVerification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTableArchVerification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTableArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-table-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTreeRegressionConsumer: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-consumer-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - SubscriptionTreeRegressionMisc: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTableManualBasic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - PipeDualTableManualEnhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 \ No newline at end of file diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml deleted file mode 100644 index 97e6fad1903..00000000000 --- a/.github/workflows/pipe-it.yml +++ /dev/null @@ -1,872 +0,0 @@ -name: Multi-Cluster IT - -on: - push: - branches: - - master - - 'rel/*' - - 'rc/*' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - pull_request: - branches: - - master - - 'rel/*' - - 'rc/*' - - 'force_ci/**' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - MAVEN_ARGS: --batch-mode --no-transfer-progress - DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} - -jobs: - single: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [HighPerformanceMode] - cluster2: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT1 \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-auto-basic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-auto-enhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [HighPerformanceMode] - cluster2: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-manual: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [HighPerformanceMode] - cluster2: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeManual \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-tree-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-table-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTableArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-table-arch-verification-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-tree-regression-consumer: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-consumer-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-tree-regression-misc: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-tree-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-table-manual-basic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-table-manual-enhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 \ No newline at end of file diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java index 939e84b9f49..869eebeefbc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java @@ -365,11 +365,6 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } - @Test - public void testAirGapConnectorUseNodeUrls() throws Exception { - doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); - } - private void doTestUseNodeUrls(String connectorName) throws Exception { senderEnv .getConfig() @@ -413,16 +408,7 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { boolean insertResult = true; for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) { - if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) { - // Use default port for convenience - nodeUrlsBuilder - .append(wrapper.getIp()) - .append(":") - .append(wrapper.getPipeAirGapReceiverPort()) - .append(","); - } else { - nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); - } + nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); } try (final SyncConfigNodeIServiceClient client = diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java index 1249bf90e78..9c81fe049a6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java @@ -352,11 +352,6 @@ public class IoTDBPipeProtocolIT extends AbstractPipeDualTreeModelAutoIT { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } - @Test - public void testAirGapConnectorUseNodeUrls() throws Exception { - doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); - } - private void doTestUseNodeUrls(String connectorName) throws Exception { senderEnv .getConfig() @@ -391,16 +386,7 @@ public class IoTDBPipeProtocolIT extends AbstractPipeDualTreeModelAutoIT { final StringBuilder nodeUrlsBuilder = new StringBuilder(); for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) { - if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) { - // Use default port for convenience - nodeUrlsBuilder - .append(wrapper.getIp()) - .append(":") - .append(wrapper.getPipeAirGapReceiverPort()) - .append(","); - } else { - nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); - } + nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); } try (final SyncConfigNodeIServiceClient client = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java index c7222e9a1bc..82c394c7fb5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java @@ -22,7 +22,6 @@ package org.apache.iotdb.confignode.manager.pipe.agent.plugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; -import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionAirGapSink; import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionSink; import org.apache.iotdb.pipe.api.PipeConnector; @@ -41,9 +40,6 @@ class PipeConfigRegionSinkConstructor extends PipeSinkConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBConfigRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBConfigRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); @@ -55,8 +51,6 @@ class PipeConfigRegionSinkConstructor extends PipeSinkConstructor { BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBConfigRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 31cc8250ebf..33143bdc27e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -21,18 +21,10 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor; class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { @@ -44,28 +36,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(), - TumblingTimeSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(), - SwingingDoorTrendingSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), - ChangingValueSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), - ThrowingExceptionProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(), - StandardStatisticsOperatorProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(), - TumblingWindowingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new); pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), PipeConsensusProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index 536cf71cdb8..09773d0cad5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -23,10 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; -import org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaSink; -import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; @@ -59,13 +56,8 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBDataRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); pluginConstructors.put( @@ -82,12 +74,8 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor { IoTDBDataRegionAsyncSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBDataRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); pluginConstructors.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java index 160ecf54c01..a7752994ee2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.agent.plugin.schemaregion; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; -import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink; import org.apache.iotdb.pipe.api.PipeConnector; @@ -41,9 +40,6 @@ class PipeSchemaRegionSinkConstructor extends PipeSinkConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBSchemaRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBSchemaRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); @@ -55,8 +51,6 @@ class PipeSchemaRegionSinkConstructor extends PipeSinkConstructor { BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBSchemaRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index 4dc5fef859f..a310785cdc3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.source.dataregion; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; @@ -141,6 +142,21 @@ public class IoTDBDataRegionSource extends IoTDBSource { public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); + final boolean forwardingPipeRequests = + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, + PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), + PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + if (!forwardingPipeRequests) { + throw new PipeParameterNotValidException( + String.format( + "The parameter %s cannot be set to false.", + PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY)); + } + final boolean isTreeDialect = validator .getParameters() @@ -262,32 +278,6 @@ public class IoTDBDataRegionSource extends IoTDBSource { Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)); - // Validate extractor.realtime.mode - if (validator - .getParameters() - .getBooleanOrDefault( - Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), - EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE) - || validator - .getParameters() - .hasAnyAttributes( - SOURCE_START_TIME_KEY, - EXTRACTOR_START_TIME_KEY, - SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY)) { - validator.validateAttributeValueRange( - validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY) - ? EXTRACTOR_REALTIME_MODE_KEY - : SOURCE_REALTIME_MODE_KEY, - true, - EXTRACTOR_REALTIME_MODE_FILE_VALUE, - EXTRACTOR_REALTIME_MODE_HYBRID_VALUE, - EXTRACTOR_REALTIME_MODE_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE, - EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE); - } - checkInvalidParameters(validator); constructHistoricalExtractor(); @@ -447,6 +437,13 @@ public class IoTDBDataRegionSource extends IoTDBSource { return; } + if (!(pipeName != null + && (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX) + || pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) { + realtimeExtractor = new PipeRealtimeDataRegionTsFileSource(); + return; + } + // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 8f0874330cf..076021dde3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -114,7 +114,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { private final AtomicReference<Pair<Long, Long>> dataRegionTimePartitionIdBound = new AtomicReference<>(); - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; private boolean shouldTransferModFile; // Whether to transfer mods @@ -246,22 +246,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { ? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) : TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1; - final boolean isDoubleLiving = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY, - PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY), - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE); - if (isDoubleLiving) { - isForwardingPipeRequests = false; - } else { - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - } + isForwardingPipeRequests = true; if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { shouldTransferModFile = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 8da1e50a4d5..daab48bbe23 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -19,27 +19,16 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.airgap.IoTDBAirGapSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.consensus.PipeConsensusAsyncSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBLegacyPipeSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftAsyncSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSslSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSyncSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda.OpcDaSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcua.OpcUaSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSocketSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource; @@ -64,18 +53,8 @@ public enum BuiltinPipePlugin { // processors DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class), - TUMBLING_TIME_SAMPLING_PROCESSOR( - "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), - SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), - CHANGING_VALUE_SAMPLING_PROCESSOR( - "changing-value-sampling-processor", ChangingValueSamplingProcessor.class), - THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), - AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class), - COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class), // Hidden-processors, which are plugins of the processors - STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), - TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", PipeConsensusProcessor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), @@ -86,12 +65,9 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", IoTDBThriftSyncSink.class), IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncSink.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeSink.class), - IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class), PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", PipeConsensusAsyncSink.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), - OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class), - OPC_DA_CONNECTOR("opc-da-connector", OpcDaSink.class), WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class), DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class), @@ -100,10 +76,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncSink.class), IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", IoTDBThriftAsyncSink.class), IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeSink.class), - IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapSink.class), WEBSOCKET_SINK("websocket-sink", WebSocketSink.class), - OPC_UA_SINK("opc-ua-sink", OpcUaSink.class), - OPC_DA_SINK("opc-da-sink", OpcDaSink.class), WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncSink.class), @@ -151,14 +124,6 @@ public enum BuiltinPipePlugin { // Sources DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(), // Processors - TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), - AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), - COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), - STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), - TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors @@ -168,10 +133,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(), WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), // Sinks @@ -179,8 +141,6 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), WEBSOCKET_SINK.getPipePluginName().toUpperCase(), - OPC_UA_SINK.getPipePluginName().toUpperCase(), - OPC_DA_SINK.getPipePluginName().toUpperCase(), SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase()))); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java index c16f2310629..fcb6a5e1238 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java @@ -61,7 +61,7 @@ public abstract class IoTDBSource implements PipeExtractor { protected int regionId; protected PipeTaskMeta pipeTaskMeta; - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; // The value is always true after the first start even the extractor is closed protected final AtomicBoolean hasBeenStarted = new AtomicBoolean(false); @@ -157,22 +157,7 @@ public abstract class IoTDBSource implements PipeExtractor { taskID = pipeName + "_" + regionId + "_" + creationTime; pipeTaskMeta = environment.getPipeTaskMeta(); - final boolean isDoubleLiving = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY, - PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY), - PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE); - if (isDoubleLiving) { - isForwardingPipeRequests = false; - } else { - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - } + isForwardingPipeRequests = true; userName = parameters.getStringByKeys(
