This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ed159080ba [INLONG-8233][Sort] Support running on both flink1.13 and
flink1.15 (#8322)
ed159080ba is described below
commit ed159080ba5eb5f370ddad15dc0b4af2ed5fb796
Author: haibo.duan <[email protected]>
AuthorDate: Thu Jun 29 13:12:03 2023 +0800
[INLONG-8233][Sort] Support running on both flink1.13 and flink1.15 (#8322)
Co-authored-by: Charles Zhang <[email protected]>
---
.github/workflows/ci_ut_flink15.yml | 80 ++++++++++++++++++++++
inlong-sort/pom.xml | 38 ++++++++++
inlong-sort/sort-core/pom.xml | 2 +-
.../sort/function/CascadeFunctionWrapperTest.java | 1 -
.../inlong/sort/function/EncryptFunctionTest.java | 1 -
.../sort/function/JsonGetterFunctionTest.java | 1 -
.../function/RegexpReplaceFirstFunctionTest.java | 1 -
.../sort/function/RegexpReplaceFunctionTest.java | 1 -
.../sort/function/SplitIndexFunctionTest.java | 1 -
.../inlong/sort/parser/AllMigrateMongoDBTest.java | 1 -
.../inlong/sort/parser/AllMigrateOracleTest.java | 2 -
.../sort/parser/AllMigratePostgreSQLTest.java | 1 -
.../apache/inlong/sort/parser/AllMigrateTest.java | 3 -
.../parser/AllMigrateWithSpecifyingFieldTest.java | 1 -
.../sort/parser/ClickHouseSqlParserTest.java | 1 -
.../sort/parser/CustomFunctionSqlParseTest.java | 1 -
.../sort/parser/DataTypeConvertSqlParseTest.java | 2 -
.../sort/parser/DecimalFormatSqlParseTest.java | 1 -
.../sort/parser/DistinctNodeSqlParseTest.java | 3 -
.../DorisExtractNodeToDorisLoadNodeTest.java | 1 -
.../DorisExtractNodeToMySqlLoadNodeTest.java | 1 -
.../inlong/sort/parser/DorisMultipleSinkTest.java | 1 -
.../inlong/sort/parser/ESMultipleSinkTest.java | 1 -
.../sort/parser/ElasticsearchSqlParseTest.java | 1 -
.../sort/parser/FilesystemSqlParserTest.java | 1 -
.../apache/inlong/sort/parser/FilterParseTest.java | 2 -
.../inlong/sort/parser/FlinkSqlParserTest.java | 3 -
.../sort/parser/FullOuterJoinSqlParseTest.java | 2 -
.../sort/parser/GreenplumLoadSqlParseTest.java | 1 -
.../sort/parser/HbaseLoadFlinkSqlParseTest.java | 1 -
.../inlong/sort/parser/HudiNodeSqlParserTest.java | 1 -
.../sort/parser/IcebergNodeSqlParserTest.java | 1 -
.../sort/parser/InnerJoinRelationSqlParseTest.java | 3 -
.../parser/IntervalJoinRelationSqlParseTest.java | 1 -
.../inlong/sort/parser/KafkaLoadSqlParseTest.java | 6 --
.../inlong/sort/parser/KafkaSqlParseTest.java | 2 -
.../sort/parser/LeftOuterJoinSqlParseTest.java | 2 -
.../inlong/sort/parser/MetaFieldSyncTest.java | 1 -
.../sort/parser/MongoExtractFlinkSqlParseTest.java | 2 -
.../MySqlExtractNodeToDorisLoadNodeTest.java | 1 -
.../inlong/sort/parser/MySqlLoadSqlParseTest.java | 4 --
.../MySqlTemporalJoinRelationSqlParseTest.java | 4 --
.../sort/parser/NativeFlinkSqlParserTest.java | 1 -
.../sort/parser/OracleExtractSqlParseTest.java | 1 -
.../inlong/sort/parser/OracleLoadSqlParseTest.java | 1 -
.../parser/PostgresExtractFlinkSqlParseTest.java | 1 -
.../parser/PostgresLoadNodeFlinkSqlParseTest.java | 1 -
.../inlong/sort/parser/PulsarSqlParserTest.java | 1 -
.../inlong/sort/parser/RedisNodeSqlParserTest.java | 1 -
.../RedisTemporalJoinRelationSqlParseTest.java | 2 -
.../sort/parser/RightOuterJoinSqlParseTest.java | 2 -
.../sort/parser/SqlServerNodeSqlParseTest.java | 2 -
.../TDSQLPostgresLoadNodeFlinkSqlParseTest.java | 1 -
.../inlong/sort/parser/TubeMQNodeSqlParseTest.java | 1 -
.../inlong/sort/parser/UnionSqlParseTest.java | 2 -
inlong-sort/sort-end-to-end-tests/pom.xml | 4 +-
.../sort/tests/utils/FlinkContainerTestEnv.java | 2 +-
inlong-sort/sort-flink/sort-flink-v1.13/pom.xml | 1 +
58 files changed, 123 insertions(+), 86 deletions(-)
diff --git a/.github/workflows/ci_ut_flink15.yml
b/.github/workflows/ci_ut_flink15.yml
new file mode 100644
index 0000000000..e0d4fc9881
--- /dev/null
+++ b/.github/workflows/ci_ut_flink15.yml
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: InLong Unit Test For Flink 1.15
+
+on:
+ push:
+ paths:
+ - '.github/workflows/ci_ut_flink15.yml'
+ - 'inlong-sort/**'
+ - '!**.md'
+
+ pull_request:
+ paths:
+ - '.github/workflows/ci_ut_flink15.yml'
+ - 'inlong-sort/**'
+ - '!**.md'
+
+jobs:
+ unit-test:
+ name: Unit Test
+ runs-on: ubuntu-22.04
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+
+ - name: Set up JDK
+ uses: actions/setup-java@v3
+ with:
+ java-version: 8
+ distribution: adopt
+
+ - name: Cache Maven packages
+ uses: actions/cache@v3
+ with:
+ path: |
+ ~/.m2/repository
+ !~/.m2/repository/org/apache/inlong
+ key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2
+
+ - name: Build for Flink 1.15 with Maven
+ run: mvn --update-snapshots -e -V package -U -pl inlong-sort/sort-core
-amd -Pv1.15 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-Daether.connector.http.reuseConnections=false
-Daether.connector.requestTimeout=60000
+ env:
+ CI: false
+
+ - name: Unit test for Flink 1.15 with Maven
+ run: mvn --update-snapshots -e -V test -Dtest="*" -pl
inlong-sort/sort-core
+ env:
+ CI: false
+
+ - name: Upload unit test results
+ if: ${{ failure() }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: surefire-reports
+ path: ./**/target/surefire-reports/
+ if-no-files-found: ignore
+
+ - name: Upload integration test results
+ if: ${{ failure() }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: failsafe-reports
+ path: ./**/target/failsafe-reports/
+ if-no-files-found: ignore
\ No newline at end of file
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 0dd449f894..be848ce111 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -161,4 +161,42 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>v1.13</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <sort.flink.version>v1.13</sort.flink.version>
+ <flink.version>1.13.5</flink.version>
+ <flink.minor.version>1.13</flink.minor.version>
+ <flink.scala.binary.version>2.11</flink.scala.binary.version>
+ <flink.jackson.version>2.12.1-13.0</flink.jackson.version>
+ <flink.protobuf.version>2.7.6</flink.protobuf.version>
+ <flink.cdc.base.version>2.3.0</flink.cdc.base.version>
+
<flink.streaming.artifactId>flink-streaming-java_${flink.scala.binary.version}</flink.streaming.artifactId>
+
<flink.test.utils.artifactId>flink-test-utils_${flink.scala.binary.version}</flink.test.utils.artifactId>
+
<flink.runtime.artifactId>flink-runtime_${scala.binary.version}</flink.runtime.artifactId>
+
<flink.docker.image.name>flink:1.13.5-scala_2.11</flink.docker.image.name>
+ </properties>
+ </profile>
+ <profile>
+ <id>v1.15</id>
+ <properties>
+ <sort.flink.version>v1.15</sort.flink.version>
+ <flink.version>1.15.4</flink.version>
+ <flink.minor.version>1.15</flink.minor.version>
+ <flink.scala.binary.version>2.12</flink.scala.binary.version>
+ <flink.jackson.version>2.12.1-13.0</flink.jackson.version>
+ <flink.protobuf.version>2.7.6</flink.protobuf.version>
+ <flink.cdc.base.version>2.3.0</flink.cdc.base.version>
+
<flink.streaming.artifactId>flink-streaming-java</flink.streaming.artifactId>
+
<flink.test.utils.artifactId>flink-test-utils</flink.test.utils.artifactId>
+
<flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
+
<flink.docker.image.name>flink:1.15.4-scala_2.12</flink.docker.image.name>
+ </properties>
+ </profile>
+ </profiles>
</project>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index b3239e3635..25935ec3b9 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -70,7 +70,7 @@
<!-- for test -->
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
+ <artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java
index e5a9ae75de..f8eef41c1f 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/CascadeFunctionWrapperTest.java
@@ -56,7 +56,6 @@ public class CascadeFunctionWrapperTest extends
AbstractTestBase {
// step 0. Initialize the execution environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java
index 1f63247e59..bdf09b601f 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/EncryptFunctionTest.java
@@ -53,7 +53,6 @@ public class EncryptFunctionTest extends AbstractTestBase {
public void testEncryptFunction() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
index 53aab54dde..be7c98dfa1 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
@@ -52,7 +52,6 @@ public class JsonGetterFunctionTest extends AbstractTestBase {
public void testJsonGetterFunction() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java
index 0ae32ed30f..4ebddc464b 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunctionTest.java
@@ -54,7 +54,6 @@ public class RegexpReplaceFirstFunctionTest extends
AbstractTestBase {
// step 0. Initialize the execution environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
index 3cfe8e9550..ea25034b53 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
@@ -53,7 +53,6 @@ public class RegexpReplaceFunctionTest extends
AbstractTestBase {
// step 0. Initialize the execution environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java
index 715c97adb2..13701f707b 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/SplitIndexFunctionTest.java
@@ -54,7 +54,6 @@ public class SplitIndexFunctionTest extends AbstractTestBase {
public void testSplitIndexFunction() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
index 3855e08531..fb3f8fd61f 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateMongoDBTest.java
@@ -86,7 +86,6 @@ public class AllMigrateMongoDBTest extends AbstractTestBase {
public void testAllMigrate() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
index 2d041128f4..c44e35f561 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateOracleTest.java
@@ -120,7 +120,6 @@ public class AllMigrateOracleTest extends AbstractTestBase {
public void testAllMigrate() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -147,7 +146,6 @@ public class AllMigrateOracleTest extends AbstractTestBase {
public void testAllMigrateWithBytesFormat() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
index 995d881377..607cbf9d68 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java
@@ -105,7 +105,6 @@ public class AllMigratePostgreSQLTest extends
AbstractTestBase {
public void testAllMigrate() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 334cb52ac2..1f75ca89d4 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -144,7 +144,6 @@ public class AllMigrateTest extends AbstractTestBase {
public void testAllMigrate() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -171,7 +170,6 @@ public class AllMigrateTest extends AbstractTestBase {
public void testAllMigrateMultiRelations() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -201,7 +199,6 @@ public class AllMigrateTest extends AbstractTestBase {
public void testAllMigrateWithBytesFormat() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
index 21c854a9a3..1f5f1007d0 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
@@ -108,7 +108,6 @@ public class AllMigrateWithSpecifyingFieldTest extends
AbstractTestBase {
public void testAllMigrateMySQLToMySQL() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
index 590a9a11a8..c1df2b7a21 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
@@ -108,7 +108,6 @@ public class ClickHouseSqlParserTest extends
AbstractTestBase {
public void testClickHouse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
index 9ad649fb66..d765117a07 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
@@ -113,7 +113,6 @@ public class CustomFunctionSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
index 3c4f2c5b11..e0e7558ee2 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
@@ -117,7 +117,6 @@ public class DataTypeConvertSqlParseTest extends
AbstractTestBase {
public void testDataTypeConvertSqlParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -167,7 +166,6 @@ public class DataTypeConvertSqlParseTest extends
AbstractTestBase {
public void testHBaseDataTypeConvertSqlParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
index 618c75506f..7a19568659 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DecimalFormatSqlParseTest.java
@@ -87,7 +87,6 @@ public class DecimalFormatSqlParseTest extends
AbstractTestBase {
public void testDecimalFormatSqlParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index 958a811fa3..a1e7c65d54 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -244,7 +244,6 @@ public class DistinctNodeSqlParseTest extends
AbstractTestBase {
public void testDistinctBasedProcessTime() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -275,7 +274,6 @@ public class DistinctNodeSqlParseTest extends
AbstractTestBase {
public void testDistinctBasedTimeField() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -306,7 +304,6 @@ public class DistinctNodeSqlParseTest extends
AbstractTestBase {
public void testDistinctBasedEventTime() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
index 95e3cdf9a0..2661ae5dba 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToDorisLoadNodeTest.java
@@ -128,7 +128,6 @@ public class DorisExtractNodeToDorisLoadNodeTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
index 8e0f445e85..e07fd49d9d 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
@@ -128,7 +128,6 @@ public class DorisExtractNodeToMySqlLoadNodeTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
index 50e4b6d6d0..54f20de477 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisMultipleSinkTest.java
@@ -106,7 +106,6 @@ public class DorisMultipleSinkTest extends AbstractTestBase
{
public void testDorisMultipleSinkParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
index b73d1d5c43..80186cee28 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ESMultipleSinkTest.java
@@ -106,7 +106,6 @@ public class ESMultipleSinkTest extends AbstractTestBase {
public void testESMultipleSinkParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
index f4443d6ae3..29a3d1fd51 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
@@ -103,7 +103,6 @@ public abstract class ElasticsearchSqlParseTest extends
AbstractTestBase {
public void testMysqlToElasticsearch(Node node) throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
index 6a8f4fc700..51478cf704 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
@@ -112,7 +112,6 @@ public class FilesystemSqlParserTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
index d2dfcc059c..e76a829cb5 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
@@ -114,7 +114,6 @@ public class FilterParseTest extends AbstractTestBase {
public void testFilterWithRetainParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -143,7 +142,6 @@ public class FilterParseTest extends AbstractTestBase {
public void testFilterWithRemoveParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index 8b7c29f5a1..6db698414d 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -184,7 +184,6 @@ public class FlinkSqlParserTest extends AbstractTestBase {
public void testMysqlToHive() {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -222,7 +221,6 @@ public class FlinkSqlParserTest extends AbstractTestBase {
public void testToFileSystem() {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -249,7 +247,6 @@ public class FlinkSqlParserTest extends AbstractTestBase {
public void testFlinkSqlParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
index fd2e156530..cad150822b 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
@@ -243,7 +243,6 @@ public class FullOuterJoinSqlParseTest extends
AbstractTestBase {
public void testFullOuterJoin() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -280,7 +279,6 @@ public class FullOuterJoinSqlParseTest extends
AbstractTestBase {
public void testFullOuterJoinWithDistinctAndFilter() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
index 6b8a9cd9ea..bdf72278af 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/GreenplumLoadSqlParseTest.java
@@ -104,7 +104,6 @@ public class GreenplumLoadSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
index 489b839688..291f89f22e 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
@@ -117,7 +117,6 @@ public class HbaseLoadFlinkSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
index 0fb1fece29..0b7ff83296 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -166,7 +166,6 @@ public class HudiNodeSqlParserTest extends AbstractTestBase
{
public void testHudi() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index 1bd06e2f60..41d5701fdf 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -159,7 +159,6 @@ public class IcebergNodeSqlParserTest extends
AbstractTestBase {
public void testIceberg() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
index dd5ecce42c..bde57ec2cc 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
@@ -271,7 +271,6 @@ public class InnerJoinRelationSqlParseTest extends
AbstractTestBase {
public void testInnerJoin() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -307,7 +306,6 @@ public class InnerJoinRelationSqlParseTest extends
AbstractTestBase {
public void testInnerJoinWithUpsertKafka() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -344,7 +342,6 @@ public class InnerJoinRelationSqlParseTest extends
AbstractTestBase {
public void testInnerJoinWithDistinctAndFilter() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
index 140bd28218..3e7c51ff53 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
@@ -159,7 +159,6 @@ public class IntervalJoinRelationSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
index db3ec360db..957e890e68 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -225,7 +225,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase
{
public void testKafkaSourceSqlParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -253,7 +252,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase
{
public void testKafkaDynamicTopicParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -281,7 +279,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase
{
public void testKafkaDynamicPartitionParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -309,7 +306,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase
{
public void testKafkaDynamicPartitionWithPrimaryKey() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -337,7 +333,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase
{
public void testKafkaDirtyHandleSqlParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -365,7 +360,6 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase
{
public void testKafkaDirtyHandleWithDynamicTopicSqlParse() throws
Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
index 9ece69abe6..79991404af 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaSqlParseTest.java
@@ -64,7 +64,6 @@ public class KafkaSqlParseTest extends AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -150,7 +149,6 @@ public class KafkaSqlParseTest extends AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
index c31bb23804..547535a136 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
@@ -244,7 +244,6 @@ public class LeftOuterJoinSqlParseTest extends
AbstractTestBase {
public void testLeftOuterJoin() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -281,7 +280,6 @@ public class LeftOuterJoinSqlParseTest extends
AbstractTestBase {
public void testLeftOuterJoinWithDistinctAndFilter() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index 3d25e9b58d..d7e4b75106 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -225,7 +225,6 @@ public class MetaFieldSyncTest extends AbstractTestBase {
public void testMetaFieldSyncTest() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
index 8161bc1183..d2bb5645de 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
@@ -108,7 +108,6 @@ public class MongoExtractFlinkSqlParseTest extends
AbstractTestBase {
public void testMongoDbToKafka() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -178,7 +177,6 @@ public class MongoExtractFlinkSqlParseTest extends
AbstractTestBase {
public void testMongoDbComplexTypeToKafka() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
index fa5a929b05..422cde37e7 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlExtractNodeToDorisLoadNodeTest.java
@@ -117,7 +117,6 @@ public class MySqlExtractNodeToDorisLoadNodeTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 1c324b5532..17ad63b178 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -164,7 +164,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase
{
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -192,7 +191,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase
{
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -220,7 +218,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase
{
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -248,7 +245,6 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase
{
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
index f0302a9673..1aac2d11bd 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
@@ -160,7 +160,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -192,7 +191,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -224,7 +222,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -256,7 +253,6 @@ public class MySqlTemporalJoinRelationSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
index 9c0fdb4abe..4e27ec9700 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/NativeFlinkSqlParserTest.java
@@ -40,7 +40,6 @@ public class NativeFlinkSqlParserTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
index 4ec2ee4f84..6f61c9eaf7 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
@@ -135,7 +135,6 @@ public class OracleExtractSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
index b667b0719e..0b7e7e4fc5 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -112,7 +112,6 @@ public class OracleLoadSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
index b087fe13b8..342c7e3e38 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
@@ -130,7 +130,6 @@ public class PostgresExtractFlinkSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
index 9dbde02251..52e6671bf7 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
@@ -119,7 +119,6 @@ public class PostgresLoadNodeFlinkSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
index b64bf687b3..5db0e6ae5d 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
@@ -94,7 +94,6 @@ public class PulsarSqlParserTest extends AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
index 717bba95c5..49651dc68c 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisNodeSqlParserTest.java
@@ -137,7 +137,6 @@ public class RedisNodeSqlParserTest extends
AbstractTestBase {
public void testRedis() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
index 8d2fe99529..56927ab5e4 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
@@ -162,7 +162,6 @@ public class RedisTemporalJoinRelationSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
@@ -197,7 +196,6 @@ public class RedisTemporalJoinRelationSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
index 0960e0b84c..f945bcb3ec 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
@@ -243,7 +243,6 @@ public class RightOuterJoinSqlParseTest extends
AbstractTestBase {
public void testRightOuterJoin() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -280,7 +279,6 @@ public class RightOuterJoinSqlParseTest extends
AbstractTestBase {
public void testRightOuterJoinWithDistinctAndFilter() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
index dc0886ad40..5eff7df857 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
@@ -160,7 +160,6 @@ public class SqlServerNodeSqlParseTest extends
AbstractTestBase {
public void testSqlServerLoad() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -184,7 +183,6 @@ public class SqlServerNodeSqlParseTest extends
AbstractTestBase {
public void testSqlServerExtract() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
index bffa2eede0..4662eff382 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
@@ -112,7 +112,6 @@ public class TDSQLPostgresLoadNodeFlinkSqlParseTest extends
AbstractTestBase {
env.disableOperatorChaining();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
index 87cc962cc2..9d6cb5336c 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java
@@ -99,7 +99,6 @@ public class TubeMQNodeSqlParseTest extends AbstractTestBase {
public void testTubeMQToKafka() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java
index 43b74a9c68..4616a13697 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/UnionSqlParseTest.java
@@ -148,7 +148,6 @@ public class UnionSqlParseTest extends AbstractTestBase {
public void testUnionSqlParse() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -182,7 +181,6 @@ public class UnionSqlParseTest extends AbstractTestBase {
public void testUnionSqlParseWithoutTransform() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/inlong-sort/sort-end-to-end-tests/pom.xml
b/inlong-sort/sort-end-to-end-tests/pom.xml
index ffdaa10d5c..939301eb79 100644
--- a/inlong-sort/sort-end-to-end-tests/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/pom.xml
@@ -68,7 +68,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
+ <artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
@@ -95,7 +95,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <artifactId>${flink.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
diff --git
a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index f10e8a225c..cbe4d9b64f 100644
---
a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++
b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -107,7 +107,6 @@ public abstract class FlinkContainerTestEnv extends
TestLogger {
private static GenericContainer<?> jobManager;
private static GenericContainer<?> taskManager;
-
//
----------------------------------------------------------------------------------------
// MYSQL Variables
//
----------------------------------------------------------------------------------------
@@ -239,6 +238,7 @@ public abstract class FlinkContainerTestEnv extends
TestLogger {
/**
* Polling to detect task status until the task successfully into {@link
JobStatus.RUNNING}
+ *
* @param timeout
*/
public void waitUntilJobRunning(Duration timeout) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
index 6092852f9f..4fbc441e7d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
@@ -41,6 +41,7 @@
<flink.connector.mongodb.cdc.version>2.3.0</flink.connector.mongodb.cdc.version>
<flink.connector.redis>1.1.0</flink.connector.redis>
<flink.scala.binary.version>2.11</flink.scala.binary.version>
+ <flink.minor.version>1.13</flink.minor.version>
<flink.connector.mysql.cdc.version>2.2.1</flink.connector.mysql.cdc.version>
<flink.scala.binary.version>2.11</flink.scala.binary.version>
<flink.jackson.version>2.12.1-13.0</flink.jackson.version>