This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch 1.20 in repository https://gitbox.apache.org/repos/asf/streampark.git
commit dc9966f978f2d95d0676c71957cea03dc69e7e69 Author: benjobs <[email protected]> AuthorDate: Mon Jun 23 13:40:45 2025 +0800 [e2e] e2e flink-version update to 1.20.1 --- .../impl/FlinkApplicationInfoServiceImpl.java | 8 +- .../core/service/FlinkSavepointServiceTest.java | 11 +- .../cases/Flink120OnRemoteClusterDeployTest.java | 104 ++++++ .../e2e/cases/Flink120OnYarnClusterDeployTest.java | 152 +++++++++ .../e2e/cases/FlinkSQL120OnYarnTest.java | 371 +++++++++++++++++++++ .../docker/flink-1.20-on-remote/Dockerfile | 18 + .../flink-1.20-on-remote/docker-compose.yaml | 90 +++++ .../resources/docker/flink-1.20-on-yarn/Dockerfile | 45 +++ .../flink-1.20-on-yarn/docker-compose.config | 44 +++ .../docker/flink-1.20-on-yarn/docker-compose.yaml | 163 +++++++++ 10 files changed, 998 insertions(+), 8 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java index 7276bcde1..8ced52f6a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java @@ -484,21 +484,21 @@ public class FlinkApplicationInfoServiceImpl extends ServiceImpl<FlinkApplicatio final String pathPart = uri.getPath(); String error = null; if (scheme == null) { - error = "This state.savepoints.dir value " + error = "This state-savepoints-dir value " + savepointPath + " scheme (hdfs://, file://, etc) of is null. 
Please specify the file system scheme explicitly in the URI."; } else if (pathPart == null) { - error = "This state.savepoints.dir value " + error = "This state-savepoints-dir value " + savepointPath + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data."; } else if (pathPart.isEmpty() || "/".equals(pathPart)) { - error = "This state.savepoints.dir value " + error = "This state-savepoints-dir value " + savepointPath + " Cannot use the root directory for checkpoints."; } return error; } else { - return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application"; + return "When custom savepoint is not set, state-savepoints-dir needs to be set in properties or flink config of application"; } } } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java index 14cbc3fdb..2e27d983e 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java @@ -82,8 +82,8 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase { */ @Test void testGetSavepointFromDynamicProps() { - String propsWithEmptyTargetValue = "-Dstate.savepoints.dir="; - String props = "-Dstate.savepoints.dir=hdfs:///test"; + String propsWithEmptyTargetValue = "-Dexecution.checkpointing.dir="; + String props = "-Dexecution.checkpointing.dir=hdfs:///test"; FlinkSavepointServiceImpl savepointServiceImpl = (FlinkSavepointServiceImpl) savepointService; assertThat(savepointServiceImpl.getSavepointFromDynamicProps(null)).isNull(); @@ -114,11 +114,13 @@ 
class FlinkSavepointServiceTest extends SpringUnitTestBase { app.setJobType(FlinkJobType.FLINK_JAR.getMode()); assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); + String ckDir = "execution.checkpointing.dir=hdfs:///test"; + // Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint. FlinkApplicationConfig appCfg = new FlinkApplicationConfig(); appCfg.setId(appCfgId); appCfg.setAppId(appId); - appCfg.setContent("state.savepoints.dir=hdfs:///test"); + appCfg.setContent(ckDir); appCfg.setFormat(ConfigFileTypeEnum.PROPERTIES.getValue()); configService.save(appCfg); assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); @@ -134,9 +136,10 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase { // Test for configured CHECKPOINTING_INTERVAL appCfg.setContent( DeflaterUtils.zipString( - "state.savepoints.dir=hdfs:///test\n" + ckDir + "\n" + String.format("%s=%s", CHECKPOINTING_INTERVAL.key(), "3min"))); + configService.updateById(appCfg); FlinkEffective effective = new FlinkEffective(); effective.setTargetId(appCfg.getId()); diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java new file mode 100644 index 000000000..2e5236768 --- /dev/null +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.e2e.cases; + +import org.apache.streampark.e2e.core.StreamPark; +import org.apache.streampark.e2e.pages.LoginPage; +import org.apache.streampark.e2e.pages.common.Constants; +import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage; +import org.apache.streampark.e2e.pages.flink.FlinkHomePage; +import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm; +import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage; +import org.apache.streampark.e2e.pages.flink.clusters.RemoteForm; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.openqa.selenium.WebElement; +import org.openqa.selenium.remote.RemoteWebDriver; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import static org.assertj.core.api.Assertions.assertThat; + +@StreamPark(composeFiles = "docker/flink-1.20-on-remote/docker-compose.yaml") +public class Flink120OnRemoteClusterDeployTest { + + public static RemoteWebDriver browser; + + private static final String flinkName = "flink-1.20.1"; + + private static final String flinkHome = "/opt/flink/"; + + private static final String flinkDescription = "description test"; + + private static final String flinkClusterName = "flink_1.20.1_cluster_e2e"; + + private static final String flinkJobManagerUrl = "http://jobmanager:8081"; + + private static final ClusterDetailForm.DeployMode deployMode = ClusterDetailForm.DeployMode.STANDALONE; + + @BeforeAll + public static void setUp() { + FlinkHomePage
flinkHomePage = new LoginPage(browser) + .login() + .goToNav(ApacheFlinkPage.class) + .goToTab(FlinkHomePage.class); + + flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription); + + flinkHomePage.goToNav(ApacheFlinkPage.class) + .goToTab(FlinkClustersPage.class); + } + + @Test + @Order(1) + public void testCreateFlinkCluster() { + FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser); + + flinkClustersPage.createFlinkCluster() + .<RemoteForm>addCluster(deployMode) + .jobManagerURL(flinkJobManagerUrl) + .clusterName(flinkClusterName) + .flinkVersion(flinkName) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(flinkClustersPage.flinkClusterList) + .as("Flink clusters list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(flinkClusterName))); + } + + @Test + @Order(5) + public void testDeleteFlinkCluster() { + final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser); + + flinkClustersPage.deleteFlinkCluster(flinkClusterName); + + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); + assertThat(flinkClustersPage.flinkClusterList) + .noneMatch(it -> it.getText().contains(flinkClusterName)); + }); + } +} diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java new file mode 100644 index 000000000..c1750746b --- /dev/null +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.e2e.cases; + +import org.apache.streampark.e2e.core.StreamPark; +import org.apache.streampark.e2e.pages.LoginPage; +import org.apache.streampark.e2e.pages.common.Constants; +import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage; +import org.apache.streampark.e2e.pages.flink.FlinkHomePage; +import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm; +import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage; +import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.openqa.selenium.WebElement; +import org.openqa.selenium.remote.RemoteWebDriver; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import static org.assertj.core.api.Assertions.assertThat; + +@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml") +public class Flink120OnYarnClusterDeployTest { + + public static RemoteWebDriver browser; + + private static final String flinkName = "flink-1.20.1"; + + private static final String flinkHome = "/flink-1.20.1"; + + private static final String flinkDescription = "description test"; + + private static final String flinkClusterName = "flink_1.20.1_cluster_e2e"; + + private static final String flinkClusterNameEdited =
"flink_1.20.1_cluster_e2e_edited"; + + private static final ClusterDetailForm.DeployMode deployMode = ClusterDetailForm.DeployMode.YARN_SESSION; + + @BeforeAll + public static void setup() { + FlinkHomePage flinkHomePage = new LoginPage(browser) + .login() + .goToNav(ApacheFlinkPage.class) + .goToTab(FlinkHomePage.class); + + flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription); + + flinkHomePage.goToNav(ApacheFlinkPage.class) + .goToTab(FlinkClustersPage.class); + } + + @Test + @Order(1) + public void testCreateFlinkCluster() { + final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser); + + flinkClustersPage.createFlinkCluster() + .<YarnSessionForm>addCluster(deployMode) + .resolveOrder(YarnSessionForm.ResolveOrder.CHILD_FIRST) + .clusterName(flinkClusterName) + .flinkVersion(flinkName) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(flinkClustersPage.flinkClusterList) + .as("Flink clusters list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(flinkClusterName))); + } + + @Test + @Order(2) + public void testEditFlinkCluster() { + final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser); + + flinkClustersPage.editFlinkCluster(flinkClusterName) + .<YarnSessionForm>addCluster(deployMode) + .clusterName(flinkClusterNameEdited) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(flinkClustersPage.flinkClusterList) + .as("Flink clusters list should contain edited application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(flinkClusterNameEdited))); + } + + @Test + @Order(3) + public void testStartFlinkCluster() { + final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser); + + flinkClustersPage.startFlinkCluster(flinkClusterNameEdited); + + Awaitility.await() + .untilAsserted( + () -> assertThat(flinkClustersPage.flinkClusterList) + .as("Flink clusters list should contain running 
application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + } + + @Test + @Order(4) + public void testStopFlinkCluster() { + final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser); + + flinkClustersPage.stopFlinkCluster(flinkClusterNameEdited); + + Awaitility.await() + .untilAsserted( + () -> assertThat(flinkClustersPage.flinkClusterList) + .as("Flink clusters list should contain canceled application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("CANCELED"))); + } + + @Test + @Order(5) + public void testDeleteFlinkCluster() { + final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser); + + flinkClustersPage.deleteFlinkCluster(flinkClusterNameEdited); + + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); + assertThat(flinkClustersPage.flinkClusterList) + .noneMatch(it -> it.getText().contains(flinkClusterNameEdited)); + }); + } +} diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java new file mode 100644 index 000000000..e8b28fb99 --- /dev/null +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.e2e.cases; + +import org.apache.streampark.e2e.core.StreamPark; +import org.apache.streampark.e2e.pages.LoginPage; +import org.apache.streampark.e2e.pages.common.Constants; +import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage; +import org.apache.streampark.e2e.pages.flink.FlinkHomePage; +import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm; +import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage; +import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm; +import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage; +import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.openqa.selenium.WebElement; +import org.openqa.selenium.remote.RemoteWebDriver; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import static org.assertj.core.api.Assertions.assertThat; + +@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml") +public class FlinkSQL120OnYarnTest { + + public static RemoteWebDriver browser; + + private static final String flinkName = "flink-1.20.1"; + + private static final String flinkHome = "/flink-1.20.1"; + + private static final String applicationName = "flink-120-e2e-test"; + + private static final String flinkDescription = "description test"; + + private static final String flinkClusterName = "flink_1.20.1_cluster_e2e"; + +
@BeforeAll + public static void setup() { + FlinkHomePage flinkHomePage = new LoginPage(browser) + .login() + .goToNav(ApacheFlinkPage.class) + .goToTab(FlinkHomePage.class); + + flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription); + + FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class) + .goToTab(FlinkClustersPage.class); + + flinkClustersPage.createFlinkCluster() + .<YarnSessionForm>addCluster(ClusterDetailForm.DeployMode.YARN_SESSION) + .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST) + .clusterName(flinkClusterName) + .flinkVersion(flinkName) + .submit(); + + flinkClustersPage.startFlinkCluster(flinkClusterName); + + flinkClustersPage.goToNav(ApacheFlinkPage.class) + .goToTab(ApplicationsPage.class); + } + + @Test + @Order(1) + void testCreateFlinkApplicationOnYarnApplicationMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage + .createApplication() + .addApplication( + ApplicationForm.FlinkJobType.FLINK_SQL, + ApplicationForm.DeployMode.YARN_APPLICATION, + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(applicationName))); + } + + @Test + @Order(2) + void testReleaseFlinkApplicationOnYarnApplicationMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.releaseApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain released application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("SUCCESS"))); + } + + @Test + @Order(3) + void testStartFlinkApplicationOnYarnApplicationMode() { + final ApplicationsPage 
applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain finished application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("FINISHED"))); + } + + @Test + @Order(4) + @SneakyThrows + void testCancelFlinkApplicationOnYarnApplicationMode() { + Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain restarted application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + applicationsPage.cancelApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain canceled application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("CANCELED"))); + } + + @Test + @Order(5) + void testDeleteFlinkApplicationOnYarnApplicationMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.deleteApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + + assertThat(applicationsPage.applicationsList) + .noneMatch(it -> it.getText().contains(applicationName)); + }); + } + + @Test + @Order(6) + void testCreateFlinkApplicationOnYarnPerJobMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage + .createApplication() + .addApplication( + ApplicationForm.FlinkJobType.FLINK_SQL, + ApplicationForm.DeployMode.YARN_PER_JOB, + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .submit(); + + 
Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(applicationName))); + } + + @Test + @Order(7) + void testReleaseFlinkApplicationOnYarnPerJobMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.releaseApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain released application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("SUCCESS"))); + } + + @Test + @Order(8) + void testStartFlinkApplicationOnYarnPerJobMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain started application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain finished application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("FINISHED"))); + } + + @Test + @Order(9) + @SneakyThrows + void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain restarted application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + applicationsPage.cancelApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> 
assertThat(applicationsPage.applicationsList) + .as("Applications list should contain canceled application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("CANCELED"))); + } + + @Test + @Order(10) + void testDeleteFlinkApplicationOnYarnPerJobMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.deleteApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + + assertThat(applicationsPage.applicationsList) + .noneMatch(it -> it.getText().contains(applicationName)); + }); + } + + @Test + @Order(11) + void testCreateFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage + .createApplication() + .addApplication( + ApplicationForm.FlinkJobType.FLINK_SQL, + ApplicationForm.DeployMode.YARN_SESSION, + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .flinkCluster(flinkClusterName) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(applicationName))); + } + + @Test + @Order(12) + void testReleaseFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.releaseApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain released application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("SUCCESS"))); + } + + @Test + @Order(13) + void testStartFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> 
assertThat(applicationsPage.applicationsList) + .as("Applications list should contain finished application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("FINISHED"))); + } + + @Test + @Order(14) + @SneakyThrows + void testRestartAndCancelFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain restarted application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + applicationsPage.cancelApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList) + .as("Applications list should contain canceled application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("CANCELED"))); + } + + @Test + @Order(15) + void testDeleteFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + applicationsPage.deleteApplication(applicationName); + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + + assertThat(applicationsPage.applicationsList) + .noneMatch(it -> it.getText().contains(applicationName)); + }); + } +} diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile new file mode 100644 index 000000000..b1e093fd4 --- /dev/null +++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM apache/streampark:ci diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml new file mode 100644 index 000000000..b16486e6b --- /dev/null +++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +services: + jobmanager: + image: flink:1.20.1 + command: jobmanager + ports: + - "8081:8081" + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + networks: + - e2e + volumes: + - flink_data:/opt/flink + - /var/run/docker.sock:/var/run/docker.sock + healthcheck: + test: [ "CMD", "curl", "http://localhost:8081" ] + interval: 5s + timeout: 5s + retries: 120 + + taskmanager: + image: flink:1.20.1 + depends_on: + - jobmanager + command: taskmanager + scale: 1 + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.numberOfTaskSlots: 2 + networks: + - e2e + volumes: + - flink_data:/opt/flink + - /var/run/docker.sock:/var/run/docker.sock + healthcheck: + test: [ "CMD", "curl", "http://localhost:8081" ] + interval: 5s + timeout: 5s + retries: 120 + + streampark: + image: apache/streampark:ci + command: bash bin/streampark.sh start_docker + build: + context: ./ + dockerfile: ./Dockerfile + ports: + - 10000:10000 + - 10030:10030 + environment: + - SPRING_PROFILES_ACTIVE=h2 + - TZ=Asia/Shanghai + - FLINK_JOBMANAGER_URL=http://jobmanager:8081 + privileged: true + restart: unless-stopped + networks: + - e2e + volumes: + - flink_data:/opt/flink + - ${HOME}/streampark_build_logs:/tmp/streampark/logs/build_logs/ + - /var/run/docker.sock:/var/run/docker.sock + healthcheck: + test: [ "CMD", "curl", "http://localhost:10000" ] + interval: 5s + timeout: 5s + retries: 120 +networks: + e2e: +volumes: + flink_data: diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile new file mode 100644 index 000000000..6b050d220 --- /dev/null +++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM apache/streampark:ci as base-image +FROM flink:1.20.1-scala_2.12-java8 as flink-image + +# Install streampark +FROM sbloodys/hadoop:3.3.6 +COPY --from=base-image /streampark /streampark +RUN sudo chown -R hadoop.hadoop /streampark \ + && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" /streampark/conf/config.yaml + +# Install Flink +COPY --from=flink-image /opt/flink /flink-1.20.1 +RUN sudo chown -R hadoop.hadoop /flink-1.20.1 + +# Install javac +ARG TARGETPLATFORM +RUN echo "TARGETPLATFORM: $TARGETPLATFORM" +RUN \ + if [ "$TARGETPLATFORM" = "linux/arm64" ];then \ + sudo rm -f /etc/yum.repos.d/*.repo; \ + sudo wget http://mirrors.aliyun.com/repo/Centos-altarch-7.repo -O /etc/yum.repos.d/CentOS-Base.repo; \ + sudo sed -i "s/http:\/\//https:\/\//g" /etc/yum.repos.d/CentOS-Base.repo; \ + sudo yum install -y java-1.8.0-openjdk-devel; \ + elif [ "$TARGETPLATFORM" = "linux/amd64" ];then \ + sudo yum install -y java-1.8.0-openjdk-devel; \ + else \ + echo "unknown TARGETPLATFORM: $TARGETPLATFORM"; \ + exit 2; \ + fi diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config new file mode 100644 index 000000000..441fa1118 
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+YARN-SITE.XML_yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage=99.9
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..73af8308b
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+services:  # five services, all using the same image tag and build context and differing only in the command they run
+  namenode:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: namenode
+    command: [ "hdfs", "namenode" ]  # runs the HDFS NameNode
+    networks:
+      - e2e
+    build:
+      context: ./  # image is built from the Dockerfile next to this compose file
+    ports:
+      - "19870:9870"  # host port 19870 -> container port 9870 (the port the healthcheck curls)
+    env_file:
+      - docker-compose.config  # Hadoop/YARN settings shared by every service in this file
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"  # NOTE(review): presumably consumed by the Hadoop base image's startup script - confirm
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://namenode:9870" ]  # probe is a plain curl of the service's own hostname and port
+      interval: 5s
+      timeout: 5s
+      retries: 120  # 120 probes at a 5s interval: roughly ten minutes of failures are tolerated
+  datanode:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: datanode
+    command: [ "hdfs", "datanode" ]  # runs the HDFS DataNode
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://datanode:9864" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+    depends_on:  # started only after the namenode healthcheck passes
+      namenode:
+        condition: service_healthy
+  resourcemanager:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: resourcemanager
+    command: [ "yarn", "resourcemanager" ]  # runs the YARN ResourceManager
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "18088:8088"  # host port 18088 -> container port 8088 (the port the healthcheck curls)
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  nodemanager:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: nodemanager
+    command: [ "yarn", "nodemanager" ]  # runs the YARN NodeManager
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:  # started only after the resourcemanager healthcheck passes
+      resourcemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://nodemanager:8042" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  streampark:
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: streampark
+    command:
+      - sh
+      - -c
+      - /streampark/bin/streampark.sh start_docker
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "20000:10000"  # host port 20000 -> container port 10000 (the port the healthcheck curls)
+    environment:  # NOTE(review): SPRING_PROFILES_ACTIVE=h2 looks like it selects an H2-backed Spring profile - confirm against StreamPark's config
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:  # started last: waits for all four Hadoop/YARN services to report healthy
+      namenode:
+        condition: service_healthy
+      datanode:
+        condition: service_healthy
+      resourcemanager:
+        condition: service_healthy
+      nodemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://streampark:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+networks:
+  e2e:  # the one network every service above attaches to; declared with no extra options
