This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new d9a772f55 [Feature] Supported apache flink 1.20 (#4263)
d9a772f55 is described below

commit d9a772f5528776322f024ff4bfa032c6605ccf7e
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Jun 30 13:38:53 2025 +0800

    [Feature] Supported apache flink 1.20 (#4263)
    
    * supported apache flink 1.20
    * fixed module
---
 .github/workflows/e2e.yml                          |   6 +-
 pom.xml                                            |   2 +-
 .../streampark/common/conf/FlinkVersion.scala      |   2 +-
 .../streampark-console-service/pom.xml             |   7 +
 .../console/core/runner/EnvInitializer.java        |   2 +-
 .../impl/FlinkApplicationInfoServiceImpl.java      |   8 +-
 .../core/service/FlinkSavepointServiceTest.java    |  11 +-
 .../cases/Flink120OnRemoteClusterDeployTest.java   | 104 ++++++++
 .../e2e/cases/Flink120OnYarnClusterDeployTest.java | 152 ++++++++++++
 .../e2e/cases/FlinkSQL120OnYarnTest.java           | 269 +++++++++++++++++++++
 .../docker/flink-1.20-on-remote/Dockerfile         |  18 ++
 .../flink-1.20-on-remote/docker-compose.yaml       |  90 +++++++
 .../resources/docker/flink-1.20-on-yarn/Dockerfile |  45 ++++
 .../flink-1.20-on-yarn/docker-compose.config       |  44 ++++
 .../docker/flink-1.20-on-yarn/docker-compose.yaml  | 163 +++++++++++++
 .../flink/client/impl/YarnPerJobClient.scala       |   3 +-
 .../flink/kubernetes/KubernetesRetriever.scala     |   2 +-
 streampark-flink/streampark-flink-shims/pom.xml    |   1 +
 .../streampark-flink-shims-test/pom.xml            |   2 +-
 .../pom.xml                                        | 119 +++++----
 .../streampark/flink/core/FlinkClusterClient.scala |  64 +++++
 .../flink/core/FlinkKubernetesClient.scala         |  32 +++
 .../streampark/flink/core/StreamTableContext.scala | 169 +++++++++++++
 .../streampark/flink/core/TableContext.scala       | 106 ++++++++
 .../apache/streampark/flink/core/TableExt.scala    |  43 ++++
 tools/dependencies/known-dependencies.txt          |  33 +--
 26 files changed, 1418 insertions(+), 79 deletions(-)

diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 070e984b5..4582dafc0 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -136,13 +136,13 @@ jobs:
             class: org.apache.streampark.e2e.cases.VariableManagementTest
 
           - name: FlinkOnRemoteClusterDeployTest
-            class: org.apache.streampark.e2e.cases.Flink118OnRemoteClusterDeployTest
+            class: org.apache.streampark.e2e.cases.Flink120OnRemoteClusterDeployTest
 
           - name: FlinkOnYarnClusterDeployTest
-            class: org.apache.streampark.e2e.cases.Flink118OnYarnClusterDeployTest
+            class: org.apache.streampark.e2e.cases.Flink120OnYarnClusterDeployTest
 
           - name: FlinkSQLOnYarnTest
-            class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
+            class: org.apache.streampark.e2e.cases.FlinkSQL120OnYarnTest
 
           #- name: Flink117OnRemoteClusterDeployTest
          #  class: org.apache.streampark.e2e.cases.Flink117OnRemoteClusterDeployTest
diff --git a/pom.xml b/pom.xml
index c1de29d32..8f5103008 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,7 +93,7 @@
         <scala.binary.flink.version>_${scala.binary.version}</scala.binary.flink.version>
         <flink.connector.version>3.2.0-1.18</flink.connector.version>
         <flink.elasticserch.connector.version>3.0.1-1.17</flink.elasticserch.connector.version>
-        <flink.version>1.18.1</flink.version>
+        <flink.version>1.20.1</flink.version>
         <flink.shaded.version>1.8.1</flink.shaded.version>
         <streampark.shaded.version>1.0.0</streampark.shaded.version>
         <streampark.flink.shims.version>1.14</streampark.flink.shims.version>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index e4950e247..83a09a227 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -126,7 +126,7 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger {
 
   def checkVersion(throwException: Boolean = true): Boolean = {
     version.split("\\.").map(_.trim.toInt) match {
-      case Array(1, v, _) if v >= 12 && v <= 19 => true
+      case Array(1, v, _) if v >= 12 && v <= 20 => true
       case _ =>
         if (throwException) {
          throw new UnsupportedOperationException(s"Unsupported flink version: $version")
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index ea09556ea..66d8a6463 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -586,6 +586,13 @@
                             <version>${project.version}</version>
                             <outputDirectory>${project.build.directory}/shims</outputDirectory>
                         </dependency>
+                        <!-- flink 1.20 support-->
+                        <dependency>
+                            <groupId>org.apache.streampark</groupId>
+                            <artifactId>streampark-flink-shims_flink-1.20_${scala.binary.version}</artifactId>
+                            <version>${project.version}</version>
+                            <outputDirectory>${project.build.directory}/shims</outputDirectory>
+                        </dependency>
                         <!-- flink-submit-core -->
                         <dependency>
                             <groupId>org.apache.streampark</groupId>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 4e5ba38ac..c9addb417 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -71,7 +71,7 @@ public class EnvInitializer implements ApplicationRunner {
     private final FileFilter fileFilter = p -> !".gitkeep".equals(p.getName());
 
     private static final Pattern PATTERN_FLINK_SHIMS_JAR = Pattern.compile(
-        "^streampark-flink-shims_flink-(1.1[2-9])_(2.12)-(.*).jar$",
+        "^streampark-flink-shims_flink-(1.1[2-9]|1\\.2[0-9])_(2.12)-(.*).jar$",
         Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
     @SneakyThrows
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
index 7276bcde1..8ced52f6a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
@@ -484,21 +484,21 @@ public class FlinkApplicationInfoServiceImpl extends ServiceImpl<FlinkApplicatio
             final String pathPart = uri.getPath();
             String error = null;
             if (scheme == null) {
-                error = "This state.savepoints.dir value "
+                error = "This state-savepoints-dir value "
                     + savepointPath
                     + " scheme (hdfs://, file://, etc) of  is null. Please 
specify the file system scheme explicitly in the URI.";
             } else if (pathPart == null) {
-                error = "This state.savepoints.dir value "
+                error = "This state-savepoints-dir value "
                     + savepointPath
                     + " path part to store the checkpoint data in is null. 
Please specify a directory path for the checkpoint data.";
             } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
-                error = "This state.savepoints.dir value "
+                error = "This state-savepoints-dir value "
                     + savepointPath
                     + " Cannot use the root directory for checkpoints.";
             }
             return error;
         } else {
-            return "When custom savepoint is not set, state.savepoints.dir 
needs to be set in properties or flink-conf.yaml of application";
+            return "When custom savepoint is not set, state-savepoints-dir 
needs to be set in properties or flink config of application";
         }
     }
 }
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
index 14cbc3fdb..54eece310 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
@@ -82,8 +82,8 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
      */
     @Test
     void testGetSavepointFromDynamicProps() {
-        String propsWithEmptyTargetValue = "-Dstate.savepoints.dir=";
-        String props = "-Dstate.savepoints.dir=hdfs:///test";
+        String propsWithEmptyTargetValue = "-Dexecution.checkpointing.savepoint-dir=";
+        String props = "-Dexecution.checkpointing.savepoint-dir=hdfs:///test";
        FlinkSavepointServiceImpl savepointServiceImpl = (FlinkSavepointServiceImpl) savepointService;

        assertThat(savepointServiceImpl.getSavepointFromDynamicProps(null)).isNull();
@@ -114,11 +114,13 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
         app.setJobType(FlinkJobType.FLINK_JAR.getMode());
         assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
 
+        String ckDir = SAVEPOINT_DIRECTORY.key() + "=hdfs:///test";
+
        // Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint.
         FlinkApplicationConfig appCfg = new FlinkApplicationConfig();
         appCfg.setId(appCfgId);
         appCfg.setAppId(appId);
-        appCfg.setContent("state.savepoints.dir=hdfs:///test");
+        appCfg.setContent(ckDir);
         appCfg.setFormat(ConfigFileTypeEnum.PROPERTIES.getValue());
         configService.save(appCfg);
         assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
@@ -134,9 +136,10 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
         // Test for configured CHECKPOINTING_INTERVAL
         appCfg.setContent(
             DeflaterUtils.zipString(
-                "state.savepoints.dir=hdfs:///test\n"
+                ckDir + "\n"
                     + String.format("%s=%s", CHECKPOINTING_INTERVAL.key(),
                         "3min")));
+
         configService.updateById(appCfg);
         FlinkEffective effective = new FlinkEffective();
         effective.setTargetId(appCfg.getId());
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
new file mode 100644
index 000000000..74c905a8e
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.RemoteForm;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-remote/docker-compose.yaml")
+public class Flink120OnRemoteClusterDeployTest {
+
+    public static RemoteWebDriver browser;
+
+    private static final String flinkName = "flink-1.20.1";
+
+    private static final String flinkHome = "/opt/flink/";
+
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+    private static final String flinkJobManagerUrl = "http://jobmanager:8081";
+
+    private static final ClusterDetailForm.DeployMode deployMode = ClusterDetailForm.DeployMode.STANDALONE;
+
+    @BeforeAll
+    public static void setUp() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login()
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
+    }
+
+    @Test
+    @Order(1)
+    public void testCreateFlinkCluster() {
+        FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.createFlinkCluster()
+            .<RemoteForm>addCluster(deployMode)
+            .jobManagerURL(flinkJobManagerUrl)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(flinkClusterName)));
+    }
+
+    @Test
+    @Order(5)
+    public void testDeleteFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.deleteFlinkCluster(flinkClusterName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+                    Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+                    assertThat(flinkClustersPage.flinkClusterList)
+                        .noneMatch(it -> it.getText().contains(flinkClusterName));
+                });
+    }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java
new file mode 100644
index 000000000..d6d379cc5
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml")
+public class Flink120OnYarnClusterDeployTest {
+
+    public static RemoteWebDriver browser;
+
+    private static final String flinkName = "flink-1.20.1";
+
+    private static final String flinkHome = "/flink-1.20.1";
+
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+    private static final String flinkClusterNameEdited = "flink_1.20.1_cluster_e2e_edited";
+
+    private static final ClusterDetailForm.DeployMode deployMode = ClusterDetailForm.DeployMode.YARN_SESSION;
+
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login()
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
+    }
+
+    @Test
+    @Order(1)
+    public void testCreateFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.createFlinkCluster()
+            .<YarnSessionForm>addCluster(deployMode)
+            .resolveOrder(YarnSessionForm.ResolveOrder.CHILD_FIRST)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(flinkClusterName)));
+    }
+
+    @Test
+    @Order(2)
+    public void testEditFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.editFlinkCluster(flinkClusterName)
+            .<YarnSessionForm>addCluster(deployMode)
+            .clusterName(flinkClusterNameEdited)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain edited 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(flinkClusterNameEdited)));
+    }
+
+    @Test
+    @Order(3)
+    public void testStartFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.startFlinkCluster(flinkClusterNameEdited);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain running 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+    }
+
+    @Test
+    @Order(4)
+    public void testStopFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.stopFlinkCluster(flinkClusterNameEdited);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain canceled 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(5)
+    public void testDeleteFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.deleteFlinkCluster(flinkClusterNameEdited);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+                    Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+                    assertThat(flinkClustersPage.flinkClusterList)
+                        .noneMatch(it -> it.getText().contains(flinkClusterNameEdited));
+                });
+    }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java
new file mode 100644
index 000000000..39147c962
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
+import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
+
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml")
+public class FlinkSQL120OnYarnTest {
+
+    public static RemoteWebDriver browser;
+
+    private static final String flinkName = "flink-1.20.1";
+
+    private static final String flinkHome = "/flink-1.20.1";
+
+    private static final String applicationName = "flink-120-e2e-test";
+
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login()
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
+
+        flinkClustersPage.createFlinkCluster()
+            .<YarnSessionForm>addCluster(ClusterDetailForm.DeployMode.YARN_SESSION)
+            .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        flinkClustersPage.startFlinkCluster(flinkClusterName);
+
+        flinkClustersPage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(1)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage
+            .createApplication()
+            .addApplication(
+                ApplicationForm.FlinkJobType.FLINK_SQL,
+                ApplicationForm.DeployMode.YARN_APPLICATION,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(2)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain released 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(3)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain finished 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(4)
+    @SneakyThrows
+    void testCancelFlinkApplicationOnYarnApplicationMode() {
+        Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain restarted 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        applicationsPage.cancelApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain canceled 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(5)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList)
+                        .noneMatch(it -> it.getText().contains(applicationName));
+                });
+    }
+
+    @Test
+    @Order(11)
+    void testCreateFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage
+            .createApplication()
+            .addApplication(
+                ApplicationForm.FlinkJobType.FLINK_SQL,
+                ApplicationForm.DeployMode.YARN_SESSION,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .flinkCluster(flinkClusterName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(12)
+    void testReleaseFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain released 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(13)
+    void testStartFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain finished 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(14)
+    @SneakyThrows
+    void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain restarted 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        applicationsPage.cancelApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain canceled 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(15)
+    void testDeleteFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+        applicationsPage.deleteApplication(applicationName);
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList)
+                        .noneMatch(it -> it.getText().contains(applicationName));
+                });
+    }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile
new file mode 100644
index 000000000..b1e093fd4
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml
new file mode 100644
index 000000000..b16486e6b
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+services:
+  jobmanager:
+    image: flink:1.20.1
+    command: jobmanager
+    ports:
+      - "8081:8081"
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      test: [ "CMD", "curl", "http://localhost:8081" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+  taskmanager:
+    image: flink:1.20.1
+    depends_on:
+      - jobmanager
+    command: taskmanager
+    scale: 1
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+        taskmanager.numberOfTaskSlots: 2
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      test: [ "CMD", "curl", "http://localhost:8081" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+  streampark:
+    image: apache/streampark:ci
+    command: bash bin/streampark.sh start_docker
+    build:
+      context: ./
+      dockerfile: ./Dockerfile
+    ports:
+      - 10000:10000
+      - 10030:10030
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+      - FLINK_JOBMANAGER_URL=http://jobmanager:8081
+    privileged: true
+    restart: unless-stopped
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - ${HOME}/streampark_build_logs:/tmp/streampark/logs/build_logs/
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      test: [ "CMD", "curl", "http://localhost:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+networks:
+  e2e:
+volumes:
+  flink_data:
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile
new file mode 100644
index 000000000..6b050d220
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci as base-image
+FROM flink:1.20.1-scala_2.12-java8 as flink-image
+
+# Install streampark
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+RUN sudo chown -R hadoop.hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" 
/streampark/conf/config.yaml
+
+# Install Flink
+COPY --from=flink-image /opt/flink /flink-1.20.1
+RUN sudo chown -R hadoop.hadoop /flink-1.20.1
+
+# Install javac
+ARG TARGETPLATFORM
+RUN echo "TARGETPLATFORM: $TARGETPLATFORM"
+RUN \
+    if [ "$TARGETPLATFORM" = "linux/arm64" ];then \
+        sudo rm -f /etc/yum.repos.d/*.repo; \
+        sudo wget http://mirrors.aliyun.com/repo/Centos-altarch-7.repo -O /etc/yum.repos.d/CentOS-Base.repo; \
+        sudo sed -i "s/http:\/\//https:\/\//g" /etc/yum.repos.d/CentOS-Base.repo; \
+        sudo yum install -y java-1.8.0-openjdk-devel; \
+    elif [ "$TARGETPLATFORM" = "linux/amd64" ];then \
+        sudo yum install -y java-1.8.0-openjdk-devel; \
+    else \
+        echo "unknown TARGETPLATFORM: $TARGETPLATFORM"; \
+        exit 2;  \
+    fi
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
new file mode 100644
index 000000000..441fa1118
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+YARN-SITE.XML_yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage=99.9
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..73af8308b
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+services:
+  namenode:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: namenode
+    command: [ "hdfs", "namenode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "19870:9870"
+    env_file:
+      - docker-compose.config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://namenode:9870" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  datanode:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: datanode
+    command: [ "hdfs", "datanode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://datanode:9864" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+    depends_on:
+      namenode:
+        condition: service_healthy
+  resourcemanager:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: resourcemanager
+    command: [ "yarn", "resourcemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "18088:8088"
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  nodemanager:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: nodemanager
+    command: [ "yarn", "nodemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      resourcemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://nodemanager:8042" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  streampark:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: streampark
+    command:
+      - sh
+      - -c
+      - /streampark/bin/streampark.sh start_docker
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "20000:10000"
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      namenode:
+        condition: service_healthy
+      datanode:
+        condition: service_healthy
+      resourcemanager:
+        condition: service_healthy
+      nodemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://streampark:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+networks:
+  e2e:
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 5e8d6dfd0..d19ba8787 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -30,7 +30,6 @@ import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
 import org.apache.hadoop.fs.{Path => HadoopPath}
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
-import java.io.File
 import java.lang.{Boolean => JavaBool}
 
 /** yarn PerJob mode submit */
@@ -60,7 +59,7 @@ object YarnPerJobClient extends YarnClientTrait {
       getYarnClusterDeployDescriptor(flinkConfig, submitRequest.hadoopUser)
     val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
     clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
-    clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
+    clusterDescriptor.addShipFiles(List(new HadoopPath(s"$flinkHome/lib")))
 
     var packagedProgram: PackagedProgram = null
     val clusterClient = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 21b96b41a..d78517581 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -44,7 +44,7 @@ object KubernetesRetriever extends Logger {
 
   // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
   val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
-    Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
+    Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue().toMillis)
 
   private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()
 
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index 964fcaca0..16e176448 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -40,6 +40,7 @@
         <module>streampark-flink-shims_flink-1.17</module>
         <module>streampark-flink-shims_flink-1.18</module>
         <module>streampark-flink-shims_flink-1.19</module>
+        <module>streampark-flink-shims_flink-1.20</module>
     </modules>
 
 </project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
index 95131c581..4aa7c87cf 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
@@ -29,7 +29,7 @@
     <name>StreamPark : Flink Shims Test</name>
 
     <properties>
-        <flink.version>1.18.1</flink.version>
+        <flink.version>1.20.1</flink.version>
     </properties>
 
     <dependencies>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/pom.xml
similarity index 52%
copy from streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
copy to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/pom.xml
index 95131c581..b666ac63c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/pom.xml
@@ -16,7 +16,7 @@
   ~ limitations under the License.
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -25,69 +25,54 @@
         <version>2.2.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>streampark-flink-shims-test_${scala.binary.version}</artifactId>
-    <name>StreamPark : Flink Shims Test</name>
+    <artifactId>streampark-flink-shims_flink-1.20_${scala.binary.version}</artifactId>
+    <name>StreamPark : Flink Shims 1.20</name>
 
     <properties>
-        <flink.version>1.18.1</flink.version>
+        <flink.version>1.20.0</flink.version>
     </properties>
 
     <dependencies>
-
-        <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_${scala.binary.version}</artifactId>
-            <version>3.2.9</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.streampark</groupId>
             
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            <artifactId>streampark-flink-shims_flink-1.14_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            <artifactId>streampark-flink-core_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
+        <!--flink-->
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-scala_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-uber</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
@@ -99,34 +84,74 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients</artifactId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <artifactId>flink-yarn</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <optional>true</optional>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <artifactId>flink-kubernetes</artifactId>
             <version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
-
     </dependencies>
 
-    <profiles>
-        <profile>
-            <id>apache-release</id>
-            <properties>
-                <maven.deploy.skip>true</maven.deploy.skip>
-            </properties>
-        </profile>
-    </profiles>
-
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..ecfc8dabb
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+  extends FlinkClientTrait[T](clusterClient) {
+
+  override def triggerSavepoint(
+      jobID: JobID,
+      savepointDir: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(
+      jobID,
+      savepointDir,
+      if (nativeFormat) SavepointFormatType.NATIVE
+      else SavepointFormatType.CANONICAL)
+  }
+
+  override def cancelWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(
+      jobID,
+      savepointDirectory,
+      if (nativeFormat) SavepointFormatType.NATIVE
+      else SavepointFormatType.CANONICAL)
+  }
+
+  override def stopWithSavepoint(
+      jobID: JobID,
+      advanceToEndOfEventTime: Boolean,
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(
+      jobID,
+      advanceToEndOfEventTime,
+      savepointDirectory,
+      if (nativeFormat) SavepointFormatType.NATIVE
+      else SavepointFormatType.CANONICAL)
+  }
+
+}
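
A minimal sketch of driving this wrapper, assuming a ClusterClient obtained from a running deployment; the job id and savepoint path below are illustrative, not part of the commit:

    import org.apache.flink.api.common.JobID
    import org.apache.flink.client.program.ClusterClient
    import org.apache.streampark.flink.core.FlinkClusterClient

    // Hypothetical usage: trigger a NATIVE-format savepoint and wait for its path.
    def triggerNativeSavepoint[T](client: ClusterClient[T], jobId: JobID): String = {
      val shim = new FlinkClusterClient[T](client)
      // nativeFormat = true maps to SavepointFormatType.NATIVE in the shim above
      shim.triggerSavepoint(jobId, "hdfs:///flink/savepoints", nativeFormat = true).get()
    }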
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 000000000..707ba43f0
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+  extends FlinkKubernetesClientTrait(kubeClient) {
+
+  override def getService(serviceName: String): Optional[KubernetesService] = {
+    kubeClient.getService(serviceName)
+  }
+
+}
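
The 1.20 shim only needs to forward getService unchanged. A minimal sketch, assuming a FlinkKubeClient for a session cluster; the `<clusterId>-rest` service name follows Flink's usual convention but is illustrative here:

    import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    import org.apache.streampark.flink.core.FlinkKubernetesClient

    // Hypothetical check: is the JobManager rest service for this cluster present?
    def restServiceExists(kubeClient: FlinkKubeClient, clusterId: String): Boolean =
      new FlinkKubernetesClient(kubeClient).getService(s"$clusterId-rest").isPresent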
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..8d655abec
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.streampark.common.util.Implicits.JavaList
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+class StreamTableContext(
+    override val parameter: ParameterTool,
+    private val streamEnv: StreamExecutionEnvironment,
+    private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) =
+    this(args._1, args._2, args._3)
+
+  def this(args: StreamTableEnvConfig) =
+    this(FlinkTableInitializer.initialize(args))
+
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table =
+    tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+    tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table =
+    tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(
+      dataStream: DataStream[Row],
+      schema: Schema,
+      changelogMode: ChangelogMode): Table =
+    tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](
+      path: String,
+      dataStream: DataStream[T],
+      schema: Schema): Unit =
+    tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+  override def toDataStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream(table)
+  }
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetClass)
+  }
+
+  override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetDataType)
+  }
+
+  override def toChangelogStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema)
+  }
+
+  override def toChangelogStream(
+      table: Table,
+      targetSchema: Schema,
+      changelogMode: ChangelogMode): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+  }
+
+  override def createStatementSet(): StreamStatementSet =
+    tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit =
+    tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table =
+    tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] =
+    tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(s: String, s1: String): Array[String] =
+    tableEnv.listTables(s, s1)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(s: String): CompiledPlan =
+    tableEnv.compilePlanSql(s)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+}
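
Every to*Stream override flips isConvertedToDataStream so StreamPark knows the job must be executed through the stream environment rather than the table environment. A minimal sketch, assuming an initialized context whose base trait exposes TableEnvironment's sqlQuery (as in the earlier shims) and a registered table named source_table, both illustrative:

    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.types.Row

    // Hypothetical usage: run a query and hand the result back to the DataStream API.
    def queryAsStream(ctx: StreamTableContext): DataStream[Row] = {
      val table = ctx.sqlQuery("SELECT id, name FROM source_table")
      ctx.toDataStream(table) // marks isConvertedToDataStream = true
    }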
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..9d295b6b7
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.streampark.common.util.Implicits.JavaList
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api._
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+class TableContext(override val parameter: ParameterTool, private val tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def useModules(strings: String*): Unit =
+    tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTemporaryTable(path, descriptor)
+  }
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTable(path, descriptor)
+  }
+
+  override def from(tableDescriptor: TableDescriptor): Table = {
+    tableEnv.from(tableDescriptor)
+  }
+
+  override def listFullModules(): Array[ModuleEntry] =
+    tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(catalogName: String, databaseName: String): Array[String] =
+    tableEnv.listTables(catalogName, databaseName)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(stmt: String): CompiledPlan =
+    tableEnv.compilePlanSql(stmt)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+
+}
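
A minimal sketch of the 1.17-style explainSql overload this shim forwards, assuming an initialized TableContext and a registered orders table (both illustrative):

    import org.apache.flink.table.api.{ExplainDetail, ExplainFormat}

    // Hypothetical usage: explain a query as plain text with changelog-mode details.
    def explainOrders(ctx: TableContext): String =
      ctx.explainSql(
        "SELECT COUNT(*) FROM orders",
        ExplainFormat.TEXT,
        ExplainDetail.CHANGELOG_MODE)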
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..1e36742e2
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable =
+      table.as(field, fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+
+    def \\ : DataStream[Row] = toDataStream
+
+    def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream
+    }
+  }
+
+}
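
These wrappers are normally picked up through implicit conversions elsewhere in streampark-flink-core; a minimal sketch invoking one directly (column names illustrative):

    import org.apache.flink.table.api.{Table => FlinkTable}
    import org.apache.streampark.flink.core.TableExt

    // `->` is an alias for Table.as: rename the columns of a two-field table.
    def rename(table: FlinkTable): FlinkTable =
      new TableExt.Table(table) -> ("id", "name")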
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index a5e32e9a5..9d4e93b54 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -71,28 +71,32 @@ enumeratum-macros_2.12-1.6.1.jar
 enumeratum_2.12-1.6.1.jar
 failureaccess-1.0.1.jar
 flatbuffers-java-1.9.0.jar
-flink-annotations-1.18.1.jar
+flink-annotations-1.20.1.jar
 flink-cdc-cli-3.2.1.jar
 flink-cdc-common-3.2.1.jar
 flink-cdc-composer-3.2.1.jar
 flink-cdc-runtime-3.2.1.jar
-flink-clients-1.18.1.jar
-flink-core-1.18.1.jar
-flink-hadoop-fs-1.18.1.jar
-flink-java-1.18.1.jar
-flink-kubernetes-1.18.1.jar
-flink-metrics-core-1.18.1.jar
-flink-optimizer-1.18.1.jar
-flink-rpc-akka-loader-1.18.1.jar
-flink-rpc-core-1.18.1.jar
-flink-runtime-1.18.1.jar
+flink-clients-1.20.1.jar
+flink-connector-datagen-1.20.1.jar
+flink-core-1.20.1.jar
+flink-core-api-1.20.1.jar
+flink-datastream-1.20.1.jar
+flink-datastream-api-1.20.1.jar
+flink-hadoop-fs-1.20.1.jar
+flink-java-1.20.1.jar
+flink-kubernetes-1.20.1.jar
+flink-metrics-core-1.20.1.jar
+flink-optimizer-1.20.1.jar
+flink-rpc-akka-loader-1.20.1.jar
+flink-rpc-core-1.20.1.jar
+flink-runtime-1.20.1.jar
 flink-shaded-asm-9-9.5-17.0.jar
 flink-shaded-guava-31.1-jre-17.0.jar
 flink-shaded-jackson-2.14.2-17.0.jar
 flink-shaded-netty-4.1.91.Final-17.0.jar
-flink-streaming-java-1.18.1.jar
-flink-table-common-1.18.1.jar
-flink-table-api-java-1.18.1.jar
+flink-streaming-java-1.20.1.jar
+flink-table-api-java-1.20.1.jar
+flink-table-common-1.20.1.jar
 freemarker-2.3.30.jar
 gson-2.9.1.jar
 guava-30.0-jre.jar
@@ -391,4 +395,5 @@ xz-1.5.jar
 zookeeper-3.6.3.jar
 icu4j-67.1.jar
 zookeeper-jute-3.6.3.jar
+async-profiler-2.9.jar
 netty-transport-native-epoll-4.1.91.Final.jar
