This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ed159080ba [INLONG-8233][Sort] Support running on both flink1.13 and flink1.15  (#8322)
ed159080ba is described below

commit ed159080ba5eb5f370ddad15dc0b4af2ed5fb796
Author: haibo.duan <[email protected]>
AuthorDate: Thu Jun 29 13:12:03 2023 +0800

    [INLONG-8233][Sort] Support running on both flink1.13 and flink1.15  (#8322)
    
    Co-authored-by: Charles Zhang <[email protected]>
---
 .github/workflows/ci_ut_flink15.yml                | 80 ++++++++++++++++++++++
 inlong-sort/pom.xml                                | 38 ++++++++++
 inlong-sort/sort-core/pom.xml                      |  2 +-
 .../sort/function/CascadeFunctionWrapperTest.java  |  1 -
 .../inlong/sort/function/EncryptFunctionTest.java  |  1 -
 .../sort/function/JsonGetterFunctionTest.java      |  1 -
 .../function/RegexpReplaceFirstFunctionTest.java   |  1 -
 .../sort/function/RegexpReplaceFunctionTest.java   |  1 -
 .../sort/function/SplitIndexFunctionTest.java      |  1 -
 .../inlong/sort/parser/AllMigrateMongoDBTest.java  |  1 -
 .../inlong/sort/parser/AllMigrateOracleTest.java   |  2 -
 .../sort/parser/AllMigratePostgreSQLTest.java      |  1 -
 .../apache/inlong/sort/parser/AllMigrateTest.java  |  3 -
 .../parser/AllMigrateWithSpecifyingFieldTest.java  |  1 -
 .../sort/parser/ClickHouseSqlParserTest.java       |  1 -
 .../sort/parser/CustomFunctionSqlParseTest.java    |  1 -
 .../sort/parser/DataTypeConvertSqlParseTest.java   |  2 -
 .../sort/parser/DecimalFormatSqlParseTest.java     |  1 -
 .../sort/parser/DistinctNodeSqlParseTest.java      |  3 -
 .../DorisExtractNodeToDorisLoadNodeTest.java       |  1 -
 .../DorisExtractNodeToMySqlLoadNodeTest.java       |  1 -
 .../inlong/sort/parser/DorisMultipleSinkTest.java  |  1 -
 .../inlong/sort/parser/ESMultipleSinkTest.java     |  1 -
 .../sort/parser/ElasticsearchSqlParseTest.java     |  1 -
 .../sort/parser/FilesystemSqlParserTest.java       |  1 -
 .../apache/inlong/sort/parser/FilterParseTest.java |  2 -
 .../inlong/sort/parser/FlinkSqlParserTest.java     |  3 -
 .../sort/parser/FullOuterJoinSqlParseTest.java     |  2 -
 .../sort/parser/GreenplumLoadSqlParseTest.java     |  1 -
 .../sort/parser/HbaseLoadFlinkSqlParseTest.java    |  1 -
 .../inlong/sort/parser/HudiNodeSqlParserTest.java  |  1 -
 .../sort/parser/IcebergNodeSqlParserTest.java      |  1 -
 .../sort/parser/InnerJoinRelationSqlParseTest.java |  3 -
 .../parser/IntervalJoinRelationSqlParseTest.java   |  1 -
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  |  6 --
 .../inlong/sort/parser/KafkaSqlParseTest.java      |  2 -
 .../sort/parser/LeftOuterJoinSqlParseTest.java     |  2 -
 .../inlong/sort/parser/MetaFieldSyncTest.java      |  1 -
 .../sort/parser/MongoExtractFlinkSqlParseTest.java |  2 -
 .../MySqlExtractNodeToDorisLoadNodeTest.java       |  1 -
 .../inlong/sort/parser/MySqlLoadSqlParseTest.java  |  4 --
 .../MySqlTemporalJoinRelationSqlParseTest.java     |  4 --
 .../sort/parser/NativeFlinkSqlParserTest.java      |  1 -
 .../sort/parser/OracleExtractSqlParseTest.java     |  1 -
 .../inlong/sort/parser/OracleLoadSqlParseTest.java |  1 -
 .../parser/PostgresExtractFlinkSqlParseTest.java   |  1 -
 .../parser/PostgresLoadNodeFlinkSqlParseTest.java  |  1 -
 .../inlong/sort/parser/PulsarSqlParserTest.java    |  1 -
 .../inlong/sort/parser/RedisNodeSqlParserTest.java |  1 -
 .../RedisTemporalJoinRelationSqlParseTest.java     |  2 -
 .../sort/parser/RightOuterJoinSqlParseTest.java    |  2 -
 .../sort/parser/SqlServerNodeSqlParseTest.java     |  2 -
 .../TDSQLPostgresLoadNodeFlinkSqlParseTest.java    |  1 -
 .../inlong/sort/parser/TubeMQNodeSqlParseTest.java |  1 -
 .../inlong/sort/parser/UnionSqlParseTest.java      |  2 -
 inlong-sort/sort-end-to-end-tests/pom.xml          |  4 +-
 .../sort/tests/utils/FlinkContainerTestEnv.java    |  2 +-
 inlong-sort/sort-flink/sort-flink-v1.13/pom.xml    |  1 +
 58 files changed, 123 insertions(+), 86 deletions(-)

diff --git a/.github/workflows/ci_ut_flink15.yml b/.github/workflows/ci_ut_flink15.yml
new file mode 100644
index 0000000000..e0d4fc9881
--- /dev/null
+++ b/.github/workflows/ci_ut_flink15.yml
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: InLong Unit Test For Flink 1.15
+
+on:
+  push:
+    paths:
+      - '.github/workflows/ci_ut_flink15.yml'
+      - 'inlong-sort/**'
+      - '!**.md'
+
+  pull_request:
+    paths:
+      - '.github/workflows/ci_ut_flink15.yml'
+      - 'inlong-sort/**'
+      - '!**.md'
+
+jobs:
+  unit-test:
+    name: Unit Test
+    runs-on: ubuntu-22.04
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+
+      - name: Set up JDK
+        uses: actions/setup-java@v3
+        with:
+          java-version: 8
+          distribution: adopt
+
+      - name: Cache Maven packages
+        uses: actions/cache@v3
+        with:
+          path: |
+            ~/.m2/repository
+            !~/.m2/repository/org/apache/inlong
+          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-m2
+
+      - name: Build for Flink 1.15 with Maven
+        run: mvn --update-snapshots -e -V package -U -pl inlong-sort/sort-core -amd -Pv1.15 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
+        env:
+          CI: false
+
+      - name: Unit test for Flink 1.15 with Maven
+        run: mvn --update-snapshots -e -V test -Dtest="*" -pl inlong-sort/sort-core
+        env:
+          CI: false
+
+      - name: Upload unit test results
+        if: ${{ failure() }}
+        uses: actions/upload-artifact@v3
+        with:
+          name: surefire-reports
+          path: ./**/target/surefire-reports/
+          if-no-files-found: ignore
+
+      - name: Upload integration test results
+        if: ${{ failure() }}
+        uses: actions/upload-artifact@v3
+        with:
+          name: failsafe-reports
+          path: ./**/target/failsafe-reports/
+          if-no-files-found: ignore
\ No newline at end of file
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 0dd449f894..be848ce111 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -161,4 +161,42 @@
             </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <profile>
+            <id>v1.13</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <properties>
+                <sort.flink.version>v1.13</sort.flink.version>
+                <flink.version>1.13.5</flink.version>
+                <flink.minor.version>1.13</flink.minor.version>
+                <flink.scala.binary.version>2.11</flink.scala.binary.version>
+                <flink.jackson.version>2.12.1-13.0</flink.jackson.version>
+                <flink.protobuf.version>2.7.6</flink.protobuf.version>
+                <flink.cdc.base.version>2.3.0</flink.cdc.base.version>
+                <flink.streaming.artifactId>flink-streaming-java_${flink.scala.binary.version}</flink.streaming.artifactId>
+                <flink.test.utils.artifactId>flink-test-utils_${flink.scala.binary.version}</flink.test.utils.artifactId>
+                <flink.runtime.artifactId>flink-runtime_${scala.binary.version}</flink.runtime.artifactId>
+                <flink.docker.image.name>flink:1.13.5-scala_2.11</flink.docker.image.name>
+            </properties>
+        </profile>
+        <profile>
+            <id>v1.15</id>
+            <properties>
+                <sort.flink.version>v1.15</sort.flink.version>
+                <flink.version>1.15.4</flink.version>
+                <flink.minor.version>1.15</flink.minor.version>
+                <flink.scala.binary.version>2.12</flink.scala.binary.version>
+                <flink.jackson.version>2.12.1-13.0</flink.jackson.version>
+                <flink.protobuf.version>2.7.6</flink.protobuf.version>
+                <flink.cdc.base.version>2.3.0</flink.cdc.base.version>
+                <flink.streaming.artifactId>flink-streaming-java</flink.streaming.artifactId>
+                <flink.test.utils.artifactId>flink-test-utils</flink.test.utils.artifactId>
+                <flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
+                <flink.docker.image.name>flink:1.15.4-scala_2.12</flink.docker.image.name>
+            </properties>
+        </profile>
+    </profiles>
 </project>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index b3239e3635..25935ec3b9 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -70,7 +70,7 @@
         <!-- for test -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
+            <artifactId>${flink.test.utils.artifactId}</artifactId>
             <version>${flink.version}</version>
             <scope>test</scope>
             <exclusions>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java
index e5a9ae75de..f8eef41c1f 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java
@@ -56,7 +56,6 @@ public class CascadeFunctionWrapperTest extends AbstractTestBase {
         // step 0. Initialize the execution environment
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java
index 1f63247e59..bdf09b601f 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java
@@ -53,7 +53,6 @@ public class EncryptFunctionTest extends AbstractTestBase {
     public void testEncryptFunction() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
index 53aab54dde..be7c98dfa1 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
@@ -52,7 +52,6 @@ public class JsonGetterFunctionTest extends AbstractTestBase {
     public void testJsonGetterFunction() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java
index 0ae32ed30f..4ebddc464b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java
@@ -54,7 +54,6 @@ public class RegexpReplaceFirstFunctionTest extends AbstractTestBase {
         // step 0. Initialize the execution environment
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
index 3cfe8e9550..ea25034b53 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
@@ -53,7 +53,6 @@ public class RegexpReplaceFunctionTest extends AbstractTestBase {
         // step 0. Initialize the execution environment
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java
index 715c97adb2..13701f707b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java
@@ -54,7 +54,6 @@ public class SplitIndexFunctionTest extends AbstractTestBase {
     public void testSplitIndexFunction() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
index 3855e08531..fb3f8fd61f 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
@@ -86,7 +86,6 @@ public class AllMigrateMongoDBTest extends AbstractTestBase {
     public void testAllMigrate() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
index 2d041128f4..c44e35f561 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
@@ -120,7 +120,6 @@ public class AllMigrateOracleTest extends AbstractTestBase {
     public void testAllMigrate() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -147,7 +146,6 @@ public class AllMigrateOracleTest extends AbstractTestBase {
     public void testAllMigrateWithBytesFormat() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
index 995d881377..607cbf9d68 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
@@ -105,7 +105,6 @@ public class AllMigratePostgreSQLTest extends 
AbstractTestBase {
     public void testAllMigrate() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 334cb52ac2..1f75ca89d4 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -144,7 +144,6 @@ public class AllMigrateTest extends AbstractTestBase {
     public void testAllMigrate() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -171,7 +170,6 @@ public class AllMigrateTest extends AbstractTestBase {
     public void testAllMigrateMultiRelations() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -201,7 +199,6 @@ public class AllMigrateTest extends AbstractTestBase {
     public void testAllMigrateWithBytesFormat() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
index 21c854a9a3..1f5f1007d0 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
@@ -108,7 +108,6 @@ public class AllMigrateWithSpecifyingFieldTest extends 
AbstractTestBase {
     public void testAllMigrateMySQLToMySQL() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
index 590a9a11a8..c1df2b7a21 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
@@ -108,7 +108,6 @@ public class ClickHouseSqlParserTest extends 
AbstractTestBase {
     public void testClickHouse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
index 9ad649fb66..d765117a07 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
@@ -113,7 +113,6 @@ public class CustomFunctionSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
index 3c4f2c5b11..e0e7558ee2 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
@@ -117,7 +117,6 @@ public class DataTypeConvertSqlParseTest extends 
AbstractTestBase {
     public void testDataTypeConvertSqlParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -167,7 +166,6 @@ public class DataTypeConvertSqlParseTest extends 
AbstractTestBase {
     public void testHBaseDataTypeConvertSqlParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
index 618c75506f..7a19568659 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
@@ -87,7 +87,6 @@ public class DecimalFormatSqlParseTest extends 
AbstractTestBase {
     public void testDecimalFormatSqlParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index 958a811fa3..a1e7c65d54 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -244,7 +244,6 @@ public class DistinctNodeSqlParseTest extends 
AbstractTestBase {
     public void testDistinctBasedProcessTime() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -275,7 +274,6 @@ public class DistinctNodeSqlParseTest extends 
AbstractTestBase {
     public void testDistinctBasedTimeField() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -306,7 +304,6 @@ public class DistinctNodeSqlParseTest extends 
AbstractTestBase {
     public void testDistinctBasedEventTime() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
index 95e3cdf9a0..2661ae5dba 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
@@ -128,7 +128,6 @@ public class DorisExtractNodeToDorisLoadNodeTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
index 8e0f445e85..e07fd49d9d 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
@@ -128,7 +128,6 @@ public class DorisExtractNodeToMySqlLoadNodeTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
index 50e4b6d6d0..54f20de477 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
@@ -106,7 +106,6 @@ public class DorisMultipleSinkTest extends AbstractTestBase 
{
     public void testDorisMultipleSinkParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
index b73d1d5c43..80186cee28 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
@@ -106,7 +106,6 @@ public class ESMultipleSinkTest extends AbstractTestBase {
     public void testESMultipleSinkParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
index f4443d6ae3..29a3d1fd51 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
@@ -103,7 +103,6 @@ public abstract class ElasticsearchSqlParseTest extends 
AbstractTestBase {
     public void testMysqlToElasticsearch(Node node) throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
index 6a8f4fc700..51478cf704 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
@@ -112,7 +112,6 @@ public class FilesystemSqlParserTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
index d2dfcc059c..e76a829cb5 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
@@ -114,7 +114,6 @@ public class FilterParseTest extends AbstractTestBase {
     public void testFilterWithRetainParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -143,7 +142,6 @@ public class FilterParseTest extends AbstractTestBase {
     public void testFilterWithRemoveParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index 8b7c29f5a1..6db698414d 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -184,7 +184,6 @@ public class FlinkSqlParserTest extends AbstractTestBase {
     public void testMysqlToHive() {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -222,7 +221,6 @@ public class FlinkSqlParserTest extends AbstractTestBase {
     public void testToFileSystem() {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -249,7 +247,6 @@ public class FlinkSqlParserTest extends AbstractTestBase {
     public void testFlinkSqlParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
index fd2e156530..cad150822b 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
@@ -243,7 +243,6 @@ public class FullOuterJoinSqlParseTest extends 
AbstractTestBase {
     public void testFullOuterJoin() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -280,7 +279,6 @@ public class FullOuterJoinSqlParseTest extends 
AbstractTestBase {
     public void testFullOuterJoinWithDistinctAndFilter() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
index 6b8a9cd9ea..bdf72278af 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
@@ -104,7 +104,6 @@ public class GreenplumLoadSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
index 489b839688..291f89f22e 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
@@ -117,7 +117,6 @@ public class HbaseLoadFlinkSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
index 0fb1fece29..0b7ff83296 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -166,7 +166,6 @@ public class HudiNodeSqlParserTest extends AbstractTestBase 
{
     public void testHudi() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index 1bd06e2f60..41d5701fdf 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -159,7 +159,6 @@ public class IcebergNodeSqlParserTest extends 
AbstractTestBase {
     public void testIceberg() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
index dd5ecce42c..bde57ec2cc 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
@@ -271,7 +271,6 @@ public class InnerJoinRelationSqlParseTest extends 
AbstractTestBase {
     public void testInnerJoin() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -307,7 +306,6 @@ public class InnerJoinRelationSqlParseTest extends 
AbstractTestBase {
     public void testInnerJoinWithUpsertKafka() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -344,7 +342,6 @@ public class InnerJoinRelationSqlParseTest extends 
AbstractTestBase {
     public void testInnerJoinWithDistinctAndFilter() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
index 140bd28218..3e7c51ff53 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
@@ -159,7 +159,6 @@ public class IntervalJoinRelationSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
index db3ec360db..957e890e68 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -225,7 +225,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase 
{
     public void testKafkaSourceSqlParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -253,7 +252,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase 
{
     public void testKafkaDynamicTopicParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -281,7 +279,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase 
{
     public void testKafkaDynamicPartitionParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -309,7 +306,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase 
{
     public void testKafkaDynamicPartitionWithPrimaryKey() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -337,7 +333,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase 
{
     public void testKafkaDirtyHandleSqlParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -365,7 +360,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase 
{
     public void testKafkaDirtyHandleWithDynamicTopicSqlParse() throws 
Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
index 9ece69abe6..79991404af 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
@@ -64,7 +64,6 @@ public class KafkaSqlParseTest extends AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -150,7 +149,6 @@ public class KafkaSqlParseTest extends AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
index c31bb23804..547535a136 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
@@ -244,7 +244,6 @@ public class LeftOuterJoinSqlParseTest extends 
AbstractTestBase {
     public void testLeftOuterJoin() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -281,7 +280,6 @@ public class LeftOuterJoinSqlParseTest extends 
AbstractTestBase {
     public void testLeftOuterJoinWithDistinctAndFilter() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index 3d25e9b58d..d7e4b75106 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -225,7 +225,6 @@ public class MetaFieldSyncTest extends AbstractTestBase {
     public void testMetaFieldSyncTest() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
index 8161bc1183..d2bb5645de 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
@@ -108,7 +108,6 @@ public class MongoExtractFlinkSqlParseTest extends 
AbstractTestBase {
     public void testMongoDbToKafka() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -178,7 +177,6 @@ public class MongoExtractFlinkSqlParseTest extends 
AbstractTestBase {
     public void testMongoDbComplexTypeToKafka() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
index fa5a929b05..422cde37e7 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
@@ -117,7 +117,6 @@ public class MySqlExtractNodeToDorisLoadNodeTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 1c324b5532..17ad63b178 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -164,7 +164,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase 
{
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -192,7 +191,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase 
{
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -220,7 +218,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase 
{
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -248,7 +245,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase 
{
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
index f0302a9673..1aac2d11bd 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
@@ -160,7 +160,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -192,7 +191,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -224,7 +222,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -256,7 +253,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
index 9c0fdb4abe..4e27ec9700 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
@@ -40,7 +40,6 @@ public class NativeFlinkSqlParserTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
index 4ec2ee4f84..6f61c9eaf7 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
@@ -135,7 +135,6 @@ public class OracleExtractSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
index b667b0719e..0b7e7e4fc5 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -112,7 +112,6 @@ public class OracleLoadSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
index b087fe13b8..342c7e3e38 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
@@ -130,7 +130,6 @@ public class PostgresExtractFlinkSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
index 9dbde02251..52e6671bf7 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
@@ -119,7 +119,6 @@ public class PostgresLoadNodeFlinkSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
index b64bf687b3..5db0e6ae5d 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
@@ -94,7 +94,6 @@ public class PulsarSqlParserTest extends AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
index 717bba95c5..49651dc68c 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
@@ -137,7 +137,6 @@ public class RedisNodeSqlParserTest extends 
AbstractTestBase {
     public void testRedis() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
index 8d2fe99529..56927ab5e4 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
@@ -162,7 +162,6 @@ public class RedisTemporalJoinRelationSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
@@ -197,7 +196,6 @@ public class RedisTemporalJoinRelationSqlParseTest extends 
AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
index 0960e0b84c..f945bcb3ec 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
@@ -243,7 +243,6 @@ public class RightOuterJoinSqlParseTest extends 
AbstractTestBase {
     public void testRightOuterJoin() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -280,7 +279,6 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
     public void testRightOuterJoinWithDistinctAndFilter() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
index dc0886ad40..5eff7df857 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
@@ -160,7 +160,6 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
     public void testSqlServerLoad() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -184,7 +183,6 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
     public void testSqlServerExtract() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
index bffa2eede0..4662eff382 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
@@ -112,7 +112,6 @@ public class TDSQLPostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
         env.disableOperatorChaining();
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
index 87cc962cc2..9d6cb5336c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
@@ -99,7 +99,6 @@ public class TubeMQNodeSqlParseTest extends AbstractTestBase {
     public void testTubeMQToKafka() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java
index 43b74a9c68..4616a13697 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java
@@ -148,7 +148,6 @@ public class UnionSqlParseTest extends AbstractTestBase {
     public void testUnionSqlParse() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -182,7 +181,6 @@ public class UnionSqlParseTest extends AbstractTestBase {
     public void testUnionSqlParseWithoutTransform() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
-                .useBlinkPlanner()
                 .inStreamingMode()
                 .build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-end-to-end-tests/pom.xml b/inlong-sort/sort-end-to-end-tests/pom.xml
index ffdaa10d5c..939301eb79 100644
--- a/inlong-sort/sort-end-to-end-tests/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/pom.xml
@@ -68,7 +68,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
+            <artifactId>${flink.test.utils.artifactId}</artifactId>
             <version>${flink.version}</version>
             <scope>test</scope>
             <exclusions>
@@ -95,7 +95,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <artifactId>${flink.runtime.artifactId}</artifactId>
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index f10e8a225c..cbe4d9b64f 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -107,7 +107,6 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
 
     private static GenericContainer<?> jobManager;
     private static GenericContainer<?> taskManager;
-
     // ----------------------------------------------------------------------------------------
     // MYSQL Variables
     // ----------------------------------------------------------------------------------------
@@ -239,6 +238,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
 
     /**
      * Polling to detect task status until the task successfully into {@link JobStatus.RUNNING}
+     *
      * @param timeout
      */
     public void waitUntilJobRunning(Duration timeout) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
index 6092852f9f..4fbc441e7d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
@@ -41,6 +41,7 @@
         
<flink.connector.mongodb.cdc.version>2.3.0</flink.connector.mongodb.cdc.version>
         <flink.connector.redis>1.1.0</flink.connector.redis>
         <flink.scala.binary.version>2.11</flink.scala.binary.version>
+        <flink.minor.version>1.13</flink.minor.version>
         
<flink.connector.mysql.cdc.version>2.2.1</flink.connector.mysql.cdc.version>
         <flink.scala.binary.version>2.11</flink.scala.binary.version>
         <flink.jackson.version>2.12.1-13.0</flink.jackson.version>

Reply via email to