This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 942116bd2 add feature 3861 (#3890)
942116bd2 is described below
commit 942116bd25e2e226bf3b71edb4c1ce8147ec1ba3
Author: xiangzihao <[email protected]>
AuthorDate: Fri Jul 19 12:29:46 2024 +0800
add feature 3861 (#3890)
---
.../src/views/flink/app/hooks/useFlinkRender.tsx | 1 +
.../e2e/cases/FlinkSQL116OnYarnTest.java | 123 ++++++++++++++---
.../e2e/cases/FlinkSQL117OnYarnTest.java | 150 ++++++++++++++++++---
.../e2e/cases/FlinkSQL118OnYarnTest.java | 150 ++++++++++++++++++---
.../streampark/e2e/pages/common/Constants.java | 32 +++++
.../pages/flink/applications/ApplicationForm.java | 68 ++++++++--
.../pages/flink/applications/FlinkSQLEditor.java | 75 ++++++++++-
.../applications/FlinkSQLYarnApplicationForm.java | 69 ----------
.../entity/ApplicationsDynamicParams.java | 26 ----
.../pages/flink/clusters/FlinkClustersPage.java | 4 +-
.../flink-1.16-on-yarn/docker-compose.config | 1 +
.../flink-1.17-on-yarn/docker-compose.config | 1 +
.../flink-1.18-on-yarn/docker-compose.config | 1 +
.../org/apache/streampark/e2e/core/Constants.java | 27 ----
14 files changed, 541 insertions(+), 187 deletions(-)
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 62975b584..79105228b 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -274,6 +274,7 @@ export const renderFlinkCluster = (clusters, { model, field
}: RenderCallbackPar
placeholder={t('flink.app.flinkCluster')}
value={model[field]}
onChange={(value: any) => (model[field] = value)}
+ codeField={field}
>
{clusters.map((item) => {
return (
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
index 92d4c5dae..9fd649ff0 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
@@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
-import
org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
@@ -34,7 +36,7 @@ import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;
-import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
+import static org.apache.streampark.e2e.pages.common.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;
@StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
@@ -54,6 +56,10 @@ public class FlinkSQL116OnYarnTest {
private static final String applicationName = "flink-116-e2e-test";
+ private static final String flinkDescription = "description test";
+
+ private static final String flinkClusterName = "flink_1.16.3_cluster_e2e";
+
@BeforeAll
public static void setup() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
@@ -61,9 +67,22 @@ public class FlinkSQL116OnYarnTest {
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);
- flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+ flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+ FlinkClustersPage flinkClustersPage =
flinkHomePage.goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkClustersPage.class);
+
+ flinkClustersPage.createFlinkCluster()
+
.<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
+ .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+ .clusterName(flinkClusterName)
+ .flinkVersion(flinkName)
+ .submit();
+
+ flinkClustersPage.startFlinkCluster(flinkClusterName);
-
flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
+ flinkClustersPage.goToNav(ApacheFlinkPage.class)
+ .goToTab(ApplicationsPage.class);
}
@Test
@@ -71,17 +90,15 @@ public class FlinkSQL116OnYarnTest {
void testCreateFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
- ApplicationsDynamicParams applicationsDynamicParams = new
ApplicationsDynamicParams();
-
- applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_APPLICATION,
- applicationName,
- flinkName,
- applicationsDynamicParams);
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(TEST_FLINK_SQL)
+ .submit();
Awaitility.await()
.untilAsserted(
@@ -176,16 +193,15 @@ public class FlinkSQL116OnYarnTest {
void testCreateFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
- ApplicationsDynamicParams applicationsDynamicParams = new
ApplicationsDynamicParams();
- applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_PER_JOB,
- applicationName,
- flinkName,
- applicationsDynamicParams);
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(TEST_FLINK_SQL)
+ .submit();
Awaitility.await()
.untilAsserted(
@@ -248,4 +264,81 @@ public class FlinkSQL116OnYarnTest {
.noneMatch(it ->
it.getText().contains(applicationName));
});
}
+
+ @Test
+ @Order(90)
+ void testCreateFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.createApplication()
+ .addApplication(
+ ApplicationForm.DevelopmentMode.FLINK_SQL,
+ ApplicationForm.ExecutionMode.YARN_SESSION,
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(TEST_FLINK_SQL)
+ .flinkCluster(flinkClusterName)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain newly-created
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(applicationName)));
+ }
+
+ @Test
+ @Order(100)
+ void testReleaseFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.releaseApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain released
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("SUCCESS")));
+ }
+
+ @Test
+ @Order(110)
+ void testStartFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain started application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain finished
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("FINISHED")));
+ }
+
+ @Test
+ @Order(120)
+ void testDeleteFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.deleteApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> {
+ browser.navigate().refresh();
+
+ assertThat(applicationsPage.applicationsList())
+ .noneMatch(it ->
it.getText().contains(applicationName));
+ });
+ }
}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
index b98fae390..f636a94c8 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
@@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
-import
org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
@@ -34,7 +36,6 @@ import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;
-import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;
@StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
@@ -54,6 +55,10 @@ public class FlinkSQL117OnYarnTest {
private static final String applicationName = "flink-117-e2e-test";
+ private static final String flinkDescription = "description test";
+
+ private static final String flinkClusterName = "flink_1.17.2_cluster_e2e";
+
@BeforeAll
public static void setup() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
@@ -61,9 +66,22 @@ public class FlinkSQL117OnYarnTest {
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);
- flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+ flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+ FlinkClustersPage flinkClustersPage =
flinkHomePage.goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkClustersPage.class);
-
flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
+ flinkClustersPage.createFlinkCluster()
+
.<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
+ .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+ .clusterName(flinkClusterName)
+ .flinkVersion(flinkName)
+ .submit();
+
+ flinkClustersPage.startFlinkCluster(flinkClusterName);
+
+ flinkClustersPage.goToNav(ApacheFlinkPage.class)
+ .goToTab(ApplicationsPage.class);
}
@Test
@@ -71,17 +89,15 @@ public class FlinkSQL117OnYarnTest {
void testCreateFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
- ApplicationsDynamicParams applicationsDynamicParams = new
ApplicationsDynamicParams();
-
- applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_APPLICATION,
- applicationName,
- flinkName,
- applicationsDynamicParams);
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .submit();
Awaitility.await()
.untilAsserted(
@@ -176,17 +192,15 @@ public class FlinkSQL117OnYarnTest {
void testCreateFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
- ApplicationsDynamicParams applicationsDynamicParams = new
ApplicationsDynamicParams();
-
- applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_PER_JOB,
- applicationName,
- flinkName,
- applicationsDynamicParams);
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .submit();
Awaitility.await()
.untilAsserted(
@@ -249,4 +263,108 @@ public class FlinkSQL117OnYarnTest {
.noneMatch(it ->
it.getText().contains(applicationName));
});
}
+
+ @Test
+ @Order(90)
+ void testCreateFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage
+ .createApplication()
+ .addApplication(
+ ApplicationForm.DevelopmentMode.FLINK_SQL,
+ ApplicationForm.ExecutionMode.YARN_SESSION,
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .flinkCluster(flinkClusterName)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain newly-created
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(applicationName)));
+ }
+
+ @Test
+ @Order(100)
+ void testReleaseFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.releaseApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain released
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("SUCCESS")));
+ }
+
+ @Test
+ @Order(110)
+ void testStartFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain started application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain finished
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("FINISHED")));
+ }
+
+ @Test
+ @Order(120)
+ @SneakyThrows
+ void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
+ Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain restarted
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+
+ applicationsPage.cancelApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain canceled
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("CANCELED")));
+ }
+
+ @Test
+ @Order(130)
+ void testDeleteFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.deleteApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> {
+ browser.navigate().refresh();
+
+ assertThat(applicationsPage.applicationsList())
+ .noneMatch(it ->
it.getText().contains(applicationName));
+ });
+ }
}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
index a91da141a..89367e8c5 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
@@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
-import
org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
@@ -34,7 +36,6 @@ import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;
-import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;
@StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
@@ -54,6 +55,10 @@ public class FlinkSQL118OnYarnTest {
private static final String applicationName = "flink-118-e2e-test";
+ private static final String flinkDescription = "description test";
+
+ private static final String flinkClusterName = "flink_1.18.1_cluster_e2e";
+
@BeforeAll
public static void setup() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
@@ -61,9 +66,22 @@ public class FlinkSQL118OnYarnTest {
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);
- flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+ flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+ FlinkClustersPage flinkClustersPage =
flinkHomePage.goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkClustersPage.class);
-
flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
+ flinkClustersPage.createFlinkCluster()
+
.<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
+ .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+ .clusterName(flinkClusterName)
+ .flinkVersion(flinkName)
+ .submit();
+
+ flinkClustersPage.startFlinkCluster(flinkClusterName);
+
+ flinkClustersPage.goToNav(ApacheFlinkPage.class)
+ .goToTab(ApplicationsPage.class);
}
@Test
@@ -71,17 +89,15 @@ public class FlinkSQL118OnYarnTest {
void testCreateFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
- ApplicationsDynamicParams applicationsDynamicParams = new
ApplicationsDynamicParams();
-
- applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_APPLICATION,
- applicationName,
- flinkName,
- applicationsDynamicParams);
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .submit();
Awaitility.await()
.untilAsserted(
@@ -176,17 +192,15 @@ public class FlinkSQL118OnYarnTest {
void testCreateFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
- ApplicationsDynamicParams applicationsDynamicParams = new
ApplicationsDynamicParams();
-
- applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_PER_JOB,
- applicationName,
- flinkName,
- applicationsDynamicParams);
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .submit();
Awaitility.await()
.untilAsserted(
@@ -275,4 +289,108 @@ public class FlinkSQL118OnYarnTest {
.noneMatch(it ->
it.getText().contains(applicationName));
});
}
+
+ @Test
+ @Order(90)
+ void testCreateFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage
+ .createApplication()
+ .addApplication(
+ ApplicationForm.DevelopmentMode.FLINK_SQL,
+ ApplicationForm.ExecutionMode.YARN_SESSION,
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .flinkCluster(flinkClusterName)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain newly-created
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(applicationName)));
+ }
+
+ @Test
+ @Order(100)
+ void testReleaseFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.releaseApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain released
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("SUCCESS")));
+ }
+
+ @Test
+ @Order(110)
+ void testStartFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain started application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain finished
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("FINISHED")));
+ }
+
+ @Test
+ @Order(120)
+ @SneakyThrows
+ void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
+ Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain restarted
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+
+ applicationsPage.cancelApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList())
+ .as("Applications list should contain canceled
application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("CANCELED")));
+ }
+
+ @Test
+ @Order(130)
+ void testDeleteFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
+
+ applicationsPage.deleteApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> {
+ browser.navigate().refresh();
+
+ assertThat(applicationsPage.applicationsList())
+ .noneMatch(it ->
it.getText().contains(applicationName));
+ });
+ }
}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java
index 7319fc283..956556cee 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java
@@ -26,7 +26,39 @@ public class Constants {
public static final Integer DEFAULT_SLEEP_MILLISECONDS = 2000;
+ public static final Integer DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS =
1000;
+
public static final Integer DEFAULT_PROJECT_BUILD_TIMEOUT_MINUTES = 5;
public static final Duration DEFAULT_WEBDRIVER_WAIT_DURATION =
Duration.ofSeconds(10);
+
+ public static final String LINE_SEPARATOR = "\n";
+
+ /** datagen flink sql for test */
+ public static final String TEST_FLINK_SQL = "CREATE TABLE datagen (\n"
+ + "f_sequence INT,\n"
+ + "f_random INT,\n"
+ + "f_random_str STRING,\n"
+ + "ts AS localtimestamp,\n"
+ + "WATERMARK FOR ts AS ts\n"
+ + ") WITH (\n"
+ + "'connector' = 'datagen',\n"
+ + "'rows-per-second'='5',\n"
+ + "'fields.f_sequence.kind'='sequence',\n"
+ + "'fields.f_sequence.start'='1',\n"
+ + "'fields.f_sequence.end'='100',\n"
+ + "'fields.f_random.min'='1',\n"
+ + "'fields.f_random.max'='100',\n"
+ + "'fields.f_random_str.length'='10'\n"
+ + ");\n"
+ + "\n"
+ + "CREATE TABLE print_table (\n"
+ + "f_sequence INT,\n"
+ + "f_random INT,\n"
+ + "f_random_str STRING\n"
+ + ") WITH (\n"
+ + "'connector' = 'print'\n"
+ + ");\n"
+ + "\n"
+ + "INSERT INTO print_table select f_sequence,f_random,f_random_str
from datagen;";
}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
index 1f977301f..eb93e3cb7 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
@@ -18,12 +18,12 @@
package org.apache.streampark.e2e.pages.flink.applications;
import org.apache.streampark.e2e.pages.common.Constants;
-import
org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
import lombok.Getter;
import lombok.SneakyThrows;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
+import org.openqa.selenium.interactions.Actions;
import org.openqa.selenium.support.FindBy;
import org.openqa.selenium.support.FindBys;
import org.openqa.selenium.support.PageFactory;
@@ -58,6 +58,21 @@ public final class ApplicationForm {
@FindBy(id = "form_item_jobName")
private WebElement inputApplicationName;
+ @FindBy(xpath = "//div[contains(@codefield,
'yarnSessionClusterId')]//div[contains(@class, 'ant-select-selector')]")
+ private WebElement buttonFlinkClusterDropdown;
+
+ @FindBy(className = "ant-select-item-option-content")
+ private List<WebElement> selectFlinkCluster;
+
+ @FindBy(xpath = "//div[contains(@codefield,
'versionId')]//div[contains(@class, 'ant-select-selector')]")
+ private WebElement buttonFlinkVersionDropdown;
+
+ @FindBys({
+ @FindBy(css = "[codefield=versionId]"),
+ @FindBy(className = "ant-select-item-option-content")
+ })
+ private List<WebElement> selectFlinkVersion;
+
@FindBy(xpath = "//button[contains(@class,
'ant-btn')]//span[contains(text(), 'Submit')]")
private WebElement buttonSubmit;
@@ -71,12 +86,9 @@ public final class ApplicationForm {
}
@SneakyThrows
- public ApplicationForm addApplication(
- DevelopmentMode developmentMode,
+ public ApplicationForm addApplication(DevelopmentMode developmentMode,
ExecutionMode executionMode,
- String applicationName,
- String flinkVersion,
- ApplicationsDynamicParams
applicationsDynamicParams) {
+ String applicationName) {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
.until(ExpectedConditions.elementToBeClickable(buttonDevelopmentModeDropdown));
@@ -125,8 +137,7 @@ public final class ApplicationForm {
String.format("Execution mode not found:
%s",
executionMode.desc())))
.click();
- new FlinkSQLYarnApplicationForm(driver)
- .add(flinkVersion,
applicationsDynamicParams.flinkSQL());
+
break;
case YARN_SESSION:
selectExecutionMode.stream()
@@ -170,8 +181,6 @@ public final class ApplicationForm {
String.format("Execution mode not found:
%s",
executionMode.desc())))
.click();
- new FlinkSQLYarnApplicationForm(driver)
- .add(flinkVersion,
applicationsDynamicParams.flinkSQL());
break;
default:
throw new IllegalArgumentException(
@@ -194,6 +203,45 @@ public final class ApplicationForm {
}
inputApplicationName.sendKeys(applicationName);
+ return this;
+ }
+
+ public ApplicationForm flinkVersion(String flinkVersion) {
+ new
Actions(driver).moveToElement(buttonFlinkVersionDropdown).build().perform();
+ buttonFlinkVersionDropdown.click();
+ new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
+
.until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion));
+ selectFlinkVersion.stream()
+ .filter(e -> e.getText().equalsIgnoreCase(flinkVersion))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("Flink version not
found"))
+ .click();
+
+ return this;
+ }
+
+ public ApplicationForm flinkSql(String flinkSql) {
+ new FlinkSQLEditor(driver).content(flinkSql);
+ return this;
+ }
+
+ @SneakyThrows
+ public ApplicationForm flinkCluster(String flinkClusterName) {
+ new
Actions(driver).moveToElement(buttonFlinkClusterDropdown).build().perform();
+ buttonFlinkClusterDropdown.click();
+ Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+ selectFlinkCluster.stream()
+ .filter(e -> e.getText().contains(flinkClusterName))
+ .findFirst()
+ .orElseThrow(
+ () -> new IllegalArgumentException(
+ String.format("Flink cluster not found: %s",
flinkClusterName)))
+ .click();
+
+ return this;
+ }
+
+ public ApplicationForm submit() {
buttonSubmit.click();
return this;
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
index c40901187..75d5fdd10 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java
@@ -20,6 +20,10 @@ package org.apache.streampark.e2e.pages.flink.applications;
import org.apache.streampark.e2e.pages.common.Constants;
import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.platform.commons.util.StringUtils;
+import org.openqa.selenium.Keys;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.interactions.Actions;
@@ -28,11 +32,14 @@ import org.openqa.selenium.support.PageFactory;
import org.openqa.selenium.support.ui.ExpectedConditions;
import org.openqa.selenium.support.ui.WebDriverWait;
+import java.util.List;
+
@Getter
+@Slf4j
public final class FlinkSQLEditor {
- @FindBy(xpath = "//label[contains(@for,
'form_item_flinkSql')]/../..//div[contains(@class,
'monaco-editor')]//div[contains(@class, 'view-line')]")
- private WebElement flinkSqlEditor;
+ @FindBy(xpath = "//label[contains(@for,
'form_item_flinkSql')]/../..//div[contains(@class,
'monaco-editor')]//div[contains(@class, 'view-line') and not(contains(@class,
'view-lines'))]")
+ private List<WebElement> flinkSqlEditor;
private WebDriver driver;
@@ -41,15 +48,69 @@ public final class FlinkSQLEditor {
this.driver = driver;
}
+ @SneakyThrows
public FlinkSQLEditor content(String content) {
- new WebDriverWait(this.driver,
Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
- .until(ExpectedConditions.elementToBeClickable(flinkSqlEditor));
-
- flinkSqlEditor.click();
+ new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
+
.until(ExpectedConditions.elementToBeClickable(flinkSqlEditor.get(0)));
Actions actions = new Actions(this.driver);
- actions.moveToElement(flinkSqlEditor).sendKeys(content).perform();
+
+ List<String> contentList =
List.of(content.split(Constants.LINE_SEPARATOR));
+
+ for (int i = 0; i < contentList.size(); i++) {
+ String editorLineText;
+ String inputContent = contentList.get(i);
+ int flinkSqlEditorIndex = Math.min(i, 21);
+
+ if (i == 0) {
+ actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex))
+ .click()
+ .sendKeys(inputContent)
+ .sendKeys(Constants.LINE_SEPARATOR)
+ .perform();
+ continue;
+ } else {
+ editorLineText =
flinkSqlEditor.get(flinkSqlEditorIndex).getText();
+ }
+
+ if (StringUtils.isNotBlank(inputContent)) {
+ if (editorLineText.isEmpty()) {
+
actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex))
+ .click()
+ .sendKeys(inputContent)
+ .sendKeys(Constants.LINE_SEPARATOR)
+ .perform();
+
Thread.sleep(Constants.DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS);
+ } else {
+ for (int p = 0; p < editorLineText.strip().length(); p++) {
+ clearLine(actions,
flinkSqlEditor.get(flinkSqlEditorIndex));
+ }
+ if (!editorLineText.isEmpty()) {
+ clearLine(actions,
flinkSqlEditor.get(flinkSqlEditorIndex));
+ }
+
actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex))
+ .click()
+ .sendKeys(inputContent)
+ .sendKeys(Constants.LINE_SEPARATOR)
+ .perform();
+
Thread.sleep(Constants.DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS);
+ }
+ } else {
+ actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex))
+ .click()
+ .sendKeys(Constants.LINE_SEPARATOR)
+ .perform();
+
Thread.sleep(Constants.DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS);
+ }
+ }
return this;
}
+
+ private void clearLine(Actions actions, WebElement element) {
+ actions.moveToElement(element)
+ .click()
+ .sendKeys(Keys.BACK_SPACE)
+ .perform();
+ }
}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java
deleted file mode 100644
index d418b9688..000000000
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.e2e.pages.flink.applications;
-
-import org.apache.streampark.e2e.pages.common.Constants;
-
-import lombok.Getter;
-import lombok.SneakyThrows;
-import org.openqa.selenium.WebDriver;
-import org.openqa.selenium.WebElement;
-import org.openqa.selenium.support.FindBy;
-import org.openqa.selenium.support.FindBys;
-import org.openqa.selenium.support.PageFactory;
-import org.openqa.selenium.support.ui.ExpectedConditions;
-import org.openqa.selenium.support.ui.WebDriverWait;
-
-import java.util.List;
-
-@Getter
-public final class FlinkSQLYarnApplicationForm {
-
- private WebDriver driver;
-
- @FindBy(xpath = "//div[contains(@codefield,
'versionId')]//div[contains(@class, 'ant-select-selector')]")
- private WebElement buttonFlinkVersionDropdown;
-
- @FindBys({
- @FindBy(css = "[codefield=versionId]"),
- @FindBy(className = "ant-select-item-option-content")
- })
- private List<WebElement> selectFlinkVersion;
-
- public FlinkSQLYarnApplicationForm(WebDriver driver) {
- this.driver = driver;
-
- PageFactory.initElements(driver, this);
- }
-
- @SneakyThrows
- public FlinkSQLYarnApplicationForm add(String flinkVersion, String
flinkSql) {
- buttonFlinkVersionDropdown.click();
- new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
-
.until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion));
- selectFlinkVersion.stream()
- .filter(e -> e.getText().equalsIgnoreCase(flinkVersion))
- .findFirst()
- .orElseThrow(() -> new IllegalArgumentException("Flink version not
found"))
- .click();
-
- new FlinkSQLEditor(driver).content(flinkSql);
-
- return this;
- }
-}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java
deleted file mode 100644
index 5dffe13a3..000000000
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.e2e.pages.flink.applications.entity;
-
-import lombok.Data;
-
-@Data
-public class ApplicationsDynamicParams {
-
- private String flinkSQL;
-}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
index 977f00a63..c5793ecb5 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java
@@ -22,6 +22,7 @@ import org.apache.streampark.e2e.pages.common.NavBarPage;
import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
import lombok.Getter;
+import lombok.SneakyThrows;
import org.openqa.selenium.By;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
@@ -83,9 +84,10 @@ public class FlinkClustersPage extends NavBarPage implements
ApacheFlinkPage.Tab
return new ClusterDetailForm(driver);
}
+ @SneakyThrows
public FlinkClustersPage startFlinkCluster(String flinkClusterName) {
waitForPageLoading();
-
+ Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
flinkClusterList().stream()
.filter(it -> it.getText().contains(flinkClusterName))
.flatMap(
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
index 04a1ae239..441fa1118 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
@@ -35,6 +35,7 @@
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
index 04a1ae239..441fa1118 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
@@ -35,6 +35,7 @@
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
index 04a1ae239..441fa1118 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
@@ -35,6 +35,7 @@
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
diff --git
a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
index e18f3828d..3d57754c2 100644
---
a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
+++
b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java
@@ -34,31 +34,4 @@ public final class Constants {
/** chrome download path in selenium/standalone-chrome-debug container */
public static final String SELENIUM_CONTAINER_CHROME_DOWNLOAD_PATH =
"/home/seluser/Downloads";
- /** datagen flink sql for test */
- public static final String TEST_FLINK_SQL = "CREATE TABLE datagen (\n"
- + "f_sequence INT,\n"
- + "f_random INT,\n"
- + "f_random_str STRING,\n"
- + "ts AS localtimestamp,\n"
- + "WATERMARK FOR ts AS ts\n"
- + ") WITH (\n"
- + "'connector' = 'datagen',\n"
- + "'rows-per-second'='5',\n"
- + "'fields.f_sequence.kind'='sequence',\n"
- + "'fields.f_sequence.start'='1',\n"
- + "'fields.f_sequence.end'='100',\n"
- + "'fields.f_random.min'='1',\n"
- + "'fields.f_random.max'='100',\n"
- + "'fields.f_random_str.length'='10'\n"
- + ");\n"
- + "\n"
- + "CREATE TABLE print_table (\n"
- + "f_sequence INT,\n"
- + "f_random INT,\n"
- + "f_random_str STRING\n"
- + ") WITH (\n"
- + "'connector' = 'print'\n"
- + ");\n"
- + "\n"
- + "INSERT INTO print_table select f_sequence,f_random,f_random_str
from datagen;";
}