This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rc/2.0.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 61c61ec59d79fc47789dfc211524affcd7810c7b Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Apr 14 16:23:22 2025 +0800 Pipe: Disable unstable features in the distribution --- .github/workflows/pipe-it.yml | 851 --------------------- .../PipeConfigRegionConnectorConstructor.java | 7 - .../PipeDataRegionConnectorConstructor.java | 15 - .../PipeDataRegionProcessorConstructor.java | 30 - .../PipeSchemaRegionConnectorConstructor.java | 7 - .../dataregion/IoTDBDataRegionExtractor.java | 21 + .../realtime/PipeRealtimeDataRegionExtractor.java | 19 +- .../agent/plugin/builtin/BuiltinPipePlugin.java | 40 - .../commons/pipe/extractor/IoTDBExtractor.java | 19 +- 9 files changed, 25 insertions(+), 984 deletions(-) diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml deleted file mode 100644 index 2036976a637..00000000000 --- a/.github/workflows/pipe-it.yml +++ /dev/null @@ -1,851 +0,0 @@ -name: Multi-Cluster IT - -on: - push: - branches: - - master - - 'rel/*' - - 'rc/*' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - pull_request: - branches: - - master - - 'rel/*' - - 'rc/*' - - 'force_ci/**' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - MAVEN_ARGS: --batch-mode --no-transfer-progress - DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} - -jobs: - single: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT1 \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-auto-basic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-auto-enhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeAutoEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-tree-manual: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTreeManual \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-regression-consumer: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionRegressionConsumer \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-regression-consumer-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-regression-misc: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionRegressionMisc \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-table-manual-basic: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualBasic \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - dual-table-manual-enhanced: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2DualTableManualEnhanced \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 \ No newline at end of file diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java index 74a34e503be..514c4aa7d50 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java @@ -22,7 +22,6 @@ package org.apache.iotdb.confignode.manager.pipe.agent.plugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor; -import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionAirGapConnector; import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionConnector; import org.apache.iotdb.pipe.api.PipeConnector; @@ -42,9 +41,6 @@ class PipeConfigRegionConnectorConstructor extends PipeConnectorConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBConfigRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBConfigRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new); @@ -59,9 +55,6 @@ class PipeConfigRegionConnectorConstructor extends PipeConnectorConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), - IoTDBConfigRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java index c95fec2b4bf..ea358383835 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java @@ -23,10 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBDataRegionAirGapConnector; import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector; -import org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaConnector; -import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector; import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector; @@ -59,15 +56,8 @@ class PipeDataRegionConnectorConstructor extends PipeConnectorConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBDataRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new); pluginConstructors.put( @@ -88,13 +78,8 @@ class PipeDataRegionConnectorConstructor extends PipeConnectorConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), - IoTDBDataRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketConnector::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaConnector::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new); pluginConstructors.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 31cc8250ebf..33143bdc27e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -21,18 +21,10 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor; class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { @@ -44,28 +36,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(), - TumblingTimeSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(), - SwingingDoorTrendingSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), - ChangingValueSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), - ThrowingExceptionProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(), - StandardStatisticsOperatorProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(), - TumblingWindowingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new); pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), PipeConsensusProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java index 6c96aaaeddc..fb87b90f877 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.agent.plugin.schemaregion; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor; -import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBSchemaRegionAirGapConnector; import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBSchemaRegionConnector; import org.apache.iotdb.pipe.api.PipeConnector; @@ -42,9 +41,6 @@ class PipeSchemaRegionConnectorConstructor extends PipeConnectorConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBSchemaRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBSchemaRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new); @@ -59,9 +55,6 @@ class PipeSchemaRegionConnectorConstructor extends PipeConnectorConstructor { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBSchemaRegionConnector::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), - IoTDBSchemaRegionAirGapConnector::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 8396cf2c581..3572d700c92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.dataregion; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; @@ -145,6 +146,21 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); + final boolean forwardingPipeRequests = + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, + PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), + PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + if (!forwardingPipeRequests) { + throw new PipeParameterNotValidException( + String.format( + "The parameter %s cannot be set to false.", + PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY)); + } + final boolean isTreeDialect = validator .getParameters() @@ -465,6 +481,11 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { return; } + if (pipeName == null || !pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor(); + return; + } + // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index e07385fd813..bb56dd2358b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -111,7 +111,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { private final AtomicReference<Pair<Long, Long>> dataRegionTimePartitionIdBound = new AtomicReference<>(); - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; private boolean shouldTransferModFile; // Whether to transfer mods @@ -241,22 +241,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { ? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) : TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1; - final boolean isDoubleLiving = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY, - PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY), - PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE); - if (isDoubleLiving) { - isForwardingPipeRequests = false; - } else { - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - } + isForwardingPipeRequests = true; if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { shouldTransferModFile = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 0f2e39f2450..6e163abdb05 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -20,30 +20,19 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.consensus.PipeConsensusAsyncConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBLegacyPipeConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftAsyncConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSyncConnector; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcda.OpcDaConnector; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcua.OpcUaConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.websocket.WebSocketConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.writeback.WriteBackConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.donothing.DoNothingExtractor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.iotdb.IoTDBExtractor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor; import java.util.Arrays; import java.util.Collections; @@ -61,18 +50,8 @@ public enum BuiltinPipePlugin { // processors DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class), - TUMBLING_TIME_SAMPLING_PROCESSOR( - "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), - SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), - CHANGING_VALUE_SAMPLING_PROCESSOR( - "changing-value-sampling-processor", ChangingValueSamplingProcessor.class), - THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), - AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class), - COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class), // Hidden-processors, which are plugins of the processors - STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), - TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", PipeConsensusProcessor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), @@ -83,13 +62,10 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", IoTDBThriftSyncConnector.class), IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeConnector.class), - IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapConnector.class), PIPE_CONSENSUS_ASYNC_CONNECTOR( "pipe-consensus-async-connector", PipeConsensusAsyncConnector.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class), - OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class), - OPC_DA_CONNECTOR("opc-da-connector", OpcDaConnector.class), WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class), DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class), @@ -98,10 +74,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncConnector.class), IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", IoTDBThriftAsyncConnector.class), IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeConnector.class), - IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class), WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class), - OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class), - OPC_DA_SINK("opc-da-sink", OpcDaConnector.class), WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingConnector.class), PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncConnector.class), @@ -139,14 +112,6 @@ public enum BuiltinPipePlugin { // Sources DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(), // Processors - TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), - AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), - COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), - STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), - TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors @@ -156,10 +121,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(), WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), // Sinks @@ -167,8 +129,6 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), WEBSOCKET_SINK.getPipePluginName().toUpperCase(), - OPC_UA_SINK.getPipePluginName().toUpperCase(), - OPC_DA_SINK.getPipePluginName().toUpperCase(), SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase()))); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java index 94653a361ab..587ab5b9fe1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java @@ -61,7 +61,7 @@ public abstract class IoTDBExtractor implements PipeExtractor { protected int regionId; protected PipeTaskMeta pipeTaskMeta; - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; // The value is always true after the first start even the extractor is closed protected final AtomicBoolean hasBeenStarted = new AtomicBoolean(false); @@ -157,22 +157,7 @@ public abstract class IoTDBExtractor implements PipeExtractor { taskID = pipeName + "_" + regionId + "_" + creationTime; pipeTaskMeta = environment.getPipeTaskMeta(); - final boolean isDoubleLiving = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY, - PipeExtractorConstant.SOURCE_MODE_DOUBLE_LIVING_KEY), - PipeExtractorConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE); - if (isDoubleLiving) { - isForwardingPipeRequests = false; - } else { - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - } + isForwardingPipeRequests = true; userName = parameters.getStringByKeys(
