This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch to--207
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/to--207 by this push:
     new a15eaea748d Pipe: Improved the total performance by disabling some useless logic
a15eaea748d is described below

commit a15eaea748dcb5ae2051fd950ebbb8f02d2e2307
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 15 14:21:01 2025 +0800

    Pipe: Improved the total performance by disabling some useless logic
---
 .github/workflows/daily-it.yml                     | 994 +--------------------
 .github/workflows/pipe-it.yml                      | 978 --------------------
 .../manual/basic/IoTDBPipeProtocolIT.java          |  16 +-
 .../treemodel/auto/basic/IoTDBPipeProtocolIT.java  |  16 +-
 .../plugin/PipeConfigRegionSinkConstructor.java    |   6 -
 .../PipeDataRegionProcessorConstructor.java        |  30 -
 .../dataregion/PipeDataRegionSinkConstructor.java  |  12 -
 .../PipeSchemaRegionSinkConstructor.java           |   6 -
 .../source/dataregion/IoTDBDataRegionSource.java   |  49 +-
 .../realtime/PipeRealtimeDataRegionSource.java     |  19 +-
 .../agent/plugin/builtin/BuiltinPipePlugin.java    |  40 -
 .../iotdb/commons/pipe/source/IoTDBSource.java     |  19 +-
 12 files changed, 30 insertions(+), 2155 deletions(-)

diff --git a/.github/workflows/daily-it.yml b/.github/workflows/daily-it.yml
index 5f7d8cdcfd6..08889c8685c 100644
--- a/.github/workflows/daily-it.yml
+++ b/.github/workflows/daily-it.yml
@@ -84,996 +84,4 @@ jobs:
         with:
           name: table-standalone-log-java${{ matrix.java }}-${{ runner.os }}
           path: integration-test/target/cluster-logs
-          retention-days: 3
-  PipeSingle:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        cluster2:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-          ]
-        os: [ubuntu-latest]
-        exclude:
-          - cluster1: LightWeightStandaloneMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: LightWeightStandaloneMode
-            cluster2: ScalableSingleNodeMode
-          - cluster1: ScalableSingleNodeMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: ScalableSingleNodeMode
-            cluster2: HighPerformanceMode
-          - cluster1: HighPerformanceMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: HighPerformanceMode
-            cluster2: HighPerformanceMode
-          - cluster1: PipeConsensusBatchMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: PipeConsensusBatchMode
-            cluster2: HighPerformanceMode
-          - cluster1: PipeConsensusStreamMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: PipeConsensusStreamMode
-            cluster2: HighPerformanceMode
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT1 \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{ 
matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  PipeDualTreeAutoBasic:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster 
}} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTreeAutoBasic \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  PipeDualTreeAutoEnhanced:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        cluster2:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-          ]
-        os: [ubuntu-latest]
-        exclude:
-          - cluster1: LightWeightStandaloneMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: LightWeightStandaloneMode
-            cluster2: ScalableSingleNodeMode
-          - cluster1: ScalableSingleNodeMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: ScalableSingleNodeMode
-            cluster2: HighPerformanceMode
-          - cluster1: HighPerformanceMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: HighPerformanceMode
-            cluster2: HighPerformanceMode
-          - cluster1: PipeConsensusBatchMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: PipeConsensusBatchMode
-            cluster2: HighPerformanceMode
-          - cluster1: PipeConsensusStreamMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: PipeConsensusStreamMode
-            cluster2: HighPerformanceMode
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTreeAutoEnhanced \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  PipeDualTreeManual:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        cluster2:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-          ]
-        os: [ubuntu-latest]
-        exclude:
-          - cluster1: LightWeightStandaloneMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: LightWeightStandaloneMode
-            cluster2: ScalableSingleNodeMode
-          - cluster1: ScalableSingleNodeMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: ScalableSingleNodeMode
-            cluster2: HighPerformanceMode
-          - cluster1: HighPerformanceMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: HighPerformanceMode
-            cluster2: HighPerformanceMode
-          - cluster1: PipeConsensusBatchMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: PipeConsensusBatchMode
-            cluster2: HighPerformanceMode
-          - cluster1: PipeConsensusStreamMode
-            cluster2: LightWeightStandaloneMode
-          - cluster1: PipeConsensusStreamMode
-            cluster2: HighPerformanceMode
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTreeManual \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  SubscriptionTreeArchVerification:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1:
-          [
-            ScalableSingleNodeMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTreeArchVerification \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-tree-arch-verification-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  SubscriptionTableArchVerification:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1: [ScalableSingleNodeMode]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTableArchVerification \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-table-arch-verification-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  SubscriptionTreeRegressionConsumer:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # do not use HighPerformanceMode here, otherwise some tests will cause 
the GH runner to receive a shutdown signal
-        cluster1:
-          [
-            ScalableSingleNodeMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-tree-regression-consumer-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  SubscriptionTreeRegressionMisc:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # do not use HighPerformanceMode here, otherwise some tests will cause 
the GH runner to receive a shutdown signal
-        cluster1:
-          [
-            ScalableSingleNodeMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-tree-regression-misc-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  PipeDualTableManualBasic:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster 
}} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTableManualBasic \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  PipeDualTableManualEnhanced:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster:
-          [
-            LightWeightStandaloneMode,
-            ScalableSingleNodeMode,
-            HighPerformanceMode,
-            PipeConsensusBatchMode,
-            PipeConsensusStreamMode,
-          ]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster 
}} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTableManualEnhanced \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java 
}}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
+          retention-days: 3
\ No newline at end of file
diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml
deleted file mode 100644
index b1c2b75fa9f..00000000000
--- a/.github/workflows/pipe-it.yml
+++ /dev/null
@@ -1,978 +0,0 @@
-name: Multi-Cluster IT
-
-on:
-  push:
-    branches:
-      - master
-      - "rel/*"
-      - "rc/*"
-    paths-ignore:
-      - "docs/**"
-      - "site/**"
-      - "iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**" 
#queryengine
-  pull_request:
-    branches:
-      - master
-      - "rel/*"
-      - "rc/*"
-      - "force_ci/**"
-    paths-ignore:
-      - "docs/**"
-      - "site/**"
-      - "iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**" 
#queryengine
-  # allow manually run the action:
-  workflow_dispatch:
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
-  cancel-in-progress: true
-
-env:
-  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false 
-Dmaven.wagon.http.retryHandler.class=standard 
-Dmaven.wagon.http.retryHandler.count=3
-  MAVEN_ARGS: --batch-mode --no-transfer-progress
-  DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
-
-jobs:
-  single:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1: [HighPerformanceMode]
-        cluster2: [HighPerformanceMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT1 \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-single-java${{ matrix.java }}-${{ runner.os }}-${{ 
matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  dual-tree-auto-basic:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster: [HighPerformanceMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster 
}} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTreeAutoBasic \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-tree-auto-basic-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  dual-tree-auto-enhanced:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1: [HighPerformanceMode]
-        cluster2: [HighPerformanceMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTreeAutoEnhanced \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-tree-auto-enhanced-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  dual-tree-manual:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1: [HighPerformanceMode]
-        cluster2: [HighPerformanceMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTreeManual \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-tree-manual-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  subscription-tree-arch-verification:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1: [ScalableSingleNodeMode]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTreeArchVerification \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-tree-arch-verification-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  subscription-table-arch-verification:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster1: [ScalableSingleNodeMode]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTableArchVerification \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-table-arch-verification-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  subscription-tree-regression-consumer:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # do not use HighPerformanceMode here, otherwise some tests will cause 
the GH runner to receive a shutdown signal
-        cluster1: [ScalableSingleNodeMode]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTreeRegressionConsumer \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-tree-regression-consumer-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  subscription-tree-regression-misc:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # do not use HighPerformanceMode here, otherwise some tests will cause 
the GH runner to receive a shutdown signal
-        cluster1: [ScalableSingleNodeMode]
-        cluster2: [ScalableSingleNodeMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT2SubscriptionTreeRegressionMisc \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-subscription-tree-regression-misc-java${{ 
matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  dual-table-manual-basic:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster: [HighPerformanceMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster 
}} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTableManualBasic \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-table-manual-basic-java${{ matrix.java }}-${{ 
runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  dual-table-manual-enhanced:
-    strategy:
-      fail-fast: false
-      max-parallel: 15
-      matrix:
-        java: [17]
-        # StrongConsistencyClusterMode is ignored now because RatisConsensus 
has not been supported yet.
-        cluster: [HighPerformanceMode]
-        os: [ubuntu-latest]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster 
}} \
-              -pl integration-test \
-              -am -PMultiClusterIT2DualTableManualEnhanced \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-dual-table-manual-enhanced-java${{ matrix.java 
}}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
-  triple:
-    strategy:
-      fail-fast: false
-      max-parallel: 1
-      matrix:
-        java: [ 17 ]
-        cluster1: [ ScalableSingleNodeMode ]
-        cluster2: [ ScalableSingleNodeMode ]
-        cluster3: [ ScalableSingleNodeMode ]
-        os: [ ubuntu-latest ]
-    runs-on: ${{ matrix.os }}
-    steps:
-      - uses: actions/checkout@v4
-      - name: Set up JDK ${{ matrix.java }}
-        uses: actions/setup-java@v4
-        with:
-          distribution: corretto
-          java-version: ${{ matrix.java }}
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      - name: Cache Maven packages
-        uses: actions/cache@v4
-        with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2-
-      - name: Sleep for a random duration between 0 and 10000 milliseconds
-        run: |
-          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
-      - name: IT Test
-        shell: bash
-        # we do not compile client-cpp for saving time, it is tested in 
client.yml
-        # we can skip influxdb-protocol because it has been tested separately 
in influxdb-protocol.yml
-        run: |
-          retry() {
-            local -i max_attempts=3
-            local -i attempt=1
-            local -i retry_sleep=5
-            local test_output
-
-            while [ $attempt -le $max_attempts ]; do
-              mvn clean verify \
-              -P with-integration-tests \
-              -DskipUTs \
-              -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-              -DClusterConfigurations=${{ matrix.cluster1 }},${{ 
matrix.cluster2 }},${{ matrix.cluster3 }} \
-              -pl integration-test \
-              -am -PMultiClusterIT3 \
-              -ntp >> ~/run-tests-$attempt.log && return 0
-              test_output=$(cat ~/run-tests-$attempt.log) 
-
-              echo "==================== BEGIN: ~/run-tests-$attempt.log 
===================="          
-              echo "$test_output"
-              echo "==================== END: ~/run-tests-$attempt.log 
======================"
-
-              if ! mv ~/run-tests-$attempt.log 
integration-test/target/cluster-logs/ 2>/dev/null; then
-                echo "Failed to move log file ~/run-tests-$attempt.log to 
integration-test/target/cluster-logs/. Skipping..."
-              fi
-
-              if echo "$test_output" | grep -q "Could not transfer artifact"; 
then
-                if [ $attempt -lt $max_attempts ]; then
-                  echo "Test failed with artifact transfer issue, attempt 
$attempt. Retrying in $retry_sleep seconds..."
-                  sleep $retry_sleep
-                  attempt=$((attempt + 1))
-                else
-                  echo "Test failed after $max_attempts attempts due to 
artifact transfer issue."
-                  echo "Treating this as a success because the issue is likely 
transient."
-                  return 0
-                fi
-              elif [ $? -ne 0 ]; then
-                echo "Test failed with a different error."
-                return 1
-              else
-                echo "Tests passed"
-                return 0
-              fi
-            done
-          }
-          retry
-      - name: Upload Artifact
-        if: failure()
-        uses: actions/upload-artifact@v4
-        with:
-          name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{ 
matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }}
-          path: integration-test/target/cluster-logs
-          retention-days: 30
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index 3fce11bf51f..8e8c61dca24 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -351,11 +351,6 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
     
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
   }
 
-  @Test
-  public void testAirGapConnectorUseNodeUrls() throws Exception {
-    
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
-  }
-
   private void doTestUseNodeUrls(String connectorName) throws Exception {
     senderEnv
         .getConfig()
@@ -398,16 +393,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         };
 
     for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) 
{
-      if 
(connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()))
 {
-        // Use default port for convenience
-        nodeUrlsBuilder
-            .append(wrapper.getIp())
-            .append(":")
-            .append(wrapper.getPipeAirGapReceiverPort())
-            .append(",");
-      } else {
-        nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
-      }
+      nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
     }
 
     try (final SyncConfigNodeIServiceClient client =
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index 0d6091c51ec..6c1efc88515 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -339,11 +339,6 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
     
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
   }
 
-  @Test
-  public void testAirGapConnectorUseNodeUrls() throws Exception {
-    
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
-  }
-
   private void doTestUseNodeUrls(String connectorName) throws Exception {
     senderEnv
         .getConfig()
@@ -378,16 +373,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
 
     final StringBuilder nodeUrlsBuilder = new StringBuilder();
     for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) 
{
-      if 
(connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()))
 {
-        // Use default port for convenience
-        nodeUrlsBuilder
-            .append(wrapper.getIp())
-            .append(":")
-            .append(wrapper.getPipeAirGapReceiverPort())
-            .append(",");
-      } else {
-        nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
-      }
+      nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
     }
 
     try (final SyncConfigNodeIServiceClient client =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
index c7222e9a1bc..82c394c7fb5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.confignode.manager.pipe.agent.plugin;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
-import 
org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionAirGapSink;
 import 
org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionSink;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -41,9 +40,6 @@ class PipeConfigRegionSinkConstructor extends 
PipeSinkConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
         IoTDBConfigRegionSink::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
-        IoTDBConfigRegionAirGapSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), 
DoNothingSink::new);
 
@@ -55,8 +51,6 @@ class PipeConfigRegionSinkConstructor extends 
PipeSinkConstructor {
         BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), 
IoTDBConfigRegionSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), 
IoTDBConfigRegionSink::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), 
IoTDBConfigRegionAirGapSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), 
DoNothingSink::new);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 31cc8250ebf..33143bdc27e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -21,18 +21,10 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor;
-import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor;
-import 
org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
-import 
org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor;
-import 
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
-import 
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor;
-import 
org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
 
 class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
 
@@ -44,28 +36,6 @@ class PipeDataRegionProcessorConstructor extends 
PipeProcessorConstructor {
   protected void initConstructors() {
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), 
DoNothingProcessor::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(),
-        TumblingTimeSamplingProcessor::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
-        SwingingDoorTrendingSamplingProcessor::new);
-    pluginConstructors.put(
-        
BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(),
-        ChangingValueSamplingProcessor::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
-        ThrowingExceptionProcessor::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), 
AggregateProcessor::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(),
-        StandardStatisticsOperatorProcessor::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
-        TumblingWindowingProcessor::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), 
TwoStageCountProcessor::new);
     pluginConstructors.put(
         BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
         PipeConsensusProcessor::new);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
index 536cf71cdb8..09773d0cad5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java
@@ -23,10 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink;
 import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
-import org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaSink;
-import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
@@ -59,13 +56,8 @@ class PipeDataRegionSinkConstructor extends 
PipeSinkConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
         IoTDBLegacyPipeSink::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
-        IoTDBDataRegionAirGapSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), 
WebSocketSink::new);
-    
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), 
OpcUaSink::new);
-    
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), 
OpcDaSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), 
DoNothingSink::new);
     pluginConstructors.put(
@@ -82,12 +74,8 @@ class PipeDataRegionSinkConstructor extends 
PipeSinkConstructor {
         IoTDBDataRegionAsyncSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), 
IoTDBLegacyPipeSink::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), 
IoTDBDataRegionAirGapSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), 
WebSocketSink::new);
-    pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), 
OpcUaSink::new);
-    pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), 
OpcDaSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), 
DoNothingSink::new);
     pluginConstructors.put(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
index 160ecf54c01..a7752994ee2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.agent.plugin.schemaregion;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
-import 
org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -41,9 +40,6 @@ class PipeSchemaRegionSinkConstructor extends 
PipeSinkConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
         IoTDBSchemaRegionSink::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
-        IoTDBSchemaRegionAirGapSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), 
DoNothingSink::new);
 
@@ -55,8 +51,6 @@ class PipeSchemaRegionSinkConstructor extends 
PipeSinkConstructor {
         BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), 
IoTDBSchemaRegionSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), 
IoTDBSchemaRegionSink::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), 
IoTDBSchemaRegionAirGapSink::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), 
DoNothingSink::new);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 9d2b629a67d..c862d7c2244 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.source.dataregion;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
@@ -141,6 +142,21 @@ public class IoTDBDataRegionSource extends IoTDBSource {
   public void validate(final PipeParameterValidator validator) throws 
Exception {
     super.validate(validator);
 
+    final boolean forwardingPipeRequests =
+        validator
+            .getParameters()
+            .getBooleanOrDefault(
+                Arrays.asList(
+                    PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+                    PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
+                
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+    if (!forwardingPipeRequests) {
+      throw new PipeParameterNotValidException(
+          String.format(
+              "The parameter %s cannot be set to false.",
+              PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY));
+    }
+
     final boolean isTreeDialect =
         validator
             .getParameters()
@@ -262,32 +278,6 @@ public class IoTDBDataRegionSource extends IoTDBSource {
                     Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
                     EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
 
-    // Validate extractor.realtime.mode
-    if (validator
-            .getParameters()
-            .getBooleanOrDefault(
-                Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
-                EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
-        || validator
-            .getParameters()
-            .hasAnyAttributes(
-                SOURCE_START_TIME_KEY,
-                EXTRACTOR_START_TIME_KEY,
-                SOURCE_END_TIME_KEY,
-                EXTRACTOR_END_TIME_KEY)) {
-      validator.validateAttributeValueRange(
-          validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
-              ? EXTRACTOR_REALTIME_MODE_KEY
-              : SOURCE_REALTIME_MODE_KEY,
-          true,
-          EXTRACTOR_REALTIME_MODE_FILE_VALUE,
-          EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
-          EXTRACTOR_REALTIME_MODE_LOG_VALUE,
-          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
-          EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
-          EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
-    }
-
     checkInvalidParameters(validator);
 
     constructHistoricalExtractor();
@@ -450,6 +440,13 @@ public class IoTDBDataRegionSource extends IoTDBSource {
       return;
     }
 
+    if (!(pipeName != null
+        && (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX)
+            || pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) {
+      realtimeExtractor = new PipeRealtimeDataRegionTsFileSource();
+      return;
+    }
+
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, 
SOURCE_MODE_STREAMING_KEY)
         && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 08cc91b232d..665bb299d2f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -115,7 +115,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   private final AtomicReference<Pair<Long, Long>> 
dataRegionTimePartitionIdBound =
       new AtomicReference<>();
 
-  protected boolean isForwardingPipeRequests;
+  protected boolean isForwardingPipeRequests = true;
 
   private boolean shouldTransferModFile; // Whether to transfer mods
 
@@ -251,22 +251,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
             ? 
TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime)
             : 
TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1;
 
-    final boolean isDoubleLiving =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(
-                PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
-                PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
-            PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
-    if (isDoubleLiving) {
-      isForwardingPipeRequests = false;
-    } else {
-      isForwardingPipeRequests =
-          parameters.getBooleanOrDefault(
-              Arrays.asList(
-                  PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
-                  PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-              
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
-    }
+    isForwardingPipeRequests = true;
 
     if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
       shouldTransferModFile =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 74bc0d9815a..f9f9c0fb6e3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -19,27 +19,16 @@
 
 package org.apache.iotdb.commons.pipe.agent.plugin.builtin;
 
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.airgap.IoTDBAirGapSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.consensus.PipeConsensusAsyncSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBLegacyPipeSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftAsyncSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSslSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSyncSink;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda.OpcDaSink;
-import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcua.OpcUaSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSocketSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource;
@@ -61,18 +50,8 @@ public enum BuiltinPipePlugin {
 
   // processors
   DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
-  TUMBLING_TIME_SAMPLING_PROCESSOR(
-      "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
-  SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", 
SwingingDoorTrendingSamplingProcessor.class),
-  CHANGING_VALUE_SAMPLING_PROCESSOR(
-      "changing-value-sampling-processor", 
ChangingValueSamplingProcessor.class),
-  THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", 
ThrowingExceptionProcessor.class),
-  AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
-  COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
 
   // Hidden-processors, which are plugins of the processors
-  STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", 
StandardStatisticsProcessor.class),
-  TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", 
TumblingWindowingProcessor.class),
   PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", 
PipeConsensusProcessor.class),
   RENAME_DATABASE_PROCESSOR("rename-database-processor", 
RenameDatabaseProcessor.class),
 
@@ -83,12 +62,9 @@ public enum BuiltinPipePlugin {
   IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", 
IoTDBThriftSyncSink.class),
   IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", 
IoTDBThriftAsyncSink.class),
   IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", 
IoTDBLegacyPipeSink.class),
-  IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class),
   PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", 
PipeConsensusAsyncSink.class),
 
   WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
-  OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class),
-  OPC_DA_CONNECTOR("opc-da-connector", OpcDaSink.class),
   WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class),
 
   DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class),
@@ -97,10 +73,7 @@ public enum BuiltinPipePlugin {
   IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncSink.class),
   IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", 
IoTDBThriftAsyncSink.class),
   IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeSink.class),
-  IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapSink.class),
   WEBSOCKET_SINK("websocket-sink", WebSocketSink.class),
-  OPC_UA_SINK("opc-ua-sink", OpcUaSink.class),
-  OPC_DA_SINK("opc-da-sink", OpcDaSink.class),
   WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
   SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
   PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", 
PipeConsensusAsyncSink.class),
@@ -148,14 +121,6 @@ public enum BuiltinPipePlugin {
                   // Sources
                   DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(),
                   // Processors
-                  
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
-                  SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
-                  
CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
-                  
THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(),
-                  AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(),
-                  COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(),
-                  
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
-                  
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
                   PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
                   RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
                   // Connectors
@@ -165,10 +130,7 @@ public enum BuiltinPipePlugin {
                   
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
-                  IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
                   WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
-                  OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
-                  OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
                   WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
                   
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   // Sinks
@@ -176,8 +138,6 @@ public enum BuiltinPipePlugin {
                   IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
                   WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
-                  OPC_UA_SINK.getPipePluginName().toUpperCase(),
-                  OPC_DA_SINK.getPipePluginName().toUpperCase(),
                   SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
                   
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
index 6640fde7f4a..d98f7572924 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
@@ -62,7 +62,7 @@ public abstract class IoTDBSource implements PipeExtractor {
   protected int regionId;
   protected PipeTaskMeta pipeTaskMeta;
 
-  protected boolean isForwardingPipeRequests;
+  protected boolean isForwardingPipeRequests = true;
 
   // The value is always true after the first start even the extractor is 
closed
   protected final AtomicBoolean hasBeenStarted = new AtomicBoolean(false);
@@ -161,22 +161,7 @@ public abstract class IoTDBSource implements PipeExtractor 
{
     taskID = pipeName + "_" + regionId + "_" + creationTime;
     pipeTaskMeta = environment.getPipeTaskMeta();
 
-    final boolean isDoubleLiving =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(
-                PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
-                PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
-            PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
-    if (isDoubleLiving) {
-      isForwardingPipeRequests = false;
-    } else {
-      isForwardingPipeRequests =
-          parameters.getBooleanOrDefault(
-              Arrays.asList(
-                  PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
-                  PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-              
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
-    }
+    isForwardingPipeRequests = true;
 
     userId =
         parameters.getStringOrDefault(

Reply via email to