This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 56d872412 [Feature-3754] Add Flink on Yarn per job mode and Flink on
Yarn yarn application test cases (#3760)
56d872412 is described below
commit 56d8724128dd624447368b3a4191842cfebcd655
Author: xiangzihao <[email protected]>
AuthorDate: Sun Jun 16 11:27:01 2024 +0800
[Feature-3754] Add Flink on Yarn per job mode and Flink on Yarn yarn
application test cases (#3760)
* add feature 3754
---
.github/workflows/e2e.yml | 10 +-
.../e2e/cases/ApplicationsFlink116OnYarnTest.java | 258 +++++++++++++++++++++
.../e2e/cases/ApplicationsFlink117OnYarnTest.java | 258 +++++++++++++++++++++
.../e2e/cases/ApplicationsFlink118OnYarnTest.java | 258 +++++++++++++++++++++
.../apache/streampark/e2e/cases/FlinkHomeTest.java | 101 ++++++++
.../e2e/pages/apacheflink/ApacheFlinkPage.java | 68 ++++++
.../e2e/pages/apacheflink/FlinkHomePage.java | 137 +++++++++++
.../apacheflink/applications/ApplicationForm.java | 204 ++++++++++++++++
.../apacheflink/applications/ApplicationsPage.java | 180 ++++++++++++++
.../apacheflink/applications/FlinkSQLEditor.java | 55 +++++
.../applications/FlinkSQLYarnApplicationForm.java | 68 ++++++
.../entity/ApplicationsDynamicParams.java | 27 +++
.../streampark/e2e/pages/common/NavBarPage.java | 18 +-
.../resources/docker/flink-1.16-on-yarn/Dockerfile | 32 +++
.../flink-1.16-on-yarn/docker-compose.config | 42 ++++
.../docker/flink-1.16-on-yarn/docker-compose.yaml | 156 +++++++++++++
.../resources/docker/flink-1.17-on-yarn/Dockerfile | 32 +++
.../flink-1.17-on-yarn/docker-compose.config | 42 ++++
.../docker/flink-1.17-on-yarn/docker-compose.yaml | 156 +++++++++++++
.../resources/docker/flink-1.18-on-yarn/Dockerfile | 32 +++
.../flink-1.18-on-yarn/docker-compose.config | 42 ++++
.../docker/flink-1.18-on-yarn/docker-compose.yaml | 156 +++++++++++++
.../streampark/e2e/core/StreamParkExtension.java | 2 +-
23 files changed, 2329 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index bb7395543..7601cdc1b 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -97,16 +97,22 @@ jobs:
class: org.apache.streampark.e2e.cases.UserManagementTest
- name: TeamManagementTest
class: org.apache.streampark.e2e.cases.TeamManagementTest
+ - name: ApplicationsFlink116OnYarnTest
+ class: org.apache.streampark.e2e.cases.ApplicationsFlink116OnYarnTest
+ - name: ApplicationsFlink117OnYarnTest
+ class: org.apache.streampark.e2e.cases.ApplicationsFlink117OnYarnTest
+ - name: ApplicationsFlink118OnYarnTest
+ class: org.apache.streampark.e2e.cases.ApplicationsFlink118OnYarnTest
env:
RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
steps:
- uses: actions/checkout@v4
with:
submodules: true
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v4
with:
- java-version: 8
+ java-version: 11
distribution: 'adopt'
- name: Cache local Maven repository
uses: actions/cache@v4
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnTest.java
new file mode 100644
index 000000000..ea63b2096
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationForm;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end UI tests that drive one Flink SQL application through create, release, start
+ * and delete on YARN, first in application mode and then in per-job mode, using the
+ * Flink 1.16 docker-compose stack.
+ *
+ * <p>The test methods depend on each other: every step operates on the application named
+ * {@link #applicationName} that an earlier step created, so they must run in {@code @Order}
+ * sequence.
+ * NOTE(review): the method orderer is not configured in this class; presumably it is supplied
+ * through {@code @StreamPark} - confirm.
+ */
+@StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
+public class ApplicationsFlink116OnYarnTest {
+    // NOTE(review): never assigned in this class; presumably injected by the extension
+    // behind @StreamPark - confirm.
+    private static RemoteWebDriver browser;
+
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    private static final String flinkName = "flink-1.16.3";
+
+    private static final String flinkHome = "/flink-1.16.3";
+
+    private static final String applicationName = "flink-116-e2e-test";
+
+    /** Flink SQL job submitted by both the yarn-application and the yarn-per-job create tests. */
+    private static final String flinkSQL = "CREATE TABLE datagen (\n" +
+        "f_sequence INT,\n" +
+        "f_random INT,\n" +
+        "f_random_str STRING,\n" +
+        "ts AS localtimestamp,\n" +
+        "WATERMARK FOR ts AS ts\n" +
+        ") WITH (\n" +
+        "'connector' = 'datagen',\n" +
+        "'rows-per-second'='5',\n" +
+        "'fields.f_sequence.kind'='sequence',\n" +
+        "'fields.f_sequence.start'='1',\n" +
+        "'fields.f_sequence.end'='100',\n" +
+        "'fields.f_random.min'='1',\n" +
+        "'fields.f_random.max'='100',\n" +
+        "'fields.f_random_str.length'='10'\n" +
+        ");\n" +
+        "\n" +
+        "CREATE TABLE print_table (\n" +
+        "f_sequence INT,\n" +
+        "f_random INT,\n" +
+        "f_random_str STRING\n" +
+        ") WITH (\n" +
+        "'connector' = 'print'\n" +
+        ");\n" +
+        "\n" +
+        "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;";
+
+    /** Logs in, registers the Flink home used by the tests, then opens the applications tab. */
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(10)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        createFlinkSqlApplication(applicationsPage, ApplicationForm.ExecutionMode.YARN_APPLICATION);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain newly-created application", applicationName);
+    }
+
+    @Test
+    @Order(20)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain released application", "SUCCESS");
+    }
+
+    @Test
+    @Order(30)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain started application", "RUNNING");
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain finished application", "FINISHED");
+    }
+
+    @Test
+    @Order(40)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        awaitApplicationRemoved(applicationsPage);
+    }
+
+    @Test
+    @Order(50)
+    void testCreateFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        createFlinkSqlApplication(applicationsPage, ApplicationForm.ExecutionMode.YARN_PER_JOB);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain newly-created application", applicationName);
+    }
+
+    @Test
+    @Order(60)
+    void testReleaseFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain released application", "SUCCESS");
+    }
+
+    // This test cannot be executed due to a bug, and will be put online after issue #3761 fixed
+    // @Test
+    // @Order(70)
+    // void testStartFlinkApplicationOnYarnPerJobMode() {
+    //     final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+    //
+    //     applicationsPage.startApplication(applicationName);
+    //
+    //     awaitListContains(applicationsPage,
+    //         "Applications list should contain started application", "RUNNING");
+    //
+    //     awaitListContains(applicationsPage,
+    //         "Applications list should contain finished application", "FINISHED");
+    // }
+
+    @Test
+    @Order(80)
+    void testDeleteFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        awaitApplicationRemoved(applicationsPage);
+    }
+
+    /** Fills in and submits the create-application form for the shared Flink SQL job. */
+    private static void createFlinkSqlApplication(ApplicationsPage applicationsPage,
+                                                  ApplicationForm.ExecutionMode executionMode) {
+        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+
+        applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            executionMode,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+    }
+
+    /** Waits until the text of at least one applications-list element contains {@code expectedText}. */
+    private static void awaitListContains(ApplicationsPage applicationsPage,
+                                          String description,
+                                          String expectedText) {
+        Awaitility.await().untilAsserted(() -> assertThat(applicationsPage.applicationsList())
+            .as(description)
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(expectedText)));
+    }
+
+    /** Refreshes the page on every poll until no applications-list element mentions the application. */
+    private static void awaitApplicationRemoved(ApplicationsPage applicationsPage) {
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnTest.java
new file mode 100644
index 000000000..ef9905ab5
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationForm;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end UI tests that drive one Flink SQL application through create, release, start
+ * and delete on YARN, first in application mode and then in per-job mode, using the
+ * Flink 1.17 docker-compose stack.
+ *
+ * <p>The test methods depend on each other: every step operates on the application named
+ * {@link #applicationName} that an earlier step created, so they must run in {@code @Order}
+ * sequence.
+ * NOTE(review): the method orderer is not configured in this class; presumably it is supplied
+ * through {@code @StreamPark} - confirm.
+ */
+@StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
+public class ApplicationsFlink117OnYarnTest {
+    // NOTE(review): never assigned in this class; presumably injected by the extension
+    // behind @StreamPark - confirm.
+    private static RemoteWebDriver browser;
+
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    private static final String flinkName = "flink-1.17.2";
+
+    private static final String flinkHome = "/flink-1.17.2";
+
+    private static final String applicationName = "flink-117-e2e-test";
+
+    /** Flink SQL job submitted by both the yarn-application and the yarn-per-job create tests. */
+    private static final String flinkSQL = "CREATE TABLE datagen (\n" +
+        "f_sequence INT,\n" +
+        "f_random INT,\n" +
+        "f_random_str STRING,\n" +
+        "ts AS localtimestamp,\n" +
+        "WATERMARK FOR ts AS ts\n" +
+        ") WITH (\n" +
+        "'connector' = 'datagen',\n" +
+        "'rows-per-second'='5',\n" +
+        "'fields.f_sequence.kind'='sequence',\n" +
+        "'fields.f_sequence.start'='1',\n" +
+        "'fields.f_sequence.end'='100',\n" +
+        "'fields.f_random.min'='1',\n" +
+        "'fields.f_random.max'='100',\n" +
+        "'fields.f_random_str.length'='10'\n" +
+        ");\n" +
+        "\n" +
+        "CREATE TABLE print_table (\n" +
+        "f_sequence INT,\n" +
+        "f_random INT,\n" +
+        "f_random_str STRING\n" +
+        ") WITH (\n" +
+        "'connector' = 'print'\n" +
+        ");\n" +
+        "\n" +
+        "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;";
+
+    /** Logs in, registers the Flink home used by the tests, then opens the applications tab. */
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(10)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        createFlinkSqlApplication(applicationsPage, ApplicationForm.ExecutionMode.YARN_APPLICATION);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain newly-created application", applicationName);
+    }
+
+    @Test
+    @Order(20)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain released application", "SUCCESS");
+    }
+
+    @Test
+    @Order(30)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain started application", "RUNNING");
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain finished application", "FINISHED");
+    }
+
+    @Test
+    @Order(40)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        awaitApplicationRemoved(applicationsPage);
+    }
+
+    @Test
+    @Order(50)
+    void testCreateFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        createFlinkSqlApplication(applicationsPage, ApplicationForm.ExecutionMode.YARN_PER_JOB);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain newly-created application", applicationName);
+    }
+
+    @Test
+    @Order(60)
+    void testReleaseFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain released application", "SUCCESS");
+    }
+
+    // This test cannot be executed due to a bug, and will be put online after issue #3761 fixed
+    // @Test
+    // @Order(70)
+    // void testStartFlinkApplicationOnYarnPerJobMode() {
+    //     final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+    //
+    //     applicationsPage.startApplication(applicationName);
+    //
+    //     awaitListContains(applicationsPage,
+    //         "Applications list should contain started application", "RUNNING");
+    //
+    //     awaitListContains(applicationsPage,
+    //         "Applications list should contain finished application", "FINISHED");
+    // }
+
+    @Test
+    @Order(80)
+    void testDeleteFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        awaitApplicationRemoved(applicationsPage);
+    }
+
+    /** Fills in and submits the create-application form for the shared Flink SQL job. */
+    private static void createFlinkSqlApplication(ApplicationsPage applicationsPage,
+                                                  ApplicationForm.ExecutionMode executionMode) {
+        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+
+        applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            executionMode,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+    }
+
+    /** Waits until the text of at least one applications-list element contains {@code expectedText}. */
+    private static void awaitListContains(ApplicationsPage applicationsPage,
+                                          String description,
+                                          String expectedText) {
+        Awaitility.await().untilAsserted(() -> assertThat(applicationsPage.applicationsList())
+            .as(description)
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(expectedText)));
+    }
+
+    /** Refreshes the page on every poll until no applications-list element mentions the application. */
+    private static void awaitApplicationRemoved(ApplicationsPage applicationsPage) {
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnTest.java
new file mode 100644
index 000000000..ebd3bd6e7
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+
+import lombok.SneakyThrows;
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationForm;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end UI tests that drive one Flink SQL application through create, release, start
+ * and delete on YARN, first in application mode and then in per-job mode, using the
+ * Flink 1.18 docker-compose stack.
+ *
+ * <p>The test methods depend on each other: every step operates on the application named
+ * {@link #applicationName} that an earlier step created, so they must run in {@code @Order}
+ * sequence.
+ * NOTE(review): the method orderer is not configured in this class; presumably it is supplied
+ * through {@code @StreamPark} - confirm.
+ */
+@StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
+public class ApplicationsFlink118OnYarnTest {
+    // NOTE(review): never assigned in this class; presumably injected by the extension
+    // behind @StreamPark - confirm.
+    private static RemoteWebDriver browser;
+
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    private static final String flinkName = "flink-1.18.1";
+
+    private static final String flinkHome = "/flink-1.18.1";
+
+    private static final String applicationName = "flink-118-e2e-test";
+
+    /** Flink SQL job submitted by both the yarn-application and the yarn-per-job create tests. */
+    private static final String flinkSQL = "CREATE TABLE datagen (\n" +
+        "f_sequence INT,\n" +
+        "f_random INT,\n" +
+        "f_random_str STRING,\n" +
+        "ts AS localtimestamp,\n" +
+        "WATERMARK FOR ts AS ts\n" +
+        ") WITH (\n" +
+        "'connector' = 'datagen',\n" +
+        "'rows-per-second'='5',\n" +
+        "'fields.f_sequence.kind'='sequence',\n" +
+        "'fields.f_sequence.start'='1',\n" +
+        "'fields.f_sequence.end'='100',\n" +
+        "'fields.f_random.min'='1',\n" +
+        "'fields.f_random.max'='100',\n" +
+        "'fields.f_random_str.length'='10'\n" +
+        ");\n" +
+        "\n" +
+        "CREATE TABLE print_table (\n" +
+        "f_sequence INT,\n" +
+        "f_random INT,\n" +
+        "f_random_str STRING\n" +
+        ") WITH (\n" +
+        "'connector' = 'print'\n" +
+        ");\n" +
+        "\n" +
+        "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;";
+
+    /** Logs in, registers the Flink home used by the tests, then opens the applications tab. */
+    @BeforeAll
+    public static void setup() {
+        FlinkHomePage flinkHomePage = new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class)
+            .goToTab(FlinkHomePage.class);
+
+        flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
+
+        flinkHomePage.goToNav(ApacheFlinkPage.class)
+            .goToTab(ApplicationsPage.class);
+    }
+
+    @Test
+    @Order(10)
+    void testCreateFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        createFlinkSqlApplication(applicationsPage, ApplicationForm.ExecutionMode.YARN_APPLICATION);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain newly-created application", applicationName);
+    }
+
+    @Test
+    @Order(20)
+    void testReleaseFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain released application", "SUCCESS");
+    }
+
+    @Test
+    @Order(30)
+    void testStartFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain started application", "RUNNING");
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain finished application", "FINISHED");
+    }
+
+    @Test
+    @Order(40)
+    void testDeleteFlinkApplicationOnYarnApplicationMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        awaitApplicationRemoved(applicationsPage);
+    }
+
+    @Test
+    @Order(50)
+    void testCreateFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        createFlinkSqlApplication(applicationsPage, ApplicationForm.ExecutionMode.YARN_PER_JOB);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain newly-created application", applicationName);
+    }
+
+    @Test
+    @Order(60)
+    void testReleaseFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.releaseApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain released application", "SUCCESS");
+    }
+
+    @Test
+    @Order(70)
+    void testStartFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.startApplication(applicationName);
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain started application", "RUNNING");
+
+        awaitListContains(applicationsPage,
+            "Applications list should contain finished application", "FINISHED");
+    }
+
+    @Test
+    @Order(80)
+    void testDeleteFlinkApplicationOnYarnPerJobMode() {
+        final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+        applicationsPage.deleteApplication(applicationName);
+
+        awaitApplicationRemoved(applicationsPage);
+    }
+
+    /** Fills in and submits the create-application form for the shared Flink SQL job. */
+    private static void createFlinkSqlApplication(ApplicationsPage applicationsPage,
+                                                  ApplicationForm.ExecutionMode executionMode) {
+        ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
+        applicationsDynamicParams.flinkSQL(flinkSQL);
+
+        applicationsPage.createApplication().addApplication(ApplicationForm.DevelopmentMode.FLINK_SQL,
+            executionMode,
+            applicationName,
+            flinkName,
+            applicationsDynamicParams);
+    }
+
+    /** Waits until the text of at least one applications-list element contains {@code expectedText}. */
+    private static void awaitListContains(ApplicationsPage applicationsPage,
+                                          String description,
+                                          String expectedText) {
+        Awaitility.await().untilAsserted(() -> assertThat(applicationsPage.applicationsList())
+            .as(description)
+            .extracting(WebElement::getText)
+            .anyMatch(it -> it.contains(expectedText)));
+    }
+
+    /** Refreshes the page on every poll until no applications-list element mentions the application. */
+    private static void awaitApplicationRemoved(ApplicationsPage applicationsPage) {
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(
+                applicationsPage.applicationsList()
+            ).noneMatch(
+                it -> it.getText().contains(applicationName)
+            );
+        });
+    }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkHomeTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkHomeTest.java
new file mode 100644
index 000000000..6f9b09097
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkHomeTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.apacheflink.FlinkHomePage;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end scenario for the "Flink Home" tab: create an entry, rename it and
+ * delete it again. The three tests build on each other and are annotated with
+ * ascending {@link Order} values.
+ */
+@StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
+public class FlinkHomeTest {
+    // Never assigned in this class; presumably injected by the @StreamPark
+    // extension before setup() runs - TODO confirm.
+    private static RemoteWebDriver browser;
+
+    // Login credentials and team used by setup().
+    private static final String userName = "admin";
+
+    private static final String password = "streampark";
+
+    private static final String teamName = "default";
+
+    // Values entered into the "create Flink home" form.
+    private static final String flinkName = "flink-1.18.1";
+
+    private static final String flinkHome = "/flink-1.18.1";
+
+    private static final String flinkDescription = "description test";
+
+    // Name the entry is renamed to by the edit test (a Flink name, not a path).
+    private static final String editedFlinkName = "flink_1.18.1";
+
+    /**
+     * Logs in and navigates to the Flink Home tab once for all tests.
+     */
+    @BeforeAll
+    public static void setup() {
+        final ApacheFlinkPage apacheFlinkPage = new LoginPage(browser)
+            .login(userName, password, teamName)
+            .goToNav(ApacheFlinkPage.class);
+        apacheFlinkPage.goToTab(FlinkHomePage.class);
+    }
+
+    /**
+     * Creates a Flink home and waits until the list shows its name.
+     */
+    @Test
+    @Order(10)
+    void testCreateFlinkHome() {
+        final FlinkHomePage page = new FlinkHomePage(browser);
+        page.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(page.flinkHomeList())
+                .as("Flink Home list should contain newly-created flink home")
+                .extracting(WebElement::getText)
+                .anyMatch(listText -> listText.contains(flinkName));
+        });
+    }
+
+    /**
+     * Renames the Flink home and waits until the list shows the new name.
+     */
+    @Test
+    @Order(20)
+    void testEditFlinkHome() {
+        final FlinkHomePage page = new FlinkHomePage(browser);
+        page.editFlinkHome(flinkName, editedFlinkName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(page.flinkHomeList())
+                .as("Flink Home list should contain edited flink home")
+                .extracting(WebElement::getText)
+                .anyMatch(listText -> listText.contains(editedFlinkName));
+        });
+    }
+
+    /**
+     * Deletes the renamed Flink home and polls, refreshing the page on every
+     * attempt, until the list no longer mentions it.
+     */
+    @Test
+    @Order(30)
+    void testDeleteFlinkHome() {
+        final FlinkHomePage page = new FlinkHomePage(browser);
+        page.deleteFlinkHome(editedFlinkName);
+
+        Awaitility.await().untilAsserted(() -> {
+            browser.navigate().refresh();
+
+            assertThat(page.flinkHomeList())
+                .noneMatch(entry -> entry.getText().contains(editedFlinkName));
+        });
+    }
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/ApacheFlinkPage.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/ApacheFlinkPage.java
new file mode 100644
index 000000000..0c96dda3e
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/ApacheFlinkPage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink;
+
+import lombok.Getter;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.ApplicationsPage;
+import org.apache.streampark.e2e.pages.common.NavBarPage;
+import org.apache.streampark.e2e.pages.common.NavBarPage.NavBarItem;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+
+/**
+ * Page object for the "Apache Flink" navigation entry. It exposes the
+ * sub-menu items and opens the page object that belongs to a requested tab.
+ */
+@Getter
+public final class ApacheFlinkPage extends NavBarPage implements NavBarItem {
+    @FindBy(xpath = "//span[contains(@class, 'streampark-simple-menu-sub-title') and contains(text(), 'Applications')]//..")
+    private WebElement menuApplications;
+
+    @FindBy(xpath = "//span[contains(@class, 'streampark-simple-menu-sub-title') and contains(text(), 'Flink Home')]//..")
+    private WebElement menuFlinkHome;
+
+    // Located but not used by goToTab(); there is no Clusters tab mapping yet.
+    @FindBy(xpath = "//span[contains(@class, 'streampark-simple-menu-sub-title') and contains(text(), 'Clusters')]//..")
+    private WebElement menuClusters;
+
+    public ApacheFlinkPage(RemoteWebDriver driver) {
+        super(driver);
+    }
+
+    /**
+     * Clicks the sub-menu entry that belongs to {@code tab} and returns a new
+     * page object for it.
+     *
+     * @param tab page-object class of the tab to open
+     * @param <T> concrete tab type
+     * @return a freshly constructed page object of type {@code T}
+     * @throws UnsupportedOperationException if no menu entry is mapped to {@code tab}
+     */
+    public <T extends ApacheFlinkPage.Tab> T goToTab(Class<T> tab) {
+        if (tab == ApplicationsPage.class) {
+            clickWhenClickable(menuApplications);
+            return tab.cast(new ApplicationsPage(driver));
+        }
+
+        if (tab == FlinkHomePage.class) {
+            clickWhenClickable(menuFlinkHome);
+            return tab.cast(new FlinkHomePage(driver));
+        }
+
+        throw new UnsupportedOperationException("Unknown tab: " + tab.getName());
+    }
+
+    /** Waits up to 10 seconds for {@code menuEntry} to become clickable, then clicks it. */
+    private void clickWhenClickable(WebElement menuEntry) {
+        final WebDriverWait menuWait = new WebDriverWait(driver, Duration.ofSeconds(10));
+        menuWait.until(ExpectedConditions.elementToBeClickable(menuEntry));
+        menuEntry.click();
+    }
+
+    /** Marker for page objects reachable through {@link #goToTab(Class)}. */
+    public interface Tab {
+    }
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/FlinkHomePage.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/FlinkHomePage.java
new file mode 100644
index 000000000..261a1424c
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/FlinkHomePage.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink;
+
+import lombok.Getter;
+import org.apache.streampark.e2e.pages.common.NavBarPage;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementStatus;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementUserType;
+import org.openqa.selenium.By;
+import org.openqa.selenium.Keys;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Page object for the "Flink Home" tab: create, rename and delete Flink home
+ * entries through the UI.
+ */
+@Getter
+public class FlinkHomePage extends NavBarPage implements ApacheFlinkPage.Tab {
+    /** Upper bound for every explicit wait issued by this page. */
+    private static final Duration WAIT_TIMEOUT = Duration.ofSeconds(10);
+
+    @FindBy(xpath = "//span[contains(., 'Flink Home')]/..//button[contains(@class, 'ant-btn')]/span[contains(text(), 'Add New')]")
+    private WebElement buttonCreateFlinkHome;
+
+    @FindBy(xpath = "//div[contains(@class, 'ant-spin-container')]")
+    private List<WebElement> flinkHomeList;
+
+    @FindBy(xpath = "//button[contains(@class, 'ant-btn')]/span[contains(., 'Yes')]")
+    private WebElement deleteConfirmButton;
+
+    private final CreateFlinkHomeForm createFlinkHomeForm = new CreateFlinkHomeForm();
+
+    public FlinkHomePage(RemoteWebDriver driver) {
+        super(driver);
+    }
+
+    /**
+     * Opens the create form, fills it in and submits it.
+     *
+     * @param flinkName   value typed into the name input
+     * @param flinkHome   value typed into the Flink home input
+     * @param description value typed into the description input
+     * @return this page, for chaining
+     */
+    public FlinkHomePage createFlinkHome(String flinkName, String flinkHome, String description) {
+        waitForPageLoading();
+
+        new WebDriverWait(driver, WAIT_TIMEOUT).until(ExpectedConditions.elementToBeClickable(buttonCreateFlinkHome));
+        buttonCreateFlinkHome.click();
+        createFlinkHomeForm.inputFlinkName().sendKeys(flinkName);
+        createFlinkHomeForm.inputFlinkHome().sendKeys(flinkHome);
+        createFlinkHomeForm.inputDescription().sendKeys(description);
+        createFlinkHomeForm.buttonSubmit().click();
+
+        waitForClickFinish();
+        return this;
+    }
+
+    /**
+     * Opens the edit form of the entry containing {@code oldFlinkName}, replaces
+     * the content of the name input with {@code newFlinkName} and submits.
+     *
+     * @param oldFlinkName text identifying the entry to edit
+     * @param newFlinkName replacement name
+     * @return this page, for chaining
+     * @throws RuntimeException if no displayed edit button is found
+     */
+    public FlinkHomePage editFlinkHome(String oldFlinkName, String newFlinkName) {
+        waitForPageLoading();
+
+        clickRowAction(oldFlinkName, "edit");
+
+        // Select the current content and delete it before typing the new name.
+        createFlinkHomeForm.inputFlinkName().sendKeys(Keys.CONTROL + "a");
+        createFlinkHomeForm.inputFlinkName().sendKeys(Keys.BACK_SPACE);
+        createFlinkHomeForm.inputFlinkName().sendKeys(newFlinkName);
+        createFlinkHomeForm.buttonSubmit().click();
+
+        waitForClickFinish();
+        return this;
+    }
+
+    /**
+     * Clicks the delete button of the entry containing {@code flinkName} and
+     * confirms the deletion.
+     *
+     * @param flinkName text identifying the entry to delete
+     * @return this page, for chaining
+     * @throws RuntimeException if no displayed delete button is found
+     */
+    public FlinkHomePage deleteFlinkHome(String flinkName) {
+        waitForPageLoading();
+
+        clickRowAction(flinkName, "delete");
+
+        // The confirmation pop-up is rendered after the click; wait for its
+        // button instead of clicking immediately.
+        new WebDriverWait(driver, WAIT_TIMEOUT).until(ExpectedConditions.elementToBeClickable(deleteConfirmButton));
+        deleteConfirmButton.click();
+
+        waitForClickFinish();
+        return this;
+    }
+
+    /**
+     * Clicks the first displayed action button whose icon {@code aria-label}
+     * contains {@code ariaLabel}, looking only inside list containers whose text
+     * contains {@code flinkName}.
+     */
+    private void clickRowAction(String flinkName, String ariaLabel) {
+        // The leading "." keeps the search relative to the matched container; an
+        // XPath starting with "//" would search the whole document instead.
+        final By actionIcon = By.xpath(
+            ".//button[contains(@class,'ant-btn')]/span[contains(@aria-label,'" + ariaLabel + "')]");
+
+        flinkHomeList()
+            .stream()
+            .filter(it -> it.getText().contains(flinkName))
+            .flatMap(it -> it.findElements(actionIcon).stream())
+            .filter(WebElement::isDisplayed)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("No " + ariaLabel + " button in flink home list"))
+            .click();
+    }
+
+    /** Blocks until the browser URL contains the Flink Home route. */
+    private void waitForPageLoading() {
+        new WebDriverWait(driver, WAIT_TIMEOUT).until(ExpectedConditions.urlContains("/flink/home"));
+    }
+
+    /** Blocks until the "Add New" button is clickable again, used here as the signal that the previous action finished. */
+    private void waitForClickFinish() {
+        new WebDriverWait(driver, WAIT_TIMEOUT).until(ExpectedConditions.elementToBeClickable(buttonCreateFlinkHome));
+    }
+
+    /** Elements of the form used both for creating and for editing an entry. */
+    @Getter
+    public class CreateFlinkHomeForm {
+        CreateFlinkHomeForm() {
+            PageFactory.initElements(driver, this);
+        }
+
+        @FindBy(id = "form_item_flinkName")
+        private WebElement inputFlinkName;
+
+        @FindBy(id = "form_item_flinkHome")
+        private WebElement inputFlinkHome;
+
+        @FindBy(id = "form_item_description")
+        private WebElement inputDescription;
+
+        @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(text(), 'OK')]")
+        private WebElement buttonSubmit;
+
+        @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(text(), 'Cancel')]")
+        private WebElement buttonCancel;
+    }
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationForm.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationForm.java
new file mode 100644
index 000000000..217e0c307
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationForm.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import
org.apache.streampark.e2e.pages.apacheflink.applications.entity.ApplicationsDynamicParams;
+import org.openqa.selenium.JavascriptExecutor;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.FindBys;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+
+/**
+ * Page object for the "add application" form. It selects the development mode,
+ * optionally the execution mode and Flink SQL details, enters the application
+ * name and submits.
+ */
+@Getter
+public final class ApplicationForm {
+    private final WebDriver driver;
+
+    @FindBy(xpath = "//div[contains(@codefield, 'jobType')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonDevelopmentModeDropdown;
+
+    @FindBys({
+        @FindBy(css = "[codefield=jobType]"),
+        @FindBy(className = "ant-select-item-option-content")
+    })
+    private List<WebElement> selectDevelopmentMode;
+
+    @FindBy(xpath = "//div[contains(@codefield, 'executionMode')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonExecutionModeDropdown;
+
+    @FindBys({
+        @FindBy(css = "[codefield=executionMode]"),
+        @FindBy(className = "ant-select-item-option-content")
+    })
+    private List<WebElement> selectExecutionMode;
+
+    @FindBy(id = "form_item_jobName")
+    private WebElement inputApplicationName;
+
+    @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(text(), 'Submit')]")
+    private WebElement buttonSubmit;
+
+    @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(text(), 'Cancel')]")
+    private WebElement buttonCancel;
+
+    public ApplicationForm(WebDriver driver) {
+        this.driver = driver;
+
+        PageFactory.initElements(driver, this);
+    }
+
+    /**
+     * Fills in and submits the form.
+     *
+     * <p>The execution mode is only selected for {@link DevelopmentMode#FLINK_SQL}.
+     * For the two YARN modes {@link ExecutionMode#YARN_APPLICATION} and
+     * {@link ExecutionMode#YARN_PER_JOB} the Flink version and SQL text are
+     * entered as well, so {@code applicationsDynamicParams} must be non-null there.
+     *
+     * @param developmentMode           development mode to pick from the dropdown
+     * @param executionMode             execution mode to pick (used for FLINK_SQL only)
+     * @param applicationName           value typed into the application name input
+     * @param flinkVersion              Flink version option to pick (YARN modes only)
+     * @param applicationsDynamicParams carrier of the Flink SQL text (YARN modes only)
+     * @return this form, for chaining
+     * @throws IllegalArgumentException if a requested dropdown option is not present
+     */
+    @SneakyThrows
+    public ApplicationForm addApplication(DevelopmentMode developmentMode,
+                                          ExecutionMode executionMode,
+                                          String applicationName,
+                                          String flinkVersion,
+                                          ApplicationsDynamicParams applicationsDynamicParams) {
+        // Fixed pause kept from the original implementation; the explicit wait
+        // below then ensures the dropdown can actually be clicked.
+        Thread.sleep(1000);
+        new WebDriverWait(driver, Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(buttonDevelopmentModeDropdown));
+        buttonDevelopmentModeDropdown.click();
+        new WebDriverWait(driver, Duration.ofSeconds(10)).until(ExpectedConditions.visibilityOfAllElements(selectDevelopmentMode));
+
+        // Every mode, including PYTHON_FLINK, is matched by its UI label (desc()),
+        // never by the enum constant name.
+        clickOption(selectDevelopmentMode, developmentMode.desc(), "Development mode");
+
+        if (developmentMode == DevelopmentMode.FLINK_SQL) {
+            buttonExecutionModeDropdown.click();
+            clickOption(selectExecutionMode, executionMode.desc(), "Execution mode");
+
+            final boolean needsFlinkSqlDetails = executionMode == ExecutionMode.YARN_APPLICATION
+                || executionMode == ExecutionMode.YARN_PER_JOB;
+            if (needsFlinkSqlDetails) {
+                new FlinkSQLYarnApplicationForm(driver)
+                    .add(flinkVersion, applicationsDynamicParams.flinkSQL());
+            }
+        }
+
+        inputApplicationName.sendKeys(applicationName);
+
+        buttonSubmit.click();
+
+        return this;
+    }
+
+    /**
+     * Clicks the first option whose text equals {@code label}, ignoring case.
+     *
+     * @throws IllegalArgumentException with message "{@code kind} not found: {@code label}"
+     *                                  when no option matches
+     */
+    private static void clickOption(List<WebElement> options, String label, String kind) {
+        options.stream()
+            .filter(e -> e.getText().equalsIgnoreCase(label))
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException(String.format("%s not found: %s", kind, label)))
+            .click();
+    }
+
+    /** Development modes together with the label shown in the dropdown. */
+    @Getter
+    public enum DevelopmentMode {
+        CUSTOM_CODE("custom code"),
+        FLINK_SQL("flink sql"),
+        PYTHON_FLINK("python flink"),
+        ;
+
+        private final String desc;
+
+        DevelopmentMode(String desc) {
+            this.desc = desc;
+        }
+    }
+
+    /** Execution modes together with the label shown in the dropdown. */
+    @Getter
+    public enum ExecutionMode {
+        REMOTE("remote"),
+        YARN_APPLICATION("yarn application"),
+        YARN_SESSION("yarn session"),
+        KUBERNETES_SESSION("kubernetes session"),
+        KUBERNETES_APPLICATION("kubernetes application"),
+        YARN_PER_JOB("yarn per-job (deprecated, please use yarn-application mode)"),
+        ;
+
+        private final String desc;
+
+        ExecutionMode(String desc) {
+            this.desc = desc;
+        }
+    }
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationsPage.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationsPage.java
new file mode 100644
index 000000000..17a24be36
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/ApplicationsPage.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.common.NavBarPage;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementStatus;
+import org.apache.streampark.e2e.pages.system.entity.UserManagementUserType;
+import org.openqa.selenium.By;
+import org.openqa.selenium.Keys;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.interactions.Actions;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.FindBys;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Page object for the "Applications" tab: open the create form and release,
+ * start, cancel or delete an application listed in the table.
+ */
+@Getter
+public class ApplicationsPage extends NavBarPage implements ApacheFlinkPage.Tab {
+    /** Upper bound for every explicit wait issued by this page. */
+    private static final Duration WAIT_TIMEOUT = Duration.ofSeconds(10);
+
+    @FindBy(xpath = "//div[contains(@class, 'app_list')]//button[contains(@class, 'ant-btn-primary')]/span[contains(text(), 'Add New')]")
+    private WebElement buttonCreateApplication;
+
+    @FindBy(xpath = "//tbody[contains(@class, 'ant-table-tbody')]")
+    private List<WebElement> applicationsList;
+
+    @FindBy(className = "ant-form-item-explain-error")
+    private List<WebElement> errorMessageList;
+
+    @FindBy(xpath = "//div[contains(@class, 'ant-dropdown-content')]//span[contains(text(), 'Delete')]")
+    private WebElement deleteButton;
+
+    @FindBy(xpath = "//button[contains(@class, 'ant-btn')]/span[contains(., 'OK')]")
+    private WebElement deleteConfirmButton;
+
+    private final StartJobForm startJobForm = new StartJobForm();
+
+    private final CancelJobForm cancelJobForm = new CancelJobForm();
+
+    public ApplicationsPage(RemoteWebDriver driver) {
+        super(driver);
+    }
+
+    /**
+     * Clicks "Add New" and waits for the add-application route.
+     *
+     * @return the form page object for the new application
+     */
+    public ApplicationForm createApplication() {
+        waitForPageLoading();
+
+        buttonCreateApplication.click();
+        new WebDriverWait(driver, WAIT_TIMEOUT).until(ExpectedConditions.urlContains("/flink/app/add"));
+        return new ApplicationForm(driver);
+    }
+
+    /**
+     * Hovers the "more" button, clicks "Delete" in the dropdown and confirms.
+     *
+     * @param applicationName text identifying the application in the table
+     * @return this page, for chaining
+     * @throws RuntimeException if no displayed "more" button is found
+     */
+    public ApplicationsPage deleteApplication(String applicationName) {
+        waitForPageLoading();
+
+        final WebElement extraButton = findRowButton(
+            applicationName,
+            "//span[contains(@aria-label, 'more')]/..",
+            "No extra button in applications list");
+
+        new Actions(this.driver).moveToElement(extraButton).perform();
+
+        // The dropdown entry and the confirmation button are rendered only after
+        // the preceding interaction, so wait for each before clicking.
+        waitUntilClickable(deleteButton);
+        deleteButton.click();
+        waitUntilClickable(deleteConfirmButton);
+        deleteConfirmButton.click();
+
+        return this;
+    }
+
+    /**
+     * Clicks the start button, switches the savepoint toggle in the start dialog
+     * and applies.
+     *
+     * @param applicationName text identifying the application in the table
+     * @return this page, for chaining
+     * @throws RuntimeException if no displayed start button is found
+     */
+    public ApplicationsPage startApplication(String applicationName) {
+        waitForPageLoading();
+
+        findRowButton(
+            applicationName,
+            "//button[contains(@auth, 'app:start')]",
+            "No start button in applications list")
+            .click();
+
+        waitUntilClickable(startJobForm.radioFromSavepoint());
+        startJobForm.radioFromSavepoint().click();
+        startJobForm.buttonSubmit().click();
+
+        return this;
+    }
+
+    /**
+     * Clicks the release button.
+     *
+     * @param applicationName text identifying the application in the table
+     * @return this page, for chaining
+     * @throws RuntimeException if no displayed release button is found
+     */
+    public ApplicationsPage releaseApplication(String applicationName) {
+        waitForPageLoading();
+
+        findRowButton(
+            applicationName,
+            "//button[contains(@auth, 'app:release')]",
+            "No release button in applications list")
+            .click();
+
+        return this;
+    }
+
+    /**
+     * Clicks the cancel button, switches the savepoint toggle in the cancel
+     * dialog and applies.
+     *
+     * @param applicationName text identifying the application in the table
+     * @return this page, for chaining
+     * @throws RuntimeException if no displayed cancel button is found
+     */
+    public ApplicationsPage cancelApplication(String applicationName) {
+        waitForPageLoading();
+
+        findRowButton(
+            applicationName,
+            "//button[contains(@auth, 'app:cancel')]",
+            "No cancel button in applications list")
+            .click();
+
+        // Same pattern as startApplication(): wait for the dialog before clicking.
+        waitUntilClickable(cancelJobForm.radioFromSavepoint());
+        cancelJobForm.radioFromSavepoint().click();
+        cancelJobForm.buttonSubmit().click();
+
+        return this;
+    }
+
+    /**
+     * Returns the first displayed element matching {@code buttonXpath}, evaluated
+     * for every table body whose text contains {@code applicationName}.
+     *
+     * <p>NOTE(review): the XPaths passed in start with "//", which Selenium
+     * evaluates against the whole document rather than the table body it is
+     * invoked on. Kept unchanged because the table's DOM layout is not visible
+     * from here; confirm before switching to ".//".
+     *
+     * @throws RuntimeException carrying {@code missingMessage} when nothing matches
+     */
+    private WebElement findRowButton(String applicationName, String buttonXpath, String missingMessage) {
+        return applicationsList()
+            .stream()
+            .filter(it -> it.getText().contains(applicationName))
+            .flatMap(it -> it.findElements(By.xpath(buttonXpath)).stream())
+            .filter(WebElement::isDisplayed)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException(missingMessage));
+    }
+
+    /** Blocks until {@code element} is present, displayed and enabled. */
+    private void waitUntilClickable(WebElement element) {
+        new WebDriverWait(driver, WAIT_TIMEOUT).until(ExpectedConditions.elementToBeClickable(element));
+    }
+
+    /** Blocks until the browser URL contains the applications route. */
+    private void waitForPageLoading() {
+        new WebDriverWait(driver, WAIT_TIMEOUT).until(ExpectedConditions.urlContains("/flink/app"));
+    }
+
+    /** Elements of the dialog shown after clicking a start button. */
+    @Getter
+    public class StartJobForm {
+        StartJobForm() {
+            PageFactory.initElements(driver, this);
+        }
+
+        @FindBy(xpath = "//button[@id='startApplicationModal_startSavePointed']//span[contains(text(), 'ON')]")
+        private WebElement radioFromSavepoint;
+
+        @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(., 'Apply')]")
+        private WebElement buttonSubmit;
+
+        @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(., 'Cancel')]")
+        private WebElement buttonCancel;
+    }
+
+    /** Elements of the dialog shown after clicking a cancel button. */
+    @Getter
+    public class CancelJobForm {
+        CancelJobForm() {
+            PageFactory.initElements(driver, this);
+        }
+
+        // NOTE(review): matches the first <span> anywhere whose text contains
+        // "ON"; presumably meant to be scoped to the cancel dialog - confirm.
+        @FindBy(xpath = "//span[contains(text(), 'ON')]")
+        private WebElement radioFromSavepoint;
+
+        @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(., 'Apply')]")
+        private WebElement buttonSubmit;
+
+        @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(., 'Cancel')]")
+        private WebElement buttonCancel;
+    }
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLEditor.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLEditor.java
new file mode 100644
index 000000000..e459f9cbc
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLEditor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.interactions.Actions;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+
+/**
+ * Page object wrapping the Flink SQL editor element of the application form.
+ */
+@Getter
+public final class FlinkSQLEditor {
+    @FindBy(xpath = "//label[contains(@for, 'form_item_flinkSql')]/../..//div[contains(@class, 'monaco-editor')]//div[contains(@class, 'view-line')]")
+    private WebElement flinkSqlEditor;
+
+    private WebDriver driver;
+
+    public FlinkSQLEditor(WebDriver driver) {
+        this.driver = driver;
+        PageFactory.initElements(driver, this);
+    }
+
+    /**
+     * Waits up to 20 seconds for the editor to become clickable, clicks it and
+     * sends {@code content} as keystrokes.
+     *
+     * @param content text to type into the editor
+     * @return this editor, for chaining
+     */
+    public FlinkSQLEditor content(String content) {
+        final WebDriverWait editorWait = new WebDriverWait(this.driver, Duration.ofSeconds(20));
+        editorWait.until(ExpectedConditions.elementToBeClickable(flinkSqlEditor));
+
+        flinkSqlEditor.click();
+
+        // Keystrokes are sent through an Actions chain after moving to the
+        // editor, not through WebElement.sendKeys.
+        new Actions(this.driver)
+            .moveToElement(flinkSqlEditor)
+            .sendKeys(content)
+            .perform();
+
+        return this;
+    }
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLYarnApplicationForm.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLYarnApplicationForm.java
new file mode 100644
index 000000000..6645835e1
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/FlinkSQLYarnApplicationForm.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.support.FindBy;
+import org.openqa.selenium.support.FindBys;
+import org.openqa.selenium.support.PageFactory;
+import org.openqa.selenium.support.ui.ExpectedConditions;
+import org.openqa.selenium.support.ui.WebDriverWait;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Page object for the Flink SQL specific part of the application form: Flink
+ * version selection and the SQL editor.
+ */
+@Getter
+public final class FlinkSQLYarnApplicationForm {
+    private WebDriver driver;
+
+    @FindBy(xpath = "//div[contains(@codefield, 'versionId')]//div[contains(@class, 'ant-select-selector')]")
+    private WebElement buttonFlinkVersionDropdown;
+
+    @FindBys({
+        @FindBy(css = "[codefield=versionId]"),
+        @FindBy(className = "ant-select-item-option-content")
+    })
+    private List<WebElement> selectFlinkVersion;
+
+    public FlinkSQLYarnApplicationForm(WebDriver driver) {
+        this.driver = driver;
+
+        PageFactory.initElements(driver, this);
+    }
+
+    /**
+     * Picks {@code flinkVersion} from the version dropdown and types
+     * {@code flinkSql} into the SQL editor.
+     *
+     * @param flinkVersion option text to select, compared ignoring case
+     * @param flinkSql     SQL text to enter
+     * @return this form, for chaining
+     * @throws IllegalArgumentException if no option matches {@code flinkVersion}
+     */
+    @SneakyThrows
+    public FlinkSQLYarnApplicationForm add(String flinkVersion, String flinkSql) {
+        buttonFlinkVersionDropdown.click();
+
+        final WebDriverWait optionsWait = new WebDriverWait(driver, Duration.ofSeconds(10));
+        optionsWait.until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion));
+
+        // Linear scan for the first option carrying the requested version text.
+        WebElement matchingOption = null;
+        for (WebElement option : selectFlinkVersion) {
+            if (option.getText().equalsIgnoreCase(flinkVersion)) {
+                matchingOption = option;
+                break;
+            }
+        }
+        if (matchingOption == null) {
+            throw new IllegalArgumentException("Flink version not found");
+        }
+        matchingOption.click();
+
+        new FlinkSQLEditor(driver).content(flinkSql);
+
+        return this;
+    }
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/entity/ApplicationsDynamicParams.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/entity/ApplicationsDynamicParams.java
new file mode 100644
index 000000000..61eb60e15
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/apacheflink/applications/entity/ApplicationsDynamicParams.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.streampark.e2e.pages.apacheflink.applications.entity;
+
+import lombok.Data;
+
+@Data // Lombok generates the getter/setter, equals/hashCode and toString for the field below.
+public class ApplicationsDynamicParams { // Mutable holder with a single String property.
+ private String flinkSQL; // Presumably the Flink SQL text handed to the application form; confirm against callers.
+}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
index 3d44142d8..36f0a178a 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/NavBarPage.java
@@ -20,8 +20,10 @@
package org.apache.streampark.e2e.pages.common;
+import org.apache.streampark.e2e.pages.apacheflink.ApacheFlinkPage;
import org.apache.streampark.e2e.pages.system.SystemPage;
+import org.openqa.selenium.By;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.openqa.selenium.support.FindBy;
@@ -55,9 +57,21 @@ public class NavBarPage {
}
public <T extends NavBarItem> T goToNav(Class<T> nav) {
+ if (nav == ApacheFlinkPage.class) {
+ new WebDriverWait(driver,
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(apacheFlinkTab));
+ String tabOpenStateXpath = "//span[contains(@class, 'ml-2') and
contains(@class, 'streampark-simple-menu-sub-title') and contains(text(),
'Apache Flink')]/../../li[contains(@class, 'streampark-menu-opened')]";
+ if (driver.findElements(By.xpath(tabOpenStateXpath)).isEmpty()) {
+ apacheFlinkTab.click();
+ }
+ return nav.cast(new ApacheFlinkPage(driver));
+ }
+
if (nav == SystemPage.class) {
- new WebDriverWait(driver,
Duration.ofSeconds(60)).until(ExpectedConditions.elementToBeClickable(systemTab));
- systemTab.click();
+ new WebDriverWait(driver,
Duration.ofSeconds(10)).until(ExpectedConditions.elementToBeClickable(systemTab));
+ String tabOpenStateXpath = "//span[contains(@class, 'ml-2') and
contains(@class, 'streampark-simple-menu-sub-title') and contains(text(),
'System')]/../../li[contains(@class, 'streampark-menu-opened')]";
+ if (driver.findElements(By.xpath(tabOpenStateXpath)).isEmpty()) {
+ systemTab.click();
+ }
return nav.cast(new SystemPage(driver));
}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/Dockerfile
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/Dockerfile
new file mode 100644
index 000000000..97c5b8475
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/Dockerfile
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stage 1: the StreamPark CI image, used only as the source of the /streampark directory.
+FROM apache/streampark:ci AS base-image
+
+# Install streampark
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+# Hand /streampark to the hadoop user and switch the configured hadoop-user-name from hdfs to hadoop.
+RUN sudo chown -R hadoop:hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" /streampark/conf/config.yaml
+
+ENV FLINK_VERSION=1.16.3
+
+# Install Flink: download the binary release, unpack it under / and delete the archive
+# in the same layer so the tarball does not stay in the image.
+# NOTE(review): --no-check-certificate disables TLS verification of the download. It is kept
+# because the base image's CA bundle is not visible here; confirm and drop it if possible.
+RUN sudo wget --no-check-certificate -O /flink-${FLINK_VERSION}-bin-scala_2.12.tgz https://dlcdn.apache.org/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && cd / \
+    && sudo tar -zxf flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo rm -f flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo chown -R hadoop:hadoop /flink-${FLINK_VERSION}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
new file mode 100644
index 000000000..b6dfba10f
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.yaml
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..6ea5912e7
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.yaml
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+
+services:
+ namenode:
+ image: apache/streampark-flink-1.16.3-on-yarn:ci
+ hostname: namenode
+ command: [ "hdfs", "namenode" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "19870:9870"
+ env_file:
+ - docker-compose.config
+ environment:
+ ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://namenode:9870" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ datanode:
+ image: apache/streampark-flink-1.16.3-on-yarn:ci
+ hostname: datanode
+ command: [ "hdfs", "datanode" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://datanode:9864" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ depends_on:
+ namenode:
+ condition: service_healthy
+ resourcemanager:
+ image: apache/streampark-flink-1.16.3-on-yarn:ci
+ hostname: resourcemanager
+ command: [ "yarn", "resourcemanager" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "18088:8088"
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ nodemanager:
+ image: apache/streampark-flink-1.16.3-on-yarn:ci
+ hostname: nodemanager
+ command: [ "yarn", "nodemanager" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ depends_on:
+ resourcemanager:
+ condition: service_healthy
+ healthcheck:
+ test: [ "CMD", "curl", "http://nodemanager:8042" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ streampark:
+ image: apache/streampark-flink-1.16.3-on-yarn:ci
+ hostname: streampark
+ command:
+ - sh
+ - -c
+ - /streampark/bin/streampark.sh start_docker
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "20000:10000"
+ environment:
+ - SPRING_PROFILES_ACTIVE=h2
+ - TZ=Asia/Shanghai
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://streampark:10000" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+
+networks:
+ e2e:
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/Dockerfile
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/Dockerfile
new file mode 100644
index 000000000..f22348d0e
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/Dockerfile
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stage 1: the StreamPark CI image, used only as the source of the /streampark directory.
+FROM apache/streampark:ci AS base-image
+
+# Install streampark
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+# Hand /streampark to the hadoop user and switch the configured hadoop-user-name from hdfs to hadoop.
+RUN sudo chown -R hadoop:hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" /streampark/conf/config.yaml
+
+ENV FLINK_VERSION=1.17.2
+
+# Install Flink: download the binary release, unpack it under / and delete the archive
+# in the same layer so the tarball does not stay in the image.
+# NOTE(review): --no-check-certificate disables TLS verification of the download. It is kept
+# because the base image's CA bundle is not visible here; confirm and drop it if possible.
+RUN sudo wget --no-check-certificate -O /flink-${FLINK_VERSION}-bin-scala_2.12.tgz https://dlcdn.apache.org/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && cd / \
+    && sudo tar -zxf flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo rm -f flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo chown -R hadoop:hadoop /flink-${FLINK_VERSION}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
new file mode 100644
index 000000000..b6dfba10f
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.yaml
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..74d0b85e8
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.yaml
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+
+services:
+ namenode:
+ image: apache/streampark-flink-1.17.2-on-yarn:ci
+ hostname: namenode
+ command: [ "hdfs", "namenode" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "19870:9870"
+ env_file:
+ - docker-compose.config
+ environment:
+ ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://namenode:9870" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ datanode:
+ image: apache/streampark-flink-1.17.2-on-yarn:ci
+ hostname: datanode
+ command: [ "hdfs", "datanode" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://datanode:9864" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ depends_on:
+ namenode:
+ condition: service_healthy
+ resourcemanager:
+ image: apache/streampark-flink-1.17.2-on-yarn:ci
+ hostname: resourcemanager
+ command: [ "yarn", "resourcemanager" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "18088:8088"
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ nodemanager:
+ image: apache/streampark-flink-1.17.2-on-yarn:ci
+ hostname: nodemanager
+ command: [ "yarn", "nodemanager" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ depends_on:
+ resourcemanager:
+ condition: service_healthy
+ healthcheck:
+ test: [ "CMD", "curl", "http://nodemanager:8042" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ streampark:
+ image: apache/streampark-flink-1.17.2-on-yarn:ci
+ hostname: streampark
+ command:
+ - sh
+ - -c
+ - /streampark/bin/streampark.sh start_docker
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "20000:10000"
+ environment:
+ - SPRING_PROFILES_ACTIVE=h2
+ - TZ=Asia/Shanghai
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://streampark:10000" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+
+networks:
+ e2e:
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/Dockerfile
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/Dockerfile
new file mode 100644
index 000000000..12e5cdaad
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/Dockerfile
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stage 1: the StreamPark CI image, used only as the source of the /streampark directory.
+FROM apache/streampark:ci AS base-image
+
+# Install streampark
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+# Hand /streampark to the hadoop user and switch the configured hadoop-user-name from hdfs to hadoop.
+RUN sudo chown -R hadoop:hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" /streampark/conf/config.yaml
+
+ENV FLINK_VERSION=1.18.1
+
+# Install Flink: download the binary release, unpack it under / and delete the archive
+# in the same layer so the tarball does not stay in the image.
+# NOTE(review): --no-check-certificate disables TLS verification of the download. It is kept
+# because the base image's CA bundle is not visible here; confirm and drop it if possible.
+RUN sudo wget --no-check-certificate -O /flink-${FLINK_VERSION}-bin-scala_2.12.tgz https://dlcdn.apache.org/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && cd / \
+    && sudo tar -zxf flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo rm -f flink-${FLINK_VERSION}-bin-scala_2.12.tgz \
+    && sudo chown -R hadoop:hadoop /flink-${FLINK_VERSION}
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
new file mode 100644
index 000000000..b6dfba10f
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.yaml
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..ad92f84c0
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.yaml
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3"
+
+services:
+ namenode:
+ image: apache/streampark-flink-1.18.1-on-yarn:ci
+ hostname: namenode
+ command: [ "hdfs", "namenode" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "19870:9870"
+ env_file:
+ - docker-compose.config
+ environment:
+ ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://namenode:9870" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ datanode:
+ image: apache/streampark-flink-1.18.1-on-yarn:ci
+ hostname: datanode
+ command: [ "hdfs", "datanode" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://datanode:9864" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ depends_on:
+ namenode:
+ condition: service_healthy
+ resourcemanager:
+ image: apache/streampark-flink-1.18.1-on-yarn:ci
+ hostname: resourcemanager
+ command: [ "yarn", "resourcemanager" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "18088:8088"
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ nodemanager:
+ image: apache/streampark-flink-1.18.1-on-yarn:ci
+ hostname: nodemanager
+ command: [ "yarn", "nodemanager" ]
+ networks:
+ - e2e
+ build:
+ context: ./
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ depends_on:
+ resourcemanager:
+ condition: service_healthy
+ healthcheck:
+ test: [ "CMD", "curl", "http://nodemanager:8042" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+ streampark:
+ image: apache/streampark-flink-1.18.1-on-yarn:ci
+ hostname: streampark
+ command:
+ - sh
+ - -c
+ - /streampark/bin/streampark.sh start_docker
+ networks:
+ - e2e
+ build:
+ context: ./
+ ports:
+ - "20000:10000"
+ environment:
+ - SPRING_PROFILES_ACTIVE=h2
+ - TZ=Asia/Shanghai
+ env_file:
+ - docker-compose.config
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "200m"
+ max-file: "1"
+ tty: true
+ stdin_open: true
+ restart: always
+ healthcheck:
+ test: [ "CMD", "curl", "http://streampark:10000" ]
+ interval: 5s
+ timeout: 5s
+ retries: 120
+
+networks:
+ e2e:
diff --git
a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java
b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java
index 9592381a4..b33a59e80 100644
---
a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java
+++
b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/StreamParkExtension.java
@@ -77,7 +77,7 @@ final class StreamParkExtension implements BeforeAllCallback,
AfterAllCallback,
@Override
@SuppressWarnings("UnstableApiUsage")
public void beforeAll(ExtensionContext context) throws IOException {
- Awaitility.setDefaultTimeout(Duration.ofSeconds(60));
+ Awaitility.setDefaultTimeout(Duration.ofSeconds(120));
Awaitility.setDefaultPollInterval(Duration.ofSeconds(2));
setRecordPath();