This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch 1.20
in repository https://gitbox.apache.org/repos/asf/streampark.git

commit dc9966f978f2d95d0676c71957cea03dc69e7e69
Author: benjobs <[email protected]>
AuthorDate: Mon Jun 23 13:40:45 2025 +0800

    [e2e] e2e flink-version update to 1.20.1
---
 .../impl/FlinkApplicationInfoServiceImpl.java      |   8 +-
 .../core/service/FlinkSavepointServiceTest.java    |  11 +-
 .../cases/Flink120OnRemoteClusterDeployTest.java   | 104 ++++++
 .../e2e/cases/Flink120OnYarnClusterDeployTest.java | 152 +++++++++
 .../e2e/cases/FlinkSQL120OnYarnTest.java           | 371 +++++++++++++++++++++
 .../docker/flink-1.20-on-remote/Dockerfile         |  18 +
 .../flink-1.20-on-remote/docker-compose.yaml       |  90 +++++
 .../resources/docker/flink-1.20-on-yarn/Dockerfile |  45 +++
 .../flink-1.20-on-yarn/docker-compose.config       |  44 +++
 .../docker/flink-1.20-on-yarn/docker-compose.yaml  | 163 +++++++++
 10 files changed, 998 insertions(+), 8 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
index 7276bcde1..8ced52f6a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
@@ -484,21 +484,21 @@ public class FlinkApplicationInfoServiceImpl extends 
ServiceImpl<FlinkApplicatio
             final String pathPart = uri.getPath();
             String error = null;
             if (scheme == null) {
-                error = "This state.savepoints.dir value "
+                error = "This state-savepoints-dir value "
                     + savepointPath
                     + " scheme (hdfs://, file://, etc) of  is null. Please 
specify the file system scheme explicitly in the URI.";
             } else if (pathPart == null) {
-                error = "This state.savepoints.dir value "
+                error = "This state-savepoints-dir value "
                     + savepointPath
                     + " path part to store the checkpoint data in is null. 
Please specify a directory path for the checkpoint data.";
             } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
-                error = "This state.savepoints.dir value "
+                error = "This state-savepoints-dir value "
                     + savepointPath
                     + " Cannot use the root directory for checkpoints.";
             }
             return error;
         } else {
-            return "When custom savepoint is not set, state.savepoints.dir 
needs to be set in properties or flink-conf.yaml of application";
+            return "When custom savepoint is not set, state-savepoints-dir 
needs to be set in properties or flink config of application";
         }
     }
 }
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
index 14cbc3fdb..2e27d983e 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
@@ -82,8 +82,8 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
      */
     @Test
     void testGetSavepointFromDynamicProps() {
-        String propsWithEmptyTargetValue = "-Dstate.savepoints.dir=";
-        String props = "-Dstate.savepoints.dir=hdfs:///test";
+        String propsWithEmptyTargetValue = "-Dexecution.checkpointing.dir=";
+        String props = "-Dexecution.checkpointing.dir=hdfs:///test";
         FlinkSavepointServiceImpl savepointServiceImpl = 
(FlinkSavepointServiceImpl) savepointService;
 
         
assertThat(savepointServiceImpl.getSavepointFromDynamicProps(null)).isNull();
@@ -114,11 +114,13 @@ class FlinkSavepointServiceTest extends 
SpringUnitTestBase {
         app.setJobType(FlinkJobType.FLINK_JAR.getMode());
         assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
 
+        String ckDir = "execution.checkpointing.dir=hdfs:///test";
+
         // Test for (StreamPark job Or FlinkSQL job) with application config 
just disabled checkpoint.
         FlinkApplicationConfig appCfg = new FlinkApplicationConfig();
         appCfg.setId(appCfgId);
         appCfg.setAppId(appId);
-        appCfg.setContent("state.savepoints.dir=hdfs:///test");
+        appCfg.setContent(ckDir);
         appCfg.setFormat(ConfigFileTypeEnum.PROPERTIES.getValue());
         configService.save(appCfg);
         assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
@@ -134,9 +136,10 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase 
{
         // Test for configured CHECKPOINTING_INTERVAL
         appCfg.setContent(
             DeflaterUtils.zipString(
-                "state.savepoints.dir=hdfs:///test\n"
+                ckDir + "\n"
                     + String.format("%s=%s", CHECKPOINTING_INTERVAL.key(),
                         "3min")));
+
         configService.updateById(appCfg);
         FlinkEffective effective = new FlinkEffective();
         effective.setTargetId(appCfg.getId());
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
new file mode 100644
index 000000000..2e5236768
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.RemoteForm;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-remote/docker-compose.yaml")
+public class Flink120OnRemoteClusterDeployTest {
+
+    public static RemoteWebDriver browser;
+
+    private static final String flinkName = "flink-1.20.1";
+
+    private static final String flinkHome = "/opt/flink/";
+
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+    private static final String flinkJobManagerUrl = "http://jobmanager:8081";
+
+    private static final ClusterDetailForm.DeployMode deployMode = 
ClusterDetailForm.DeployMode.STANDALONE;
+
+    @BeforeAll
+    public static void setUp() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login()
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
+    }
+
+    @Test
+    @Order(1)
+    public void testCreateFlinkCluster() {
+        FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+        flinkClustersPage.createFlinkCluster()
+            .<RemoteForm>addCluster(deployMode)
+            .jobManagerURL(flinkJobManagerUrl)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(flinkClusterName)));
+    }
+
+    @Test
+    @Order(5)
+    public void testDeleteFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new 
FlinkClustersPage(browser);
+
+        flinkClustersPage.deleteFlinkCluster(flinkClusterName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+                    Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+                    assertThat(flinkClustersPage.flinkClusterList)
+                        .noneMatch(it -> 
it.getText().contains(flinkClusterName));
+                });
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java
new file mode 100644
index 000000000..c1750746b
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml")
+public class Flink120OnYarnClusterDeployTest {
+
+    public static RemoteWebDriver browser;
+
+    private static final String flinkName = "flink-1.20.1";
+
+    private static final String flinkHome = "/flink-1.20.1";
+
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+    private static final String flinkClusterNameEdited = 
"flink_1.20.1_cluster_e2e_edited";
+
+    private static final ClusterDetailForm.DeployMode deployMode = 
ClusterDetailForm.DeployMode.YARN_SESSION;
+
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login()
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
+    }
+
+    @Test
+    @Order(1)
+    public void testCreateFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new 
FlinkClustersPage(browser);
+
+        flinkClustersPage.createFlinkCluster()
+            .<YarnSessionForm>addCluster(deployMode)
+            .resolveOrder(YarnSessionForm.ResolveOrder.CHILD_FIRST)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(flinkClusterName)));
+    }
+
+    @Test
+    @Order(2)
+    public void testEditFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new 
FlinkClustersPage(browser);
+
+        flinkClustersPage.editFlinkCluster(flinkClusterName)
+            .<YarnSessionForm>addCluster(deployMode)
+            .clusterName(flinkClusterNameEdited)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain edited 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(flinkClusterNameEdited)));
+    }
+
+    @Test
+    @Order(3)
+    public void testStartFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new 
FlinkClustersPage(browser);
+
+        flinkClustersPage.startFlinkCluster(flinkClusterNameEdited);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain running 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+    }
+
+    @Test
+    @Order(4)
+    public void testStopFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new 
FlinkClustersPage(browser);
+
+        flinkClustersPage.stopFlinkCluster(flinkClusterNameEdited);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(flinkClustersPage.flinkClusterList)
+                    .as("Flink clusters list should contain canceled 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(5)
+    public void testDeleteFlinkCluster() {
+        final FlinkClustersPage flinkClustersPage = new 
FlinkClustersPage(browser);
+
+        flinkClustersPage.deleteFlinkCluster(flinkClusterNameEdited);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+                    Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+                    assertThat(flinkClustersPage.flinkClusterList)
+                        .noneMatch(it -> 
it.getText().contains(flinkClusterNameEdited));
+                });
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java
new file mode 100644
index 000000000..e8b28fb99
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
+import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
+
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml")
+public class FlinkSQL120OnYarnTest {
+
+    public static RemoteWebDriver browser;
+
+    private static final String flinkName = "flink-1.20.1";
+
+    private static final String flinkHome = "/flink-1.20.1";
+
+    private static final String applicationName = "flink-120-e2e-test";
+
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login()
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        FlinkClustersPage flinkClustersPage = 
flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
+
+        flinkClustersPage.createFlinkCluster()
+            
.<YarnSessionForm>addCluster(ClusterDetailForm.DeployMode.YARN_SESSION)
+            .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        flinkClustersPage.startFlinkCluster(flinkClusterName);
+
+        flinkClustersPage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(1)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage
+            .createApplication()
+            .addApplication(
+                ApplicationForm.FlinkJobType.FLINK_SQL,
+                ApplicationForm.DeployMode.YARN_APPLICATION,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(2)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain released 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(3)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain finished 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(4)
+    @SneakyThrows
+    void testCancelFlinkApplicationOnYarnApplicationMode() {
+        Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain restarted 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        applicationsPage.cancelApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain canceled 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(5)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList)
+                        .noneMatch(it -> 
it.getText().contains(applicationName));
+                });
+    }
+
+    @Test
+    @Order(6)
+    void testCreateFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage
+            .createApplication()
+            .addApplication(
+                ApplicationForm.FlinkJobType.FLINK_SQL,
+                ApplicationForm.DeployMode.YARN_PER_JOB,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(7)
+    void testReleaseFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain released 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(8)
+    void testStartFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain started application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain finished 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(9)
+    @SneakyThrows
+    void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain restarted 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        applicationsPage.cancelApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain canceled 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(10)
+    void testDeleteFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList)
+                        .noneMatch(it -> 
it.getText().contains(applicationName));
+                });
+    }
+
+    @Test
+    @Order(11)
+    void testCreateFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage
+            .createApplication()
+            .addApplication(
+                ApplicationForm.FlinkJobType.FLINK_SQL,
+                ApplicationForm.DeployMode.YARN_SESSION,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .flinkCluster(flinkClusterName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain newly-created 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(12)
+    void testReleaseFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain released 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(13)
+    void testStartFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain finished 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(14)
+    @SneakyThrows
+    void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain restarted 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        applicationsPage.cancelApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList)
+                    .as("Applications list should contain canceled 
application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(15)
+    void testDeleteFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+        applicationsPage.deleteApplication(applicationName);
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList)
+                        .noneMatch(it -> 
it.getText().contains(applicationName));
+                });
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile
new file mode 100644
index 000000000..b1e093fd4
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml
new file mode 100644
index 000000000..b16486e6b
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+services:
+  # Flink 1.20.1 standalone JobManager; REST/UI exposed on 8081.
+  jobmanager:
+    image: flink:1.20.1
+    command: jobmanager
+    ports:
+      - "8081:8081"
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      # NOTE(review): removed stray ';' after the URL — it is invalid inside a
+      # YAML flow sequence (mail-encoding artifact).
+      test: [ "CMD", "curl", "http://localhost:8081" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+  # Flink 1.20.1 TaskManager (2 slots) registering with the JobManager above.
+  taskmanager:
+    image: flink:1.20.1
+    depends_on:
+      - jobmanager
+    command: taskmanager
+    scale: 1
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+        taskmanager.numberOfTaskSlots: 2
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      # NOTE(review): the TaskManager does not serve a REST endpoint on its own
+      # localhost:8081, so the original probe could never succeed; probe the
+      # JobManager REST API instead (also dropped the stray ';' artifact).
+      test: [ "CMD", "curl", "http://jobmanager:8081" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+  # StreamPark console (H2 profile) built from the local Dockerfile; shares the
+  # Flink dist volume so it can submit to the remote cluster.
+  streampark:
+    image: apache/streampark:ci
+    command: bash bin/streampark.sh start_docker
+    build:
+      context: ./
+      dockerfile: ./Dockerfile
+    ports:
+      - 10000:10000
+      - 10030:10030
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+      - FLINK_JOBMANAGER_URL=http://jobmanager:8081
+    privileged: true
+    restart: unless-stopped
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - ${HOME}/streampark_build_logs:/tmp/streampark/logs/build_logs/
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      # NOTE(review): removed stray ';' after the URL — invalid inside a YAML
+      # flow sequence (mail-encoding artifact).
+      test: [ "CMD", "curl", "http://localhost:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+# Shared bridge network plus a named volume that exposes the Flink dist
+# (/opt/flink) to all three services.
+networks:
+  e2e:
+volumes:
+  flink_data:
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile
new file mode 100644
index 000000000..6b050d220
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stage sources: StreamPark dist from the CI image, Flink from the official
+# image. (AS is the conventional casing for build-stage names.)
+FROM apache/streampark:ci AS base-image
+FROM flink:1.20.1-scala_2.12-java8 AS flink-image
+
+# Install StreamPark onto a Hadoop 3.3.6 base and run it as the hadoop user.
+# (chown user:group — the '.' separator is deprecated GNU syntax.)
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+RUN sudo chown -R hadoop:hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" /streampark/conf/config.yaml
+
+# Install Flink
+COPY --from=flink-image /opt/flink /flink-1.20.1
+RUN sudo chown -R hadoop:hadoop /flink-1.20.1
+
+# Install javac (full JDK); arm64 needs the altarch CentOS 7 repo first.
+ARG TARGETPLATFORM
+RUN echo "TARGETPLATFORM: $TARGETPLATFORM"
+RUN \
+    if [ "$TARGETPLATFORM" = "linux/arm64" ];then \
+        sudo rm -f /etc/yum.repos.d/*.repo; \
+        sudo wget http://mirrors.aliyun.com/repo/Centos-altarch-7.repo -O /etc/yum.repos.d/CentOS-Base.repo; \
+        sudo sed -i "s/http:\/\//https:\/\//g" /etc/yum.repos.d/CentOS-Base.repo; \
+        sudo yum install -y java-1.8.0-openjdk-devel; \
+    elif [ "$TARGETPLATFORM" = "linux/amd64" ];then \
+        sudo yum install -y java-1.8.0-openjdk-devel; \
+    else \
+        echo "unknown TARGETPLATFORM: $TARGETPLATFORM"; \
+        exit 2;  \
+    fi
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
new file mode 100644
index 000000000..441fa1118
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Hadoop install location inside the container image.
+HADOOP_HOME=/opt/hadoop
+# HDFS addressing — fs.default.name is the legacy key, kept alongside the
+# current fs.defaultFS.
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+# Only one datanode is started by the compose file, so one replica suffices.
+HDFS-SITE.XML_dfs.replication=1
+# Run MapReduce on YARN; jobs inherit HADOOP_MAPRED_HOME.
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+# NodeManager settings relaxed for CI: memory checks off, debug artifacts kept
+# 600s, disk-utilization threshold effectively disabled.
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+YARN-SITE.XML_yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage=99.9
+# Single permissive "default" capacity-scheduler queue for the e2e jobs.
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..73af8308b
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+services:
+  # HDFS NameNode; web UI 9870 published as 19870 on the host.
+  namenode:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: namenode
+    command: [ "hdfs", "namenode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "19870:9870"
+    env_file:
+      - docker-compose.config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      # NOTE(review): removed stray ';' after the URL — invalid inside a YAML
+      # flow sequence (mail-encoding artifact).
+      test: [ "CMD", "curl", "http://namenode:9870" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  # HDFS DataNode; starts only after the NameNode reports healthy.
+  datanode:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: datanode
+    command: [ "hdfs", "datanode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      # NOTE(review): removed stray ';' after the URL — invalid inside a YAML
+      # flow sequence (mail-encoding artifact).
+      test: [ "CMD", "curl", "http://datanode:9864" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+    depends_on:
+      namenode:
+        condition: service_healthy
+  # YARN ResourceManager; web UI 8088 published as 18088 on the host.
+  resourcemanager:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: resourcemanager
+    command: [ "yarn", "resourcemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "18088:8088"
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      # NOTE(review): removed stray ';' after the URL — invalid inside a YAML
+      # flow sequence (mail-encoding artifact).
+      test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  # YARN NodeManager; starts only after the ResourceManager reports healthy.
+  nodemanager:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: nodemanager
+    command: [ "yarn", "nodemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      resourcemanager:
+        condition: service_healthy
+    healthcheck:
+      # NOTE(review): removed stray ';' after the URL — invalid inside a YAML
+      # flow sequence (mail-encoding artifact).
+      test: [ "CMD", "curl", "http://nodemanager:8042" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  # StreamPark console (H2 profile); waits for the whole Hadoop cluster to be
+  # healthy before starting. Console port 10000 published as 20000 on the host.
+  streampark:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: streampark
+    command:
+      - sh
+      - -c
+      - /streampark/bin/streampark.sh start_docker
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "20000:10000"
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      namenode:
+        condition: service_healthy
+      datanode:
+        condition: service_healthy
+      resourcemanager:
+        condition: service_healthy
+      nodemanager:
+        condition: service_healthy
+    healthcheck:
+      # NOTE(review): removed stray ';' after the URL — invalid inside a YAML
+      # flow sequence (mail-encoding artifact).
+      test: [ "CMD", "curl", "http://streampark:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+# Single bridge network shared by all Hadoop/StreamPark services.
+networks:
+  e2e:


Reply via email to