This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 942116bd2 add feature 3861 (#3890)
942116bd2 is described below

commit 942116bd25e2e226bf3b71edb4c1ce8147ec1ba3
Author: xiangzihao <[email protected]>
AuthorDate: Fri Jul 19 12:29:46 2024 +0800

    add feature 3861 (#3890)
---
 .../src/views/flink/app/hooks/useFlinkRender.tsx   |   1 +
 .../e2e/cases/FlinkSQL116OnYarnTest.java           | 123 ++++++++++++++---
 .../e2e/cases/FlinkSQL117OnYarnTest.java           | 150 ++++++++++++++++++---
 .../e2e/cases/FlinkSQL118OnYarnTest.java           | 150 ++++++++++++++++++---
 .../streampark/e2e/pages/common/Constants.java     |  32 +++++
 .../pages/flink/applications/ApplicationForm.java  |  68 ++++++++--
 .../pages/flink/applications/FlinkSQLEditor.java   |  75 ++++++++++-
 .../applications/FlinkSQLYarnApplicationForm.java  |  69 ----------
 .../entity/ApplicationsDynamicParams.java          |  26 ----
 .../pages/flink/clusters/FlinkClustersPage.java    |   4 +-
 .../flink-1.16-on-yarn/docker-compose.config       |   1 +
 .../flink-1.17-on-yarn/docker-compose.config       |   1 +
 .../flink-1.18-on-yarn/docker-compose.config       |   1 +
 .../org/apache/streampark/e2e/core/Constants.java  |  27 ----
 14 files changed, 541 insertions(+), 187 deletions(-)

diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 62975b584..79105228b 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -274,6 +274,7 @@ export const renderFlinkCluster = (clusters, { model, field }: RenderCallbackPar
       placeholder={t('flink.app.flinkCluster')}
       value={model[field]}
       onChange={(value: any) => (model[field] = value)}
+      codeField={field}
     >
       {clusters.map((item) => {
         return (
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
index 92d4c5dae..9fd649ff0 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
@@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
 import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
 import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
 import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
-import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
 
 import lombok.SneakyThrows;
 import org.junit.jupiter.api.BeforeAll;
@@ -34,7 +36,7 @@ import org.openqa.selenium.WebElement;
 import org.openqa.selenium.remote.RemoteWebDriver;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
-import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
+import static org.apache.streampark.e2e.pages.common.Constants.TEST_FLINK_SQL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
@@ -54,6 +56,10 @@ public class FlinkSQL116OnYarnTest {
 
     private static final String applicationName = "flink-116-e2e-test";
 
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.16.3_cluster_e2e";
+
     @BeforeAll
     public static void setup() {
         FlinkHomePage flinkHomePage = new LoginPage(browser)
@@ -61,9 +67,22 @@ public class FlinkSQL116OnYarnTest {
             .goToNav(ApacheFlinkPage.class)
             .goToTab(FlinkHomePage.class);
 
-        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
+
+        flinkClustersPage.createFlinkCluster()
+            .<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
+            .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        flinkClustersPage.startFlinkCluster(flinkClusterName);
 
-        flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
+        flinkClustersPage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
     }
 
     @Test
@@ -71,17 +90,15 @@ public class FlinkSQL116OnYarnTest {
     void testCreateFlinkApplicationOnYarnApplicationMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
 
-        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
-
-        applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
         applicationsPage
             .createApplication()
             .addApplication(
                 ApplicationForm.DevelopmentMode.FLINK_SQL,
                 ApplicationForm.ExecutionMode.YARN_APPLICATION,
-                applicationName,
-                flinkName,
-                applicationsDynamicParams);
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(TEST_FLINK_SQL)
+            .submit();
 
         Awaitility.await()
             .untilAsserted(
@@ -176,16 +193,15 @@ public class FlinkSQL116OnYarnTest {
     void testCreateFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
 
-        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
-        applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
         applicationsPage
             .createApplication()
             .addApplication(
                 ApplicationForm.DevelopmentMode.FLINK_SQL,
                 ApplicationForm.ExecutionMode.YARN_PER_JOB,
-                applicationName,
-                flinkName,
-                applicationsDynamicParams);
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(TEST_FLINK_SQL)
+            .submit();
 
         Awaitility.await()
             .untilAsserted(
@@ -248,4 +264,81 @@ public class FlinkSQL116OnYarnTest {
                         .noneMatch(it -> it.getText().contains(applicationName));
                 });
     }
+
+    @Test
+    @Order(90)
+    void testCreateFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.createApplication()
+            .addApplication(
+                ApplicationForm.DevelopmentMode.FLINK_SQL,
+                ApplicationForm.ExecutionMode.YARN_SESSION,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(TEST_FLINK_SQL)
+            .flinkCluster(flinkClusterName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain newly-created application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(100)
+    void testReleaseFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain released application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(110)
+    void testStartFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain started application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain finished application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(120)
+    void testDeleteFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList())
+                        .noneMatch(it -> it.getText().contains(applicationName));
+                });
+    }
 }
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
index b98fae390..f636a94c8 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
@@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
 import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
 import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
 import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
-import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
 
 import lombok.SneakyThrows;
 import org.junit.jupiter.api.BeforeAll;
@@ -34,7 +36,6 @@ import org.openqa.selenium.WebElement;
 import org.openqa.selenium.remote.RemoteWebDriver;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
-import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
@@ -54,6 +55,10 @@ public class FlinkSQL117OnYarnTest {
 
     private static final String applicationName = "flink-117-e2e-test";
 
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.17.2_cluster_e2e";
+
     @BeforeAll
     public static void setup() {
         FlinkHomePage flinkHomePage = new LoginPage(browser)
@@ -61,9 +66,22 @@ public class FlinkSQL117OnYarnTest {
             .goToNav(ApacheFlinkPage.class)
             .goToTab(FlinkHomePage.class);
 
-        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
 
-        flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
+        flinkClustersPage.createFlinkCluster()
+            .<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
+            .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        flinkClustersPage.startFlinkCluster(flinkClusterName);
+
+        flinkClustersPage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
     }
 
     @Test
@@ -71,17 +89,15 @@ public class FlinkSQL117OnYarnTest {
     void testCreateFlinkApplicationOnYarnApplicationMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
 
-        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
-
-        applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
         applicationsPage
             .createApplication()
             .addApplication(
                 ApplicationForm.DevelopmentMode.FLINK_SQL,
                 ApplicationForm.ExecutionMode.YARN_APPLICATION,
-                applicationName,
-                flinkName,
-                applicationsDynamicParams);
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .submit();
 
         Awaitility.await()
             .untilAsserted(
@@ -176,17 +192,15 @@ public class FlinkSQL117OnYarnTest {
     void testCreateFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
 
-        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
-
-        applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
         applicationsPage
             .createApplication()
             .addApplication(
                 ApplicationForm.DevelopmentMode.FLINK_SQL,
                 ApplicationForm.ExecutionMode.YARN_PER_JOB,
-                applicationName,
-                flinkName,
-                applicationsDynamicParams);
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .submit();
 
         Awaitility.await()
             .untilAsserted(
@@ -249,4 +263,108 @@ public class FlinkSQL117OnYarnTest {
                         .noneMatch(it -> it.getText().contains(applicationName));
                 });
     }
+
+    @Test
+    @Order(90)
+    void testCreateFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage
+            .createApplication()
+            .addApplication(
+                ApplicationForm.DevelopmentMode.FLINK_SQL,
+                ApplicationForm.ExecutionMode.YARN_SESSION,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .flinkCluster(flinkClusterName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain newly-created application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(100)
+    void testReleaseFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain released application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(110)
+    void testStartFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain started application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain finished application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(120)
+    @SneakyThrows
+    void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
+        Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain restarted application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        applicationsPage.cancelApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain canceled application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(130)
+    void testDeleteFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList())
+                        .noneMatch(it -> it.getText().contains(applicationName));
+                });
+    }
 }
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
index a91da141a..89367e8c5 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
@@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
 import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
 import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
 import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
-import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
 
 import lombok.SneakyThrows;
 import org.junit.jupiter.api.BeforeAll;
@@ -34,7 +36,6 @@ import org.openqa.selenium.WebElement;
 import org.openqa.selenium.remote.RemoteWebDriver;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
-import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
@@ -54,6 +55,10 @@ public class FlinkSQL118OnYarnTest {
 
     private static final String applicationName = "flink-118-e2e-test";
 
+    private static final String flinkDescription = "description test";
+
+    private static final String flinkClusterName = "flink_1.18.1_cluster_e2e";
+
     @BeforeAll
     public static void setup() {
         FlinkHomePage flinkHomePage = new LoginPage(browser)
@@ -61,9 +66,22 @@ public class FlinkSQL118OnYarnTest {
             .goToNav(ApacheFlinkPage.class)
             .goToTab(FlinkHomePage.class);
 
-        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkClustersPage.class);
 
-        flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
+        flinkClustersPage.createFlinkCluster()
+            .<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
+            .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+            .clusterName(flinkClusterName)
+            .flinkVersion(flinkName)
+            .submit();
+
+        flinkClustersPage.startFlinkCluster(flinkClusterName);
+
+        flinkClustersPage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
     }
 
     @Test
@@ -71,17 +89,15 @@ public class FlinkSQL118OnYarnTest {
     void testCreateFlinkApplicationOnYarnApplicationMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
 
-        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
-
-        applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
         applicationsPage
             .createApplication()
             .addApplication(
                 ApplicationForm.DevelopmentMode.FLINK_SQL,
                 ApplicationForm.ExecutionMode.YARN_APPLICATION,
-                applicationName,
-                flinkName,
-                applicationsDynamicParams);
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .submit();
 
         Awaitility.await()
             .untilAsserted(
@@ -176,17 +192,15 @@ public class FlinkSQL118OnYarnTest {
     void testCreateFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
 
-        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
-
-        applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
         applicationsPage
             .createApplication()
             .addApplication(
                 ApplicationForm.DevelopmentMode.FLINK_SQL,
                 ApplicationForm.ExecutionMode.YARN_PER_JOB,
-                applicationName,
-                flinkName,
-                applicationsDynamicParams);
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .submit();
 
         Awaitility.await()
             .untilAsserted(
@@ -275,4 +289,108 @@ public class FlinkSQL118OnYarnTest {
                         .noneMatch(it -> it.getText().contains(applicationName));
                 });
     }
+
+    @Test
+    @Order(90)
+    void testCreateFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage
+            .createApplication()
+            .addApplication(
+                ApplicationForm.DevelopmentMode.FLINK_SQL,
+                ApplicationForm.ExecutionMode.YARN_SESSION,
+                applicationName)
+            .flinkVersion(flinkName)
+            .flinkSql(Constants.TEST_FLINK_SQL)
+            .flinkCluster(flinkClusterName)
+            .submit();
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain newly-created application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains(applicationName)));
+    }
+
+    @Test
+    @Order(100)
+    void testReleaseFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain released application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("SUCCESS")));
+    }
+
+    @Test
+    @Order(110)
+    void testStartFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain started application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain finished application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("FINISHED")));
+    }
+
+    @Test
+    @Order(120)
+    @SneakyThrows
+    void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
+        Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain restarted application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("RUNNING")));
+
+        applicationsPage.cancelApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> assertThat(applicationsPage.applicationsList())
+                    .as("Applications list should contain canceled application")
+                    .extracting(WebElement::getText)
+                    .anyMatch(it -> it.contains("CANCELED")));
+    }
+
+    @Test
+    @Order(130)
+    void testDeleteFlinkApplicationOnYarnSessionMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        Awaitility.await()
+            .untilAsserted(
+                () -> {
+                    browser.navigate().refresh();
+
+                    assertThat(applicationsPage.applicationsList())
+                        .noneMatch(it -> it.getText().contains(applicationName));
+                });
+    }
 }
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java
index 7319fc283..956556cee 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java
@@ -26,7 +26,39 @@ public class Constants {
 
     public static final Integer DEFAULT_SLEEP_MILLISECONDS = 2000;
 
+    public static final Integer DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS = 1000;
+
     public static final Integer DEFAULT_PROJECT_BUILD_TIMEOUT_MINUTES = 5;
 
     public static final Duration DEFAULT_WEBDRIVER_WAIT_DURATION = Duration.ofSeconds(10);
+
+    public static final String LINE_SEPARATOR = "\n";
+
+    /** datagen flink sql for test */
+    public static final String TEST_FLINK_SQL = "CREATE TABLE datagen (\n"
+        + "f_sequence INT,\n"
+        + "f_random INT,\n"
+        + "f_random_str STRING,\n"
+        + "ts AS localtimestamp,\n"
+        + "WATERMARK FOR ts AS ts\n"
+        + ") WITH (\n"
+        + "'connector' = 'datagen',\n"
+        + "'rows-per-second'='5',\n"
+        + "'fields.f_sequence.kind'='sequence',\n"
+        + "'fields.f_sequence.start'='1',\n"
+        + "'fields.f_sequence.end'='100',\n"
+        + "'fields.f_random.min'='1',\n"
+        + "'fields.f_random.max'='100',\n"
+        + "'fields.f_random_str.length'='10'\n"
+        + ");\n"
+        + "\n"
+        + "CREATE TABLE print_table (\n"
+        + "f_sequence INT,\n"
+        + "f_random INT,\n"
+        + "f_random_str STRING\n"
+        + ") WITH (\n"
+        + "'connector' = 'print'\n"
+        + ");\n"
+        + "\n"
+        + "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;";
 }
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
index 1f977301f..eb93e3cb7 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
@@ -18,12 +18,12 @@
 package org.apache.streampark.e2e.pages.flink.applications;
 
 import org.apache.streampark.e2e.pages.common.Constants;
-import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
 
 import lombok.Getter;
 import lombok.SneakyThrows;
 import org.openqa.selenium.WebDriver;
 import org.openqa.selenium.WebElement;
+import org.openqa.selenium.interactions.Actions;
 import org.openqa.selenium.support.FindBy;
 import org.openqa.selenium.support.FindBys;
 import org.openqa.selenium.support.PageFactory;
@@ -58,6 +58,21 @@ public final class ApplicationForm {
     @FindBy(id = "form_item_jobName")
     private WebElement inputApplicationName;
 
+    @FindBy(xpath = "//div[contains(@codefield, 'yarnSessionClusterId')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonFlinkClusterDropdown;
+
+    @FindBy(className = "ant-select-item-option-content")
+    private List<WebElement> selectFlinkCluster;
+
+    @FindBy(xpath = "//div[contains(@codefield, 'versionId')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonFlinkVersionDropdown;
+
+    @FindBys({
+            @FindBy(css = "[codefield=versionId]"),
+            @FindBy(className = "ant-select-item-option-content")
+    })
+    private List<WebElement> selectFlinkVersion;
+
     @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(text(), 'Submit')]")
     private WebElement buttonSubmit;
 
@@ -71,12 +86,9 @@ public final class ApplicationForm {
     }
 
     @SneakyThrows
-    public ApplicationForm addApplication(
-                                          DevelopmentMode developmentMode,
+    public ApplicationForm addApplication(DevelopmentMode developmentMode,
                                           ExecutionMode executionMode,
-                                          String applicationName,
-                                          String flinkVersion,
-                                          ApplicationsDynamicParams 
applicationsDynamicParams) {
+                                          String applicationName) {
         Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
         new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
             
.until(ExpectedConditions.elementToBeClickable(buttonDevelopmentModeDropdown));
@@ -125,8 +137,7 @@ public final class ApplicationForm {
                                     String.format("Execution mode not found: 
%s",
                                         executionMode.desc())))
                             .click();
-                        new FlinkSQLYarnApplicationForm(driver)
-                            .add(flinkVersion, 
applicationsDynamicParams.flinkSQL());
+
                         break;
                     case YARN_SESSION:
                         selectExecutionMode.stream()
@@ -170,8 +181,6 @@ public final class ApplicationForm {
                                     String.format("Execution mode not found: 
%s",
                                         executionMode.desc())))
                             .click();
-                        new FlinkSQLYarnApplicationForm(driver)
-                            .add(flinkVersion, 
applicationsDynamicParams.flinkSQL());
                         break;
                     default:
                         throw new IllegalArgumentException(
@@ -194,6 +203,75 @@ public final class ApplicationForm {
         }
         inputApplicationName.sendKeys(applicationName);
 
+        return this;
+    }
+
+    /**
+     * Opens the Flink version dropdown and selects the option whose text equals the given
+     * version, ignoring case.
+     *
+     * @param flinkVersion text of the version option to select
+     * @return this form, for call chaining
+     * @throws IllegalArgumentException if no option matches {@code flinkVersion}
+     */
+    public ApplicationForm flinkVersion(String flinkVersion) {
+        new Actions(driver).moveToElement(buttonFlinkVersionDropdown).perform();
+        buttonFlinkVersionDropdown.click();
+        new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
+            .until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion));
+        selectFlinkVersion.stream()
+            .filter(e -> e.getText().equalsIgnoreCase(flinkVersion))
+            .findFirst()
+            .orElseThrow(
+                () -> new IllegalArgumentException(
+                    String.format("Flink version not found: %s", flinkVersion)))
+            .click();
+
+        return this;
+    }
+
+    /**
+     * Types the given Flink SQL into the Flink SQL editor.
+     *
+     * @param flinkSql SQL text passed to {@link FlinkSQLEditor#content(String)}
+     * @return this form, for call chaining
+     */
+    public ApplicationForm flinkSql(String flinkSql) {
+        new FlinkSQLEditor(driver).content(flinkSql);
+        return this;
+    }
+
+    /**
+     * Opens the Flink cluster dropdown and selects the first option whose text contains the
+     * given cluster name.
+     *
+     * @param flinkClusterName text the cluster option must contain
+     * @return this form, for call chaining
+     * @throws IllegalArgumentException if no option contains {@code flinkClusterName}
+     */
+    @SneakyThrows
+    public ApplicationForm flinkCluster(String flinkClusterName) {
+        new Actions(driver).moveToElement(buttonFlinkClusterDropdown).perform();
+        buttonFlinkClusterDropdown.click();
+        // Fixed pause before the options are read (presumably lets the dropdown render).
+        Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+        selectFlinkCluster.stream()
+            .filter(e -> e.getText().contains(flinkClusterName))
+            .findFirst()
+            .orElseThrow(
+                () -> new IllegalArgumentException(
+                    String.format("Flink cluster not found: %s", flinkClusterName)))
+            .click();
+
+        return this;
+    }
+
+    /**
+     * Clicks the Submit button.
+     *
+     * @return this form, for call chaining
+     */
+    public ApplicationForm submit() {
         buttonSubmit.click();
 
         return this;
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
index c40901187..75d5fdd10 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
@@ -20,6 +20,10 @@ package org.apache.streampark.e2e.pages.flink.applications;
 import org.apache.streampark.e2e.pages.common.Constants;
 
 import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.platform.commons.util.StringUtils;
+import org.openqa.selenium.Keys;
 import org.openqa.selenium.WebDriver;
 import org.openqa.selenium.WebElement;
 import org.openqa.selenium.interactions.Actions;
@@ -28,11 +32,14 @@ import org.openqa.selenium.support.PageFactory;
 import org.openqa.selenium.support.ui.ExpectedConditions;
 import org.openqa.selenium.support.ui.WebDriverWait;
 
+import java.util.List;
+
 @Getter
+@Slf4j
 public final class FlinkSQLEditor {
 
-    @FindBy(xpath = "//label[contains(@for, 
'form_item_flinkSql')]/../..//div[contains(@class, 
'monaco-editor')]//div[contains(@class, 'view-line')]")
-    private WebElement flinkSqlEditor;
+    @FindBy(xpath = "//label[contains(@for, 
'form_item_flinkSql')]/../..//div[contains(@class, 
'monaco-editor')]//div[contains(@class, 'view-line') and not(contains(@class, 
'view-lines'))]")
+    private List<WebElement> flinkSqlEditor;
 
     private WebDriver driver;
 
@@ -41,15 +48,77 @@ public final class FlinkSQLEditor {
         this.driver = driver;
     }
 
+    /**
+     * Types the given SQL into the editor, one line at a time.
+     *
+     * <p>The content is split on {@code Constants.LINE_SEPARATOR} and every piece is sent to the
+     * editor line with the same index, followed by a line separator. Before a non-blank piece is
+     * typed into any line but the first, text already shown on that editor line is removed with
+     * backspaces (presumably text the editor inserted on its own, e.g. indentation or closing
+     * brackets - TODO confirm).
+     *
+     * @param content the SQL text to type
+     * @return this editor, for call chaining
+     */
+    @SneakyThrows
     public FlinkSQLEditor content(String content) {
-        new WebDriverWait(this.driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
-            .until(ExpectedConditions.elementToBeClickable(flinkSqlEditor));
-
-        flinkSqlEditor.click();
+        new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
+            .until(ExpectedConditions.elementToBeClickable(flinkSqlEditor.get(0)));
 
         Actions actions = new Actions(this.driver);
-        actions.moveToElement(flinkSqlEditor).sendKeys(content).perform();
+
+        // Editor lines past this index are not addressed directly; later input lines all target
+        // it (presumably the last view-line the editor renders at once - TODO confirm).
+        final int lastLineIndex = 21;
+        String[] inputLines = content.split(Constants.LINE_SEPARATOR);
+
+        for (int i = 0; i < inputLines.length; i++) {
+            String inputLine = inputLines[i];
+            int lineIndex = Math.min(i, lastLineIndex);
+
+            if (i == 0) {
+                typeLine(actions, lineIndex, inputLine);
+                continue;
+            }
+
+            // Read what the editor already shows on the target line before touching it.
+            String editorLineText = flinkSqlEditor.get(lineIndex).getText();
+            if (StringUtils.isNotBlank(inputLine)) {
+                if (!editorLineText.isEmpty()) {
+                    // One backspace per character of the stripped text, plus one more.
+                    int backspaces = editorLineText.strip().length() + 1;
+                    for (int p = 0; p < backspaces; p++) {
+                        clearLine(actions, flinkSqlEditor.get(lineIndex));
+                    }
+                }
+                typeLine(actions, lineIndex, inputLine);
+            } else {
+                // Blank input line: only break the line.
+                actions.moveToElement(flinkSqlEditor.get(lineIndex))
+                    .click()
+                    .sendKeys(Constants.LINE_SEPARATOR)
+                    .perform();
+            }
+            Thread.sleep(Constants.DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS);
+        }
 
         return this;
     }
+
+    /** Clicks the editor line at {@code lineIndex}, types {@code text} and ends the line. */
+    private void typeLine(Actions actions, int lineIndex, String text) {
+        actions.moveToElement(flinkSqlEditor.get(lineIndex))
+            .click()
+            .sendKeys(text)
+            .sendKeys(Constants.LINE_SEPARATOR)
+            .perform();
+    }
+
+    /** Clicks the given element and presses backspace once. */
+    private void clearLine(Actions actions, WebElement element) {
+        actions.moveToElement(element)
+            .click()
+            .sendKeys(Keys.BACK_SPACE)
+            .perform();
+    }
 }
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java
deleted file mode 100644
index d418b9688..000000000
--- 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.e2e.pages.flink.applications;
-
-import org.apache.streampark.e2e.pages.common.Constants;
-
-import lombok.Getter;
-import lombok.SneakyThrows;
-import org.openqa.selenium.WebDriver;
-import org.openqa.selenium.WebElement;
-import org.openqa.selenium.support.FindBy;
-import org.openqa.selenium.support.FindBys;
-import org.openqa.selenium.support.PageFactory;
-import org.openqa.selenium.support.ui.ExpectedConditions;
-import org.openqa.selenium.support.ui.WebDriverWait;
-
-import java.util.List;
-
-@Getter
-public final class FlinkSQLYarnApplicationForm {
-
-    private WebDriver driver;
-
-    @FindBy(xpath = "//div[contains(@codefield, 
'versionId')]//div[contains(@class, 'ant-select-selector')]")
-    private WebElement buttonFlinkVersionDropdown;
-
-    @FindBys({
-            @FindBy(css = "[codefield=versionId]"),
-            @FindBy(className = "ant-select-item-option-content")
-    })
-    private List<WebElement> selectFlinkVersion;
-
-    public FlinkSQLYarnApplicationForm(WebDriver driver) {
-        this.driver = driver;
-
-        PageFactory.initElements(driver, this);
-    }
-
-    @SneakyThrows
-    public FlinkSQLYarnApplicationForm add(String flinkVersion, String 
flinkSql) {
-        buttonFlinkVersionDropdown.click();
-        new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
-            
.until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion));
-        selectFlinkVersion.stream()
-            .filter(e -> e.getText().equalsIgnoreCase(flinkVersion))
-            .findFirst()
-            .orElseThrow(() -> new IllegalArgumentException("Flink version not 
found"))
-            .click();
-
-        new FlinkSQLEditor(driver).content(flinkSql);
-
-        return this;
-    }
-}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java
deleted file mode 100644
index 5dffe13a3..000000000
--- 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.e2e.pages.flink.applications.entity;
-
-import lombok.Data;
-
-@Data
-public class ApplicationsDynamicParams {
-
-    private String flinkSQL;
-}
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
index 977f00a63..c5793ecb5 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
@@ -22,6 +22,7 @@ import org.apache.streampark.e2e.pages.common.NavBarPage;
 import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
 
 import lombok.Getter;
+import lombok.SneakyThrows;
 import org.openqa.selenium.By;
 import org.openqa.selenium.WebElement;
 import org.openqa.selenium.remote.RemoteWebDriver;
@@ -83,9 +84,10 @@ public class FlinkClustersPage extends NavBarPage implements 
ApacheFlinkPage.Tab
         return new ClusterDetailForm(driver);
     }
 
+    @SneakyThrows
     public FlinkClustersPage startFlinkCluster(String flinkClusterName) {
         waitForPageLoading();
-
+        Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
         flinkClusterList().stream()
             .filter(it -> it.getText().contains(flinkClusterName))
             .flatMap(
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
index 04a1ae239..441fa1118 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
@@ -35,6 +35,7 @@ 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
index 04a1ae239..441fa1118 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
@@ -35,6 +35,7 @@ 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
diff --git 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
index 04a1ae239..441fa1118 100644
--- 
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
+++ 
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
@@ -35,6 +35,7 @@ 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
 
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
diff --git 
a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
 
b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
index e18f3828d..3d57754c2 100644
--- 
a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
+++ 
b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
@@ -34,31 +34,4 @@ public final class Constants {
     /** chrome download path in selenium/standalone-chrome-debug container */
     public static final String SELENIUM_CONTAINER_CHROME_DOWNLOAD_PATH = 
"/home/seluser/Downloads";
 
-    /** datagen flink sql for test */
-    public static final String TEST_FLINK_SQL = "CREATE TABLE datagen (\n"
-        + "f_sequence INT,\n"
-        + "f_random INT,\n"
-        + "f_random_str STRING,\n"
-        + "ts AS localtimestamp,\n"
-        + "WATERMARK FOR ts AS ts\n"
-        + ") WITH (\n"
-        + "'connector' = 'datagen',\n"
-        + "'rows-per-second'='5',\n"
-        + "'fields.f_sequence.kind'='sequence',\n"
-        + "'fields.f_sequence.start'='1',\n"
-        + "'fields.f_sequence.end'='100',\n"
-        + "'fields.f_random.min'='1',\n"
-        + "'fields.f_random.max'='100',\n"
-        + "'fields.f_random_str.length'='10'\n"
-        + ");\n"
-        + "\n"
-        + "CREATE TABLE print_table (\n"
-        + "f_sequence INT,\n"
-        + "f_random INT,\n"
-        + "f_random_str STRING\n"
-        + ") WITH (\n"
-        + "'connector' = 'print'\n"
-        + ");\n"
-        + "\n"
-        + "INSERT INTO print_table select f_sequence,f_random,f_random_str 
from datagen;";
 }

Reply via email to