This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 56d872412 [Feature-3754] Add Flink on Yarn per-job mode and Flink on 
Yarn application mode test cases (#3760)
56d872412 is described below

commit 56d8724128dd624447368b3a4191842cfebcd655
Author: xiangzihao <[email protected]>
AuthorDate: Sun Jun 16 11:27:01 2024 +0800

    [Feature-3754] Add Flink on Yarn per-job mode and Flink on Yarn 
application mode test cases (#3760)
    
    * add feature 3754
---
 .github/workflows/e2e.yml                          |  10 +-
 .../e2e/cases/ApplicationsFlink116OnYarnTest.java  | 258 +++++++++++++++++++++
 .../e2e/cases/ApplicationsFlink117OnYarnTest.java  | 258 +++++++++++++++++++++
 .../e2e/cases/ApplicationsFlink118OnYarnTest.java  | 258 +++++++++++++++++++++
 .../apache/streampark/e2e/cases/FlinkHomeTest.java | 101 ++++++++
 .../e2e/pages/apacheflink/ApacheFlinkPage.java     |  68 ++++++
 .../e2e/pages/apacheflink/FlinkHomePage.java       | 137 +++++++++++
 .../apacheflink/applications/ApplicationForm.java  | 204 ++++++++++++++++
 .../apacheflink/applications/ApplicationsPage.java | 180 ++++++++++++++
 .../apacheflink/applications/FlinkSQLEditor.java   |  55 +++++
 .../applications/FlinkSQLYarnApplicationForm.java  |  68 ++++++
 .../entity/ApplicationsDynamicParams.java          |  27 +++
 .../streampark/e2e/pages/common/NavBarPage.java    |  18 +-
 .../resources/docker/flink-1.16-on-yarn/Dockerfile |  32 +++
 .../flink-1.16-on-yarn/docker-compose.config       |  42 ++++
 .../docker/flink-1.16-on-yarn/docker-compose.yaml  | 156 +++++++++++++
 .../resources/docker/flink-1.17-on-yarn/Dockerfile |  32 +++
 .../flink-1.17-on-yarn/docker-compose.config       |  42 ++++
 .../docker/flink-1.17-on-yarn/docker-compose.yaml  | 156 +++++++++++++
 .../resources/docker/flink-1.18-on-yarn/Dockerfile |  32 +++
 .../flink-1.18-on-yarn/docker-compose.config       |  42 ++++
 .../docker/flink-1.18-on-yarn/docker-compose.yaml  | 156 +++++++++++++
 .../streampark/e2e/core/StreamParkExtension.java   |   2 +-
 23 files changed, 2329 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index bb7395543..7601cdc1b 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -97,16 +97,22 @@ jobs:
             class: org.apache.streampark.e2e.cases.UserManagementTest
           - name: TeamManagementTest
             class: org.apache.streampark.e2e.cases.TeamManagementTest
+          - name: ApplicationsFlink116OnYarnTest
+            class: 
org.apache.streampark.e2e.cases.ApplicationsFlink116OnYarnTest
+          - name: ApplicationsFlink117OnYarnTest
+            class: 
org.apache.streampark.e2e.cases.ApplicationsFlink117OnYarnTest
+          - name: ApplicationsFlink118OnYarnTest
+            class: 
org.apache.streampark.e2e.cases.ApplicationsFlink118OnYarnTest
     env:
       RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
     steps:
       - uses: actions/checkout@v4
         with:
           submodules: true
-      - name: Set up JDK 8
+      - name: Set up JDK 11
         uses: actions/setup-java@v4
         with:
-          java-version: 8
+          java-version: 11
           distribution: 'adopt'
       - name: Cache local Maven repository
         uses: actions/cache@v4
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnTest.java
new file mode 100644
index 000000000..ea63b2096
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationForm;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
+public class ApplicationsFlink116OnYarnTest {
+    private static RemoteWebDriver browser;
+
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    private static final String flinkName = "flink-1.16.3";
+
+    private static final String flinkHome = "/flink-1.16.3";
+
+    private static final String applicationName = "flink-116-e2e-test";
+
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(10)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        ApplicationsDynamicParams applicationsDynamicParams = new 
ApplicationsDynamicParams();
+        String flinkSQL = "CREATE TABLE datagen (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING,\n" +
+            "ts AS localtimestamp,\n" +
+            "WATERMARK FOR ts AS ts\n" +
+            ") WITH (\n" +
+            "'connector' = 'datagen',\n" +
+            "'rows-per-second'='5',\n" +
+            "'fields.f_sequence.kind'='sequence',\n" +
+            "'fields.f_sequence.start'='1',\n" +
+            "'fields.f_sequence.end'='100',\n" +
+            "'fields.f_random.min'='1',\n" +
+            "'fields.f_random.max'='100',\n" +
+            "'fields.f_random_str.length'='10'\n" +
+            ");\n" +
+            "\n" +
+            "CREATE TABLE print_table (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING\n" +
+            ") WITH (\n" +
+            "'connector' = 'print'\n" +
+            ");\n" +
+            "\n" +
+            "INSERT INTO print_table select f_sequence,f_random,f_random_str 
from datagen;";
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+        
applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            ApplicationForm.ExecutionMode.YARN_APPLICATION,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain newly-created application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(20)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain released application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(30)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain started application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain finished application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(40)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+
+    @Test
+    @Order(50)
+    void testCreateFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        ApplicationsDynamicParams applicationsDynamicParams = new 
ApplicationsDynamicParams();
+        String flinkSQL = "CREATE TABLE datagen (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING,\n" +
+            "ts AS localtimestamp,\n" +
+            "WATERMARK FOR ts AS ts\n" +
+            ") WITH (\n" +
+            "'connector' = 'datagen',\n" +
+            "'rows-per-second'='5',\n" +
+            "'fields.f_sequence.kind'='sequence',\n" +
+            "'fields.f_sequence.start'='1',\n" +
+            "'fields.f_sequence.end'='100',\n" +
+            "'fields.f_random.min'='1',\n" +
+            "'fields.f_random.max'='100',\n" +
+            "'fields.f_random_str.length'='10'\n" +
+            ");\n" +
+            "\n" +
+            "CREATE TABLE print_table (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING\n" +
+            ") WITH (\n" +
+            "'connector' = 'print'\n" +
+            ");\n" +
+            "\n" +
+            "INSERT INTO print_table select f_sequence,f_random,f_random_str 
from datagen;";
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+        
applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            ApplicationForm.ExecutionMode.YARN_PER_JOB,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain newly-created application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(60)
+    void testReleaseFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain released application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+//    This test cannot be executed due to a bug, and will be enabled after 
issue #3761 is fixed
+//    @Test
+//    @Order(70)
+//    void testStartFlinkApplicationOnYarnPerJobMode() {
+//        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+//
+//        applicationsPage.startApplication(applicationName);
+//
+//        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+//            .as("Applications list should contain started application")
+//            .extracting(WebElement::getText)
+//            .anyMatch(it -> it.contains("RUNNING")));
+//
+//        Awaitility.await()
+//            .untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+//                .as("Applications list should contain finished application")
+//                .extracting(WebElement::getText)
+//                .anyMatch(it -> it.contains("FINISHED")));
+//    }
+
+    @Test
+    @Order(80)
+    void testDeleteFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnTest.java
new file mode 100644
index 000000000..ef9905ab5
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationForm;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
+public class ApplicationsFlink117OnYarnTest {
+    private static RemoteWebDriver browser;
+
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    private static final String flinkName = "flink-1.17.2";
+
+    private static final String flinkHome = "/flink-1.17.2";
+
+    private static final String applicationName = "flink-117-e2e-test";
+
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(10)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        ApplicationsDynamicParams applicationsDynamicParams = new 
ApplicationsDynamicParams();
+        String flinkSQL = "CREATE TABLE datagen (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING,\n" +
+            "ts AS localtimestamp,\n" +
+            "WATERMARK FOR ts AS ts\n" +
+            ") WITH (\n" +
+            "'connector' = 'datagen',\n" +
+            "'rows-per-second'='5',\n" +
+            "'fields.f_sequence.kind'='sequence',\n" +
+            "'fields.f_sequence.start'='1',\n" +
+            "'fields.f_sequence.end'='100',\n" +
+            "'fields.f_random.min'='1',\n" +
+            "'fields.f_random.max'='100',\n" +
+            "'fields.f_random_str.length'='10'\n" +
+            ");\n" +
+            "\n" +
+            "CREATE TABLE print_table (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING\n" +
+            ") WITH (\n" +
+            "'connector' = 'print'\n" +
+            ");\n" +
+            "\n" +
+            "INSERT INTO print_table select f_sequence,f_random,f_random_str 
from datagen;";
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+        
applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            ApplicationForm.ExecutionMode.YARN_APPLICATION,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain newly-created application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(20)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain released application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(30)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain started application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain finished application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(40)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+
+    @Test
+    @Order(50)
+    void testCreateFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        ApplicationsDynamicParams applicationsDynamicParams = new 
ApplicationsDynamicParams();
+        String flinkSQL = "CREATE TABLE datagen (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING,\n" +
+            "ts AS localtimestamp,\n" +
+            "WATERMARK FOR ts AS ts\n" +
+            ") WITH (\n" +
+            "'connector' = 'datagen',\n" +
+            "'rows-per-second'='5',\n" +
+            "'fields.f_sequence.kind'='sequence',\n" +
+            "'fields.f_sequence.start'='1',\n" +
+            "'fields.f_sequence.end'='100',\n" +
+            "'fields.f_random.min'='1',\n" +
+            "'fields.f_random.max'='100',\n" +
+            "'fields.f_random_str.length'='10'\n" +
+            ");\n" +
+            "\n" +
+            "CREATE TABLE print_table (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING\n" +
+            ") WITH (\n" +
+            "'connector' = 'print'\n" +
+            ");\n" +
+            "\n" +
+            "INSERT INTO print_table select f_sequence,f_random,f_random_str 
from datagen;";
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+        
applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            ApplicationForm.ExecutionMode.YARN_PER_JOB,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain newly-created application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(60)
+    void testReleaseFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain released application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+//    This test cannot be executed due to a bug, and will be enabled after 
issue #3761 is fixed
+//    @Test
+//    @Order(70)
+//    void testStartFlinkApplicationOnYarnPerJobMode() {
+//        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+//
+//        applicationsPage.startApplication(applicationName);
+//
+//        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+//            .as("Applications list should contain started application")
+//            .extracting(WebElement::getText)
+//            .anyMatch(it -> it.contains("RUNNING")));
+//
+//        Awaitility.await()
+//            .untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+//                .as("Applications list should contain finished application")
+//                .extracting(WebElement::getText)
+//                .anyMatch(it -> it.contains("FINISHED")));
+//    }
+
+    @Test
+    @Order(80)
+    void testDeleteFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnTest.java
new file mode 100644
index 000000000..ebd3bd6e7
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+
+import lombok.SneakyThrows;
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationForm;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
+public class ApplicationsFlink118OnYarnTest {
+    private static RemoteWebDriver browser;
+
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    private static final String flinkName = "flink-1.18.1";
+
+    private static final String flinkHome = "/flink-1.18.1";
+
+    private static final String applicationName = "flink-118-e2e-test";
+
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(10)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        ApplicationsDynamicParams applicationsDynamicParams = new 
ApplicationsDynamicParams();
+        String flinkSQL = "CREATE TABLE datagen (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING,\n" +
+            "ts AS localtimestamp,\n" +
+            "WATERMARK FOR ts AS ts\n" +
+            ") WITH (\n" +
+            "'connector' = 'datagen',\n" +
+            "'rows-per-second'='5',\n" +
+            "'fields.f_sequence.kind'='sequence',\n" +
+            "'fields.f_sequence.start'='1',\n" +
+            "'fields.f_sequence.end'='100',\n" +
+            "'fields.f_random.min'='1',\n" +
+            "'fields.f_random.max'='100',\n" +
+            "'fields.f_random_str.length'='10'\n" +
+            ");\n" +
+            "\n" +
+            "CREATE TABLE print_table (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING\n" +
+            ") WITH (\n" +
+            "'connector' = 'print'\n" +
+            ");\n" +
+            "\n" +
+            "INSERT INTO print_table select f_sequence,f_random,f_random_str 
from datagen;";
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+        
applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            ApplicationForm.ExecutionMode.YARN_APPLICATION,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain newly-created application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(20)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain released application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(30)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain started application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain finished application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(40)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+
+    @Test
+    @Order(50)
+    void testCreateFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        ApplicationsDynamicParams applicationsDynamicParams = new 
ApplicationsDynamicParams();
+        String flinkSQL = "CREATE TABLE datagen (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING,\n" +
+            "ts AS localtimestamp,\n" +
+            "WATERMARK FOR ts AS ts\n" +
+            ") WITH (\n" +
+            "'connector' = 'datagen',\n" +
+            "'rows-per-second'='5',\n" +
+            "'fields.f_sequence.kind'='sequence',\n" +
+            "'fields.f_sequence.start'='1',\n" +
+            "'fields.f_sequence.end'='100',\n" +
+            "'fields.f_random.min'='1',\n" +
+            "'fields.f_random.max'='100',\n" +
+            "'fields.f_random_str.length'='10'\n" +
+            ");\n" +
+            "\n" +
+            "CREATE TABLE print_table (\n" +
+            "f_sequence INT,\n" +
+            "f_random INT,\n" +
+            "f_random_str STRING\n" +
+            ") WITH (\n" +
+            "'connector' = 'print'\n" +
+            ");\n" +
+            "\n" +
+            "INSERT INTO print_table select f_sequence,f_random,f_random_str 
from datagen;";
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+        
applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            ApplicationForm.ExecutionMode.YARN_PER_JOB,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain newly-created application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(60)
+    void testReleaseFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain released application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(70)
+    void testStartFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+            .as("Applications list should contain started application")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(() -> 
assertThat(applicationsPage.applicationsList())
+                .as("Applications list should contain finished application")
+                .extracting(WebElement::getText)
+                .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(80)
+    void testDeleteFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkHomeTest.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkHomeTest.java
new file mode 100644
index 000000000..6f9b09097
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkHomeTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
+public class FlinkHomeTest {
+    private static RemoteWebDriver browser;
+
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    private static final String flinkName = "flink-1.18.1";
+
+    private static final String flinkHome = "/flink-1.18.1";
+
+    private static final String flinkDescription = "description test";
+
+    private static final String newFlinkHome = "flink_1.18.1";
+
+    @BeforeAll
+    public static void setup() {
+        new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+    }
+
+    @Test
+    @Order(10)
+    void testCreateFlinkHome() {
+        final FlinkHomePage flinkHomePage = new FlinkHomePage(browser);
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(flinkHomePage.flinkHomeList())
+            .as("Flink Home list should contain newly-created flink home")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(flinkName)));
+    }
+
+    @Test
+    @Order(20)
+    void testEditFlinkHome() {
+        final FlinkHomePage flinkHomePage = new FlinkHomePage(browser);
+        flinkHomePage.editFlinkHome(flinkName, newFlinkHome);
+
+        Awaitility.await().untilAsserted(() -> 
assertThat(flinkHomePage.flinkHomeList())
+            .as("Flink Home list should contain edited flink home")
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(newFlinkHome)));
+    }
+
+    @Test
+    @Order(30)
+    void testDeleteFlinkHome() {
+        final FlinkHomePage flinkHomePage = new FlinkHomePage(browser);
+        flinkHomePage.deleteFlinkHome(newFlinkHome);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                flinkHomePage.flinkHomeList()
+            ).noneMatch(
+                it -> it.getText().contains(newFlinkHome)
+            );
+        });
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/ApacheFlinkPage.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/ApacheFlinkPage.java
new file mode 100644
index 000000000..0c96dda3e
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/ApacheFlinkPage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink;
+
+import lombok.Getter;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import org.apache.streampark.e2e.pages.common.NavBarPage;
+import org.apache.streampark.e2e.pages.common.NavBarPage.NavBarItem;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+
+@Getter
+public final class ApacheFlinkPage extends NavBarPage implements NavBarItem {
+    @FindBy(xpath = "//span[contains(@class, 
'streampark-simple-menu-sub-title') and contains(text(), 'Applications')]//..")
+    private WebElement menuApplications;
+
+    @FindBy(xpath = "//span[contains(@class, 
'streampark-simple-menu-sub-title') and contains(text(), 'Flink Home')]//..")
+    private WebElement menuFlinkHome;
+
+    @FindBy(xpath = "//span[contains(@class, 
'streampark-simple-menu-sub-title') and contains(text(), 'Clusters')]//..")
+    private WebElement menuClusters;
+
+
+    public ApacheFlinkPage(RemoteWebDriver driver) {
+        super(driver);
+    }
+
+    public <T extends ApacheFlinkPage.Tab> T goToTab(Class<T> tab) {
+        if (tab == ApplicationsPage.class) {
+            new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(menuApplications));
+            menuApplications.click();
+            return tab.cast(new ApplicationsPage(driver));
+        }
+
+        if (tab == FlinkHomePage.class) {
+            new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(menuFlinkHome));
+            menuFlinkHome.click();
+            return tab.cast(new FlinkHomePage(driver));
+        }
+
+        throw new UnsupportedOperationException("Unknown tab: " + 
tab.getName());
+    }
+
+    public interface Tab {
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/FlinkHomePage.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/FlinkHomePage.java
new file mode 100644
index 000000000..261a1424c
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/FlinkHomePage.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink;
+
+import lombok.Getter;
+import org.apache.streampark.e2e.pages.common.NavBarPage;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementStatus;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementUserType;
+import org.openqa.selenium.By;
+import org.openqa.selenium.Keys;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+@Getter
+public class FlinkHomePage extends NavBarPage implements ApacheFlinkPage.Tab {
+    @FindBy(xpath = "//span[contains(., 'Flink 
Home')]/..//button[contains(@class, 'ant-btn')]/span[contains(text(), 'Add 
New')]")
+    private WebElement buttonCreateFlinkHome;
+
+    @FindBy(xpath = "//div[contains(@class, 'ant-spin-container')]")
+    private List<WebElement> flinkHomeList;
+
+    @FindBy(xpath = "//button[contains(@class, 'ant-btn')]/span[contains(., 
'Yes')]")
+    private WebElement deleteConfirmButton;
+
+    private final CreateFlinkHomeForm createFlinkHomeForm = new 
CreateFlinkHomeForm();
+
+    public FlinkHomePage(RemoteWebDriver driver) {
+        super(driver);
+    }
+
+    public FlinkHomePage createFlinkHome(String flinkName, String flinkHome, 
String description) {
+        waitForPageLoading();
+
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(buttonCreateFlinkHome));
+        buttonCreateFlinkHome.click();
+        createFlinkHomeForm.inputFlinkName().sendKeys(flinkName);
+        createFlinkHomeForm.inputFlinkHome().sendKeys(flinkHome);
+        createFlinkHomeForm.inputDescription().sendKeys(description);
+        createFlinkHomeForm.buttonSubmit().click();
+
+        waitForClickFinish();
+        return this;
+    }
+
+    public FlinkHomePage editFlinkHome(String oldFlinkName, String 
newFlinkName) {
+        waitForPageLoading();
+
+        flinkHomeList()
+            .stream()
+            .filter(it -> it.getText().contains(oldFlinkName))
+            .flatMap(it -> 
it.findElements(By.xpath("//button[contains(@class,'ant-btn')]/span[contains(@aria-label,'edit')]")).stream())
+            .filter(WebElement::isDisplayed)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("No edit button in flink 
home list"))
+            .click();
+
+        createFlinkHomeForm.inputFlinkName().sendKeys(Keys.CONTROL+"a");
+        createFlinkHomeForm.inputFlinkName().sendKeys(Keys.BACK_SPACE);
+        createFlinkHomeForm.inputFlinkName().sendKeys(newFlinkName);
+        createFlinkHomeForm.buttonSubmit().click();
+
+        waitForClickFinish();
+        return this;
+    }
+
+    public FlinkHomePage deleteFlinkHome(String flinkName) {
+        waitForPageLoading();
+
+        flinkHomeList()
+                .stream()
+                .filter(it -> it.getText().contains(flinkName))
+                .flatMap(it -> 
it.findElements(By.xpath("//button[contains(@class,'ant-btn')]/span[contains(@aria-label,'delete')]")).stream())
+                .filter(WebElement::isDisplayed)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No delete button in 
flink home list"))
+                .click();
+
+        deleteConfirmButton.click();
+
+        waitForClickFinish();
+        return this;
+    }
+
+    private void waitForPageLoading() {
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.urlContains("/flink/home"));
+    }
+
+    private void waitForClickFinish() {
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(buttonCreateFlinkHome));
+    }
+
+    @Getter
+    public class CreateFlinkHomeForm {
+        CreateFlinkHomeForm() {
+            PageFactory.initElements(driver, this);
+        }
+
+        @FindBy(id = "form_item_flinkName")
+        private WebElement inputFlinkName;
+
+        @FindBy(id = "form_item_flinkHome")
+        private WebElement inputFlinkHome;
+
+        @FindBy(id = "form_item_description")
+        private WebElement inputDescription;
+
+        @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(text(), 'OK')]")
+        private WebElement buttonSubmit;
+
+        @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(text(), 'Cancel')]")
+        private WebElement buttonCancel;
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationForm.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationForm.java
new file mode 100644
index 000000000..217e0c307
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationForm.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import 
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.openqa.selenium.JavascriptExecutor;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.FindBys;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+
+@Getter
+public final class ApplicationForm {
+    private WebDriver driver;
+
+    @FindBy(xpath = "//div[contains(@codefield, 
'jobType')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonDevelopmentModeDropdown;
+
+    @FindBys({
+        @FindBy(css = "[codefield=jobType]"),
+        @FindBy(className = "ant-select-item-option-content")
+    })
+    private List<WebElement> selectDevelopmentMode;
+
+    @FindBy(xpath = "//div[contains(@codefield, 
'executionMode')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonExecutionModeDropdown;
+
+    @FindBys({
+        @FindBy(css = "[codefield=executionMode]"),
+        @FindBy(className = "ant-select-item-option-content")
+    })
+    private List<WebElement> selectExecutionMode;
+
+    @FindBy(id = "form_item_jobName")
+    private WebElement inputApplicationName;
+
+    @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(text(), 'Submit')]")
+    private WebElement buttonSubmit;
+
+    @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(text(), 'Cancel')]")
+    private WebElement buttonCancel;
+
+    public ApplicationForm(WebDriver driver) {
+        this.driver = driver;
+
+        PageFactory.initElements(driver, this);
+    }
+
+    @SneakyThrows
+    public ApplicationForm addApplication(DevelopmentMode developmentMode,
+                                          ExecutionMode executionMode,
+                                          String applicationName,
+                                          String flinkVersion,
+                                          ApplicationsDynamicParams 
applicationsDynamicParams) {
+        Thread.sleep(1000);
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(buttonDevelopmentModeDropdown));
+        buttonDevelopmentModeDropdown.click();
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.visibilityOfAllElements(selectDevelopmentMode));
+        switch (developmentMode) {
+            case CUSTOM_CODE:
+                selectDevelopmentMode.stream()
+                    .filter(e -> 
e.getText().equalsIgnoreCase(DevelopmentMode.CUSTOM_CODE.desc()))
+                    .findFirst()
+                    .orElseThrow(() -> new 
IllegalArgumentException(String.format("Development mode not found: %s", 
developmentMode.desc())))
+                    .click();
+                break;
+            case FLINK_SQL:
+                selectDevelopmentMode.stream()
+                    .filter(e -> 
e.getText().equalsIgnoreCase(DevelopmentMode.FLINK_SQL.desc()))
+                    .findFirst()
+                    .orElseThrow(() -> new 
IllegalArgumentException(String.format("Development mode not found: %s", 
developmentMode.desc())))
+                    .click();
+                buttonExecutionModeDropdown.click();
+                switch (executionMode) {
+                    case REMOTE:
+                        selectExecutionMode.stream()
+                            .filter(e -> 
e.getText().equalsIgnoreCase(ExecutionMode.REMOTE.desc()))
+                            .findFirst()
+                            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Execution mode not found: %s", 
executionMode.desc()))
+                            )
+                            .click();
+                        break;
+                    case YARN_APPLICATION:
+                        selectExecutionMode.stream()
+                            .filter(e -> 
e.getText().equalsIgnoreCase(ExecutionMode.YARN_APPLICATION.desc()))
+                            .findFirst()
+                            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Execution mode not found: %s", 
executionMode.desc()))
+                            )
+                            .click();
+                        new FlinkSQLYarnApplicationForm(driver)
+                            .add(flinkVersion, 
applicationsDynamicParams.flinkSQL());
+                        break;
+                    case YARN_SESSION:
+                        selectExecutionMode.stream()
+                            .filter(e -> 
e.getText().equalsIgnoreCase(ExecutionMode.YARN_SESSION.desc()))
+                            .findFirst()
+                            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Execution mode not found: %s", 
executionMode.desc()))
+                            )
+                            .click();
+                        break;
+                    case KUBERNETES_SESSION:
+                        selectExecutionMode.stream()
+                            .filter(e -> 
e.getText().equalsIgnoreCase(ExecutionMode.KUBERNETES_SESSION.desc()))
+                            .findFirst()
+                            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Execution mode not found: %s", 
executionMode.desc()))
+                            )
+                            .click();
+                        break;
+                    case KUBERNETES_APPLICATION:
+                        selectExecutionMode.stream()
+                            .filter(e -> 
e.getText().equalsIgnoreCase(ExecutionMode.KUBERNETES_APPLICATION.desc()))
+                            .findFirst()
+                            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Execution mode not found: %s", 
executionMode.desc()))
+                            )
+                            .click();
+                        break;
+                    case YARN_PER_JOB:
+                        selectExecutionMode.stream()
+                            .filter(e -> 
e.getText().equalsIgnoreCase(ExecutionMode.YARN_PER_JOB.desc()))
+                            .findFirst()
+                            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Execution mode not found: %s", 
executionMode.desc()))
+                            )
+                            .click();
+                        new FlinkSQLYarnApplicationForm(driver)
+                            .add(flinkVersion, 
applicationsDynamicParams.flinkSQL());
+                        break;
+                    default:
+                        throw new 
IllegalArgumentException(String.format("Unknown execution mode: %s", 
executionMode.desc()));
+                }
+                break;
+            case PYTHON_FLINK:
+                selectDevelopmentMode.stream()
+                    .filter(e -> 
e.getText().equalsIgnoreCase(DevelopmentMode.PYTHON_FLINK.toString()))
+                    .findFirst()
+                    .orElseThrow(() -> new 
IllegalArgumentException(String.format("Development mode not found: %s", 
developmentMode)))
+                    .click();
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unknown 
development mode: %s", developmentMode));
+        }
+        inputApplicationName.sendKeys(applicationName);
+
+        buttonSubmit.click();
+
+        return this;
+    }
+
+    @Getter
+    public enum DevelopmentMode {
+        CUSTOM_CODE("custom code"),
+        FLINK_SQL("flink sql"),
+        PYTHON_FLINK("python flink"),
+        ;
+
+        private final String desc;
+
+        DevelopmentMode(String desc) {
+            this.desc = desc;
+        }
+    }
+
+    @Getter
+    public enum ExecutionMode {
+        REMOTE("remote"),
+        YARN_APPLICATION("yarn application"),
+        YARN_SESSION("yarn session"),
+        KUBERNETES_SESSION("kubernetes session"),
+        KUBERNETES_APPLICATION("kubernetes application"),
+        YARN_PER_JOB("yarn per-job (deprecated, please use yarn-application 
mode)"),
+        ;
+
+        private final String desc;
+
+        ExecutionMode(String desc) {
+            this.desc = desc;
+        }
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationsPage.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationsPage.java
new file mode 100644
index 000000000..17a24be36
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationsPage.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.common.NavBarPage;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementStatus;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementUserType;
+import org.openqa.selenium.By;
+import org.openqa.selenium.Keys;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.interactions.Actions;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.FindBys;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+@Getter
+public class ApplicationsPage extends NavBarPage implements 
ApacheFlinkPage.Tab {
+    @FindBy(xpath = "//div[contains(@class, 
'app_list')]//button[contains(@class, 'ant-btn-primary')]/span[contains(text(), 
'Add New')]")
+    private WebElement buttonCreateApplication;
+
+    @FindBy(xpath = "//tbody[contains(@class, 'ant-table-tbody')]")
+    private List<WebElement> applicationsList;
+
+    @FindBy(className = "ant-form-item-explain-error")
+    private List<WebElement> errorMessageList;
+
+    @FindBy(xpath = "//div[contains(@class, 
'ant-dropdown-content')]//span[contains(text(), 'Delete')]")
+    private WebElement deleteButton;
+
+    @FindBy(xpath = "//button[contains(@class, 'ant-btn')]/span[contains(., 
'OK')]")
+    private WebElement deleteConfirmButton;
+
+    private final StartJobForm startJobForm = new StartJobForm();
+
+    private final CancelJobForm cancelJobForm = new CancelJobForm();
+
+    public ApplicationsPage(RemoteWebDriver driver) {
+        super(driver);
+    }
+
+    public ApplicationForm createApplication() {
+        waitForPageLoading();
+
+        buttonCreateApplication.click();
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.urlContains("/flink/app/add"));
+        return new ApplicationForm(driver);
+    }
+
+    public ApplicationsPage deleteApplication(String applicationName) {
+        waitForPageLoading();
+
+        WebElement extraButton = applicationsList()
+            .stream()
+            .filter(it -> it.getText().contains(applicationName))
+            .flatMap(it -> 
it.findElements(By.xpath("//span[contains(@aria-label, 'more')]/..")).stream())
+            .filter(WebElement::isDisplayed)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("No extra button in 
applications list"))
+            ;
+        Actions actions = new Actions(this.driver);
+        actions.moveToElement(extraButton).perform();
+        deleteButton.click();
+        deleteConfirmButton.click();
+
+        return this;
+    }
+
+    public ApplicationsPage startApplication(String applicationName) {
+        waitForPageLoading();
+
+        applicationsList()
+            .stream()
+            .filter(it -> it.getText().contains(applicationName))
+            .flatMap(it -> it.findElements(By.xpath("//button[contains(@auth, 
'app:start')]")).stream())
+            .filter(WebElement::isDisplayed)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("No start button in 
applications list"))
+            .click();
+
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(startJobForm().radioFromSavepoint()));
+        startJobForm.radioFromSavepoint().click();
+        startJobForm.buttonSubmit().click();
+
+        return this;
+    }
+
+    public ApplicationsPage releaseApplication(String applicationName) {
+        waitForPageLoading();
+
+        applicationsList()
+            .stream()
+            .filter(it -> it.getText().contains(applicationName))
+            .flatMap(it -> it.findElements(By.xpath("//button[contains(@auth, 
'app:release')]")).stream())
+            .filter(WebElement::isDisplayed)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("No release button in 
applications list"))
+            .click();
+
+        return this;
+    }
+
+    public ApplicationsPage cancelApplication(String applicationName) {
+        waitForPageLoading();
+
+        applicationsList()
+            .stream()
+            .filter(it -> it.getText().contains(applicationName))
+            .flatMap(it -> it.findElements(By.xpath("//button[contains(@auth, 
'app:cancel')]")).stream())
+            .filter(WebElement::isDisplayed)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("No cancel button in 
applications list"))
+            .click();
+
+        cancelJobForm.radioFromSavepoint().click();
+        cancelJobForm.buttonSubmit().click();
+
+        return this;
+    }
+
+    private void waitForPageLoading() {
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.urlContains("/flink/app"));
+    }
+
+    @Getter
+    public class StartJobForm {
+        StartJobForm() {
+            PageFactory.initElements(driver, this);
+        }
+
+        @FindBy(xpath = 
"//button[@id='startApplicationModal_startSavePointed']//span[contains(text(), 
'ON')]")
+        private WebElement radioFromSavepoint;
+
+        @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(., 'Apply')]")
+        private WebElement buttonSubmit;
+
+        @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(., 'Cancel')]")
+        private WebElement buttonCancel;
+    }
+
+    @Getter
+    public class CancelJobForm {
+        CancelJobForm() {
+            PageFactory.initElements(driver, this);
+        }
+
+        @FindBy(xpath = "//span[contains(text(), 'ON')]")
+        private WebElement radioFromSavepoint;
+
+        @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(., 'Apply')]")
+        private WebElement buttonSubmit;
+
+        @FindBy(xpath = "//button[contains(@class, 
'ant-btn')]//span[contains(., 'Cancel')]")
+        private WebElement buttonCancel;
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLEditor.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLEditor.java
new file mode 100644
index 000000000..e459f9cbc
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLEditor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.interactions.Actions;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+
+@Getter
+public final class FlinkSQLEditor {
+    @FindBy(xpath = "//label[contains(@for, 
'form_item_flinkSql')]/../..//div[contains(@class, 
'monaco-editor')]//div[contains(@class, 'view-line')]")
+    private WebElement flinkSqlEditor;
+
+    private WebDriver driver;
+
+    public FlinkSQLEditor(WebDriver driver) {
+        PageFactory.initElements(driver, this);
+        this.driver = driver;
+    }
+
+    public FlinkSQLEditor content(String content) {
+        new WebDriverWait(this.driver, 
Duration.ofSeconds(20)).until(ExpectedConditions.elementToBeClickable(flinkSqlEditor));
+
+        flinkSqlEditor.click();
+
+        Actions actions = new Actions(this.driver);
+        actions.moveToElement(flinkSqlEditor).sendKeys(content).perform();
+
+        return this;
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLYarnApplicationForm.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLYarnApplicationForm.java
new file mode 100644
index 000000000..6645835e1
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLYarnApplicationForm.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.FindBys;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+@Getter
+public final class FlinkSQLYarnApplicationForm {
+    private WebDriver driver;
+
+    @FindBy(xpath = "//div[contains(@codefield, 
'versionId')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonFlinkVersionDropdown;
+
+    @FindBys({
+        @FindBy(css = "[codefield=versionId]"),
+        @FindBy(className = "ant-select-item-option-content")
+    })
+    private List<WebElement> selectFlinkVersion;
+
+    public FlinkSQLYarnApplicationForm(WebDriver driver) {
+        this.driver = driver;
+
+        PageFactory.initElements(driver, this);
+    }
+
+    @SneakyThrows
+    public FlinkSQLYarnApplicationForm add(String flinkVersion, String 
flinkSql) {
+        buttonFlinkVersionDropdown.click();
+        new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion));
+        selectFlinkVersion.stream()
+            .filter(e -> e.getText().equalsIgnoreCase(flinkVersion))
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("Flink version not 
found"))
+            .click();
+
+        new FlinkSQLEditor(driver).content(flinkSql);
+
+        return this;
+    }
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/entity/ApplicationsDynamicParams.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/entity/ApplicationsDynamicParams.java
new file mode 100644
index 000000000..61eb60e15
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/entity/ApplicationsDynamicParams.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications.entity;
+
+import lombok.Data;
+
+@Data
+public class ApplicationsDynamicParams {
+    private String flinkSQL;
+}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
index 3d44142d8..36f0a178a 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
@@ -20,8 +20,10 @@
 
 package org.apache.streampark.e2e.pages.common;
 
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
 import org.apache.streampark.e2e.pages.system.SystemPage;
 
+import org.openqa.selenium.By;
 import org.openqa.selenium.WebElement;
 import org.openqa.selenium.remote.RemoteWebDriver;
 import org.openqa.selenium.support.FindBy;
@@ -55,9 +57,21 @@ public class NavBarPage {
     }
 
     public <T extends NavBarItem> T goToNav(Class<T> nav) {
+        if (nav == ApacheFlinkPage.class) {
+            new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(apacheFlinkTab));
+            String tabOpenStateXpath = "//span[contains(@class, 'ml-2') and 
contains(@class, 'streampark-simple-menu-sub-title') and contains(text(), 
'Apache Flink')]/../../li[contains(@class, 'streampark-menu-opened')]";
+            if (driver.findElements(By.xpath(tabOpenStateXpath)).isEmpty()) {
+                apacheFlinkTab.click();
+            }
+            return nav.cast(new ApacheFlinkPage(driver));
+        }
+
         if (nav == SystemPage.class) {
-            new WebDriverWait(driver, 
Duration.ofSeconds(60)).until(ExpectedConditions.elementToBeClickable(systemTab));
-            systemTab.click();
+            new WebDriverWait(driver, 
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(systemTab));
+            String tabOpenStateXpath = "//span[contains(@class, 'ml-2') and 
contains(@class, 'streampark-simple-menu-sub-title') and contains(text(), 
'System')]/../../li[contains(@class, 'streampark-menu-opened')]";
+            if (driver.findElements(By.xpath(tabOpenStateXpath)).isEmpty()) {
+                systemTab.click();
+            }
             return nav.cast(new SystemPage(driver));
         }
 
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/Dockerfile
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/Dockerfile
new file mode 100644
index 000000000..97c5b8475
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/Dockerfile
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci as base-image
+
+# Install streampark
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+RUN sudo chown -R hadoop.hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" 
/streampark/conf/config.yaml
+
+ENV FLINK_VERSION 1.16.3
+
+# Install Flink
+RUN sudo wget --no-check-certificate -O 
/flink-${FLINK_VERSION}-bin-scala_2.12.tgz 
https://dlcdn.apache.org/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.12.tgz
 \
+    && cd / \
+    && sudo tar -zxf flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo chown -R hadoop.hadoop /flink-${FLINK_VERSION}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
new file mode 100644
index 000000000..b6dfba10f
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.yaml
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..6ea5912e7
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.yaml
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+
+services:
+  namenode:
+    image: apache/streampark-flink-1.16.3-on-yarn:ci
+    hostname: namenode
+    command: [ "hdfs", "namenode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "19870:9870"
+    env_file:
+      - docker-compose.config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://namenode:9870" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  datanode:
+    image: apache/streampark-flink-1.16.3-on-yarn:ci
+    hostname: datanode
+    command: [ "hdfs", "datanode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://datanode:9864" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+    depends_on:
+      namenode:
+        condition: service_healthy
+  resourcemanager:
+    image: apache/streampark-flink-1.16.3-on-yarn:ci
+    hostname: resourcemanager
+    command: [ "yarn", "resourcemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "18088:8088"
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  nodemanager:
+    image: apache/streampark-flink-1.16.3-on-yarn:ci
+    hostname: nodemanager
+    command: [ "yarn", "nodemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      resourcemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://nodemanager:8042" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  streampark:
+    image: apache/streampark-flink-1.16.3-on-yarn:ci
+    hostname: streampark
+    command:
+      - sh
+      - -c
+      - /streampark/bin/streampark.sh start_docker
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "20000:10000"
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://streampark:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+networks:
+  e2e:
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/Dockerfile
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/Dockerfile
new file mode 100644
index 000000000..f22348d0e
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/Dockerfile
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci as base-image
+
+# Install streampark
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+RUN sudo chown -R hadoop.hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" 
/streampark/conf/config.yaml
+
+ENV FLINK_VERSION 1.17.2
+
+# Install Flink
+RUN sudo wget --no-check-certificate -O 
/flink-${FLINK_VERSION}-bin-scala_2.12.tgz 
https://dlcdn.apache.org/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.12.tgz
 \
+    && cd / \
+    && sudo tar -zxf flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo chown -R hadoop.hadoop /flink-${FLINK_VERSION}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
new file mode 100644
index 000000000..b6dfba10f
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.yaml
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..74d0b85e8
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.yaml
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+
+services:
+  namenode:
+    image: apache/streampark-flink-1.17.2-on-yarn:ci
+    hostname: namenode
+    command: [ "hdfs", "namenode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "19870:9870"
+    env_file:
+      - docker-compose.config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://namenode:9870" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  datanode:
+    image: apache/streampark-flink-1.17.2-on-yarn:ci
+    hostname: datanode
+    command: [ "hdfs", "datanode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://datanode:9864" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+    depends_on:
+      namenode:
+        condition: service_healthy
+  resourcemanager:
+    image: apache/streampark-flink-1.17.2-on-yarn:ci
+    hostname: resourcemanager
+    command: [ "yarn", "resourcemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "18088:8088"
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  nodemanager:
+    image: apache/streampark-flink-1.17.2-on-yarn:ci
+    hostname: nodemanager
+    command: [ "yarn", "nodemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      resourcemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://nodemanager:8042" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  streampark:
+    image: apache/streampark-flink-1.17.2-on-yarn:ci
+    hostname: streampark
+    command:
+      - sh
+      - -c
+      - /streampark/bin/streampark.sh start_docker
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "20000:10000"
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://streampark:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+networks:
+  e2e:
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/Dockerfile
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/Dockerfile
new file mode 100644
index 000000000..12e5cdaad
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/Dockerfile
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci as base-image
+
+# Install streampark
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+RUN sudo chown -R hadoop.hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" 
/streampark/conf/config.yaml
+
+ENV FLINK_VERSION 1.18.1
+
+# Install Flink
+RUN sudo wget --no-check-certificate -O 
/flink-${FLINK_VERSION}-bin-scala_2.12.tgz 
https://dlcdn.apache.org/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.12.tgz
 \
+    && cd / \
+    && sudo tar -zxf flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo chown -R hadoop.hadoop /flink-${FLINK_VERSION}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
new file mode 100644
index 000000000..b6dfba10f
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.yaml
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..ad92f84c0
--- /dev/null
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.yaml
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+
+services:
+  namenode:
+    image: apache/streampark-flink-1.18.1-on-yarn:ci
+    hostname: namenode
+    command: [ "hdfs", "namenode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "19870:9870"
+    env_file:
+      - docker-compose.config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://namenode:9870" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  datanode:
+    image: apache/streampark-flink-1.18.1-on-yarn:ci
+    hostname: datanode
+    command: [ "hdfs", "datanode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://datanode:9864" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+    depends_on:
+      namenode:
+        condition: service_healthy
+  resourcemanager:
+    image: apache/streampark-flink-1.18.1-on-yarn:ci
+    hostname: resourcemanager
+    command: [ "yarn", "resourcemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "18088:8088"
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  nodemanager:
+    image: apache/streampark-flink-1.18.1-on-yarn:ci
+    hostname: nodemanager
+    command: [ "yarn", "nodemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      resourcemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://nodemanager:8042" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  streampark:
+    image: apache/streampark-flink-1.18.1-on-yarn:ci
+    hostname: streampark
+    command:
+      - sh
+      - -c
+      - /streampark/bin/streampark.sh start_docker
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "20000:10000"
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://streampark:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+networks:
+  e2e:
diff --git a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java
index 9592381a4..b33a59e80 100644
--- a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java
+++ b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java
@@ -77,7 +77,7 @@ final class StreamParkExtension implements BeforeAllCallback, AfterAllCallback,
     @Override
     @SuppressWarnings("UnstableApiUsage")
     public void beforeAll(ExtensionContext context) throws IOException {
-        Awaitility.setDefaultTimeout(Duration.ofSeconds(60));
+        Awaitility.setDefaultTimeout(Duration.ofSeconds(120));
         Awaitility.setDefaultPollInterval(Duration.ofSeconds(2));
 
         setRecordPath();

Reply via email to