This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d9a772f55 [Feature] Supported apache flink 1.20 (#4263)
d9a772f55 is described below
commit d9a772f5528776322f024ff4bfa032c6605ccf7e
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Jun 30 13:38:53 2025 +0800
[Feature] Supported apache flink 1.20 (#4263)
* supported apache flink 1.20
* fixed module
---
.github/workflows/e2e.yml | 6 +-
pom.xml | 2 +-
.../streampark/common/conf/FlinkVersion.scala | 2 +-
.../streampark-console-service/pom.xml | 7 +
.../console/core/runner/EnvInitializer.java | 2 +-
.../impl/FlinkApplicationInfoServiceImpl.java | 8 +-
.../core/service/FlinkSavepointServiceTest.java | 11 +-
.../cases/Flink120OnRemoteClusterDeployTest.java | 104 ++++++++
.../e2e/cases/Flink120OnYarnClusterDeployTest.java | 152 ++++++++++++
.../e2e/cases/FlinkSQL120OnYarnTest.java | 269 +++++++++++++++++++++
.../docker/flink-1.20-on-remote/Dockerfile | 18 ++
.../flink-1.20-on-remote/docker-compose.yaml | 90 +++++++
.../resources/docker/flink-1.20-on-yarn/Dockerfile | 45 ++++
.../flink-1.20-on-yarn/docker-compose.config | 44 ++++
.../docker/flink-1.20-on-yarn/docker-compose.yaml | 163 +++++++++++++
.../flink/client/impl/YarnPerJobClient.scala | 3 +-
.../flink/kubernetes/KubernetesRetriever.scala | 2 +-
streampark-flink/streampark-flink-shims/pom.xml | 1 +
.../streampark-flink-shims-test/pom.xml | 2 +-
.../pom.xml | 119 +++++----
.../streampark/flink/core/FlinkClusterClient.scala | 64 +++++
.../flink/core/FlinkKubernetesClient.scala | 32 +++
.../streampark/flink/core/StreamTableContext.scala | 169 +++++++++++++
.../streampark/flink/core/TableContext.scala | 106 ++++++++
.../apache/streampark/flink/core/TableExt.scala | 43 ++++
tools/dependencies/known-dependencies.txt | 33 +--
26 files changed, 1418 insertions(+), 79 deletions(-)
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 070e984b5..4582dafc0 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -136,13 +136,13 @@ jobs:
class: org.apache.streampark.e2e.cases.VariableManagementTest
- name: FlinkOnRemoteClusterDeployTest
- class: org.apache.streampark.e2e.cases.Flink118OnRemoteClusterDeployTest
+ class: org.apache.streampark.e2e.cases.Flink120OnRemoteClusterDeployTest
- name: FlinkOnYarnClusterDeployTest
- class: org.apache.streampark.e2e.cases.Flink118OnYarnClusterDeployTest
+ class: org.apache.streampark.e2e.cases.Flink120OnYarnClusterDeployTest
- name: FlinkSQLOnYarnTest
- class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
+ class: org.apache.streampark.e2e.cases.FlinkSQL120OnYarnTest
#- name: Flink117OnRemoteClusterDeployTest
# class: org.apache.streampark.e2e.cases.Flink117OnRemoteClusterDeployTest
diff --git a/pom.xml b/pom.xml
index c1de29d32..8f5103008 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,7 +93,7 @@
<scala.binary.flink.version>_${scala.binary.version}</scala.binary.flink.version>
<flink.connector.version>3.2.0-1.18</flink.connector.version>
<flink.elasticserch.connector.version>3.0.1-1.17</flink.elasticserch.connector.version>
- <flink.version>1.18.1</flink.version>
+ <flink.version>1.20.1</flink.version>
<flink.shaded.version>1.8.1</flink.shaded.version>
<streampark.shaded.version>1.0.0</streampark.shaded.version>
<streampark.flink.shims.version>1.14</streampark.flink.shims.version>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index e4950e247..83a09a227 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -126,7 +126,7 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger {
def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
- case Array(1, v, _) if v >= 12 && v <= 19 => true
+ case Array(1, v, _) if v >= 12 && v <= 20 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported flink version: $version")
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index ea09556ea..66d8a6463 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -586,6 +586,13 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
+ <!-- flink 1.20 support-->
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-shims_flink-1.20_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <outputDirectory>${project.build.directory}/shims</outputDirectory>
+ </dependency>
<!-- flink-submit-core -->
<dependency>
<groupId>org.apache.streampark</groupId>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 4e5ba38ac..c9addb417 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -71,7 +71,7 @@ public class EnvInitializer implements ApplicationRunner {
private final FileFilter fileFilter = p -> !".gitkeep".equals(p.getName());
private static final Pattern PATTERN_FLINK_SHIMS_JAR = Pattern.compile(
- "^streampark-flink-shims_flink-(1.1[2-9])_(2.12)-(.*).jar$",
+ "^streampark-flink-shims_flink-(1.1[2-9]|1\\.2[0-9])_(2.12)-(.*).jar$",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
@SneakyThrows
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
index 7276bcde1..8ced52f6a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java
@@ -484,21 +484,21 @@ public class FlinkApplicationInfoServiceImpl extends ServiceImpl<FlinkApplicatio
final String pathPart = uri.getPath();
String error = null;
if (scheme == null) {
- error = "This state.savepoints.dir value "
+ error = "This state-savepoints-dir value "
+ savepointPath
+ " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI.";
} else if (pathPart == null) {
- error = "This state.savepoints.dir value "
+ error = "This state-savepoints-dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
} else if (pathPart.isEmpty() || "/".equals(pathPart)) {
- error = "This state.savepoints.dir value "
+ error = "This state-savepoints-dir value "
+ savepointPath
+ " Cannot use the root directory for checkpoints.";
}
return error;
} else {
- return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application";
+ return "When custom savepoint is not set, state-savepoints-dir needs to be set in properties or flink config of application";
}
}
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
index 14cbc3fdb..54eece310 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
@@ -82,8 +82,8 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
*/
@Test
void testGetSavepointFromDynamicProps() {
- String propsWithEmptyTargetValue = "-Dstate.savepoints.dir=";
- String props = "-Dstate.savepoints.dir=hdfs:///test";
+ String propsWithEmptyTargetValue = "-Dexecution.checkpointing.savepoint-dir=";
+ String props = "-Dexecution.checkpointing.savepoint-dir=hdfs:///test";
FlinkSavepointServiceImpl savepointServiceImpl =
(FlinkSavepointServiceImpl) savepointService;
assertThat(savepointServiceImpl.getSavepointFromDynamicProps(null)).isNull();
@@ -114,11 +114,13 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
app.setJobType(FlinkJobType.FLINK_JAR.getMode());
assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
+ String ckDir = SAVEPOINT_DIRECTORY.key() + "=hdfs:///test";
+
// Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint.
FlinkApplicationConfig appCfg = new FlinkApplicationConfig();
appCfg.setId(appCfgId);
appCfg.setAppId(appId);
- appCfg.setContent("state.savepoints.dir=hdfs:///test");
+ appCfg.setContent(ckDir);
appCfg.setFormat(ConfigFileTypeEnum.PROPERTIES.getValue());
configService.save(appCfg);
assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
@@ -134,9 +136,10 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
// Test for configured CHECKPOINTING_INTERVAL
appCfg.setContent(
DeflaterUtils.zipString(
- "state.savepoints.dir=hdfs:///test\n"
+ ckDir + "\n"
+ String.format("%s=%s", CHECKPOINTING_INTERVAL.key(),
"3min")));
+
configService.updateById(appCfg);
FlinkEffective effective = new FlinkEffective();
effective.setTargetId(appCfg.getId());
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
new file mode 100644
index 000000000..74c905a8e
--- /dev/null
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnRemoteClusterDeployTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.RemoteForm;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-remote/docker-compose.yaml")
+public class Flink120OnRemoteClusterDeployTest {
+
+ public static RemoteWebDriver browser;
+
+ private static final String flinkName = "flink-1.20.1";
+
+ private static final String flinkHome = "/opt/flink/";
+
+ private static final String flinkDescription = "description test";
+
+ private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+ private static final String flinkJobManagerUrl = "http://jobmanager:8081";
+
+ private static final ClusterDetailForm.DeployMode deployMode = ClusterDetailForm.DeployMode.STANDALONE;
+
+ @BeforeAll
+ public static void setUp() {
+ FlinkHomePage flinkHomePage = new LoginPage(browser)
+ .login()
+ .goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkHomePage.class);
+
+ flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+ flinkHomePage.goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkClustersPage.class);
+ }
+
+ @Test
+ @Order(1)
+ public void testCreateFlinkCluster() {
+ FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+ flinkClustersPage.createFlinkCluster()
+ .<RemoteForm>addCluster(deployMode)
+ .jobManagerURL(flinkJobManagerUrl)
+ .clusterName(flinkClusterName)
+ .flinkVersion(flinkName)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(flinkClustersPage.flinkClusterList)
+ .as("Flink clusters list should contain newly-created application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(flinkClusterName)));
+ }
+
+ @Test
+ @Order(5)
+ public void testDeleteFlinkCluster() {
+ final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+ flinkClustersPage.deleteFlinkCluster(flinkClusterName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> {
+ browser.navigate().refresh();
+ Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+ assertThat(flinkClustersPage.flinkClusterList)
+ .noneMatch(it -> it.getText().contains(flinkClusterName));
+ });
+ }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java
new file mode 100644
index 000000000..d6d379cc5
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/Flink120OnYarnClusterDeployTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml")
+public class Flink120OnYarnClusterDeployTest {
+
+ public static RemoteWebDriver browser;
+
+ private static final String flinkName = "flink-1.20.1";
+
+ private static final String flinkHome = "/flink-1.20.1";
+
+ private static final String flinkDescription = "description test";
+
+ private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+ private static final String flinkClusterNameEdited = "flink_1.20.1_cluster_e2e_edited";
+
+ private static final ClusterDetailForm.DeployMode deployMode = ClusterDetailForm.DeployMode.YARN_SESSION;
+
+ @BeforeAll
+ public static void setup() {
+ FlinkHomePage flinkHomePage = new LoginPage(browser)
+ .login()
+ .goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkHomePage.class);
+
+ flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+ flinkHomePage.goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkClustersPage.class);
+ }
+
+ @Test
+ @Order(1)
+ public void testCreateFlinkCluster() {
+ final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+ flinkClustersPage.createFlinkCluster()
+ .<YarnSessionForm>addCluster(deployMode)
+ .resolveOrder(YarnSessionForm.ResolveOrder.CHILD_FIRST)
+ .clusterName(flinkClusterName)
+ .flinkVersion(flinkName)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(flinkClustersPage.flinkClusterList)
+ .as("Flink clusters list should contain newly-created application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(flinkClusterName)));
+ }
+
+ @Test
+ @Order(2)
+ public void testEditFlinkCluster() {
+ final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+ flinkClustersPage.editFlinkCluster(flinkClusterName)
+ .<YarnSessionForm>addCluster(deployMode)
+ .clusterName(flinkClusterNameEdited)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(flinkClustersPage.flinkClusterList)
+ .as("Flink clusters list should contain edited application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(flinkClusterNameEdited)));
+ }
+
+ @Test
+ @Order(3)
+ public void testStartFlinkCluster() {
+ final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+ flinkClustersPage.startFlinkCluster(flinkClusterNameEdited);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(flinkClustersPage.flinkClusterList)
+ .as("Flink clusters list should contain running application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+ }
+
+ @Test
+ @Order(4)
+ public void testStopFlinkCluster() {
+ final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+ flinkClustersPage.stopFlinkCluster(flinkClusterNameEdited);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(flinkClustersPage.flinkClusterList)
+ .as("Flink clusters list should contain canceled application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("CANCELED")));
+ }
+
+ @Test
+ @Order(5)
+ public void testDeleteFlinkCluster() {
+ final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);
+
+ flinkClustersPage.deleteFlinkCluster(flinkClusterNameEdited);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> {
+ browser.navigate().refresh();
+ Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+ assertThat(flinkClustersPage.flinkClusterList)
+ .noneMatch(it -> it.getText().contains(flinkClusterNameEdited));
+ });
+ }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java
new file mode 100644
index 000000000..39147c962
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL120OnYarnTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.e2e.cases;
+
+import org.apache.streampark.e2e.core.StreamPark;
+import org.apache.streampark.e2e.pages.LoginPage;
+import org.apache.streampark.e2e.pages.common.Constants;
+import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
+import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
+import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
+import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
+import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
+import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
+import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;
+
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebElement;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@StreamPark(composeFiles = "docker/flink-1.20-on-yarn/docker-compose.yaml")
+public class FlinkSQL120OnYarnTest {
+
+ public static RemoteWebDriver browser;
+
+ private static final String flinkName = "flink-1.20.1";
+
+ private static final String flinkHome = "/flink-1.20.1";
+
+ private static final String applicationName = "flink-120-e2e-test";
+
+ private static final String flinkDescription = "description test";
+
+ private static final String flinkClusterName = "flink_1.20.1_cluster_e2e";
+
+ @BeforeAll
+ public static void setup() {
+ FlinkHomePage flinkHomePage = new LoginPage(browser)
+ .login()
+ .goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkHomePage.class);
+
+ flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);
+
+ FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class)
+ .goToTab(FlinkClustersPage.class);
+
+ flinkClustersPage.createFlinkCluster()
+ .<YarnSessionForm>addCluster(ClusterDetailForm.DeployMode.YARN_SESSION)
+ .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
+ .clusterName(flinkClusterName)
+ .flinkVersion(flinkName)
+ .submit();
+
+ flinkClustersPage.startFlinkCluster(flinkClusterName);
+
+ flinkClustersPage.goToNav(ApacheFlinkPage.class)
+ .goToTab(ApplicationsPage.class);
+ }
+
+ @Test
+ @Order(1)
+ void testCreateFlinkApplicationOnYarnApplicationMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage
+ .createApplication()
+ .addApplication(
+ ApplicationForm.FlinkJobType.FLINK_SQL,
+ ApplicationForm.DeployMode.YARN_APPLICATION,
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain newly-created application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(applicationName)));
+ }
+
+ @Test
+ @Order(2)
+ void testReleaseFlinkApplicationOnYarnApplicationMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage.releaseApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain released application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("SUCCESS")));
+ }
+
+ @Test
+ @Order(3)
+ void testStartFlinkApplicationOnYarnApplicationMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain finished application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("FINISHED")));
+ }
+
+ @Test
+ @Order(4)
+ @SneakyThrows
+ void testCancelFlinkApplicationOnYarnApplicationMode() {
+ Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain restarted application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+
+ applicationsPage.cancelApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain canceled application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("CANCELED")));
+ }
+
+ @Test
+ @Order(5)
+ void testDeleteFlinkApplicationOnYarnApplicationMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage.deleteApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> {
+ browser.navigate().refresh();
+
+ assertThat(applicationsPage.applicationsList)
+ .noneMatch(it -> it.getText().contains(applicationName));
+ });
+ }
+
+ @Test
+ @Order(11)
+ void testCreateFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage
+ .createApplication()
+ .addApplication(
+ ApplicationForm.FlinkJobType.FLINK_SQL,
+ ApplicationForm.DeployMode.YARN_SESSION,
+ applicationName)
+ .flinkVersion(flinkName)
+ .flinkSql(Constants.TEST_FLINK_SQL)
+ .flinkCluster(flinkClusterName)
+ .submit();
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain newly-created application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains(applicationName)));
+ }
+
+ @Test
+ @Order(12)
+ void testReleaseFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage.releaseApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain released application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("SUCCESS")));
+ }
+
+ @Test
+ @Order(13)
+ void testStartFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain finished application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("FINISHED")));
+ }
+
+ @Test
+ @Order(14)
+ @SneakyThrows
+ void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+ applicationsPage.startApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain restarted application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("RUNNING")));
+
+ applicationsPage.cancelApplication(applicationName);
+
+ Awaitility.await()
+ .untilAsserted(
+ () -> assertThat(applicationsPage.applicationsList)
+ .as("Applications list should contain canceled application")
+ .extracting(WebElement::getText)
+ .anyMatch(it -> it.contains("CANCELED")));
+ }
+
+ @Test
+ @Order(15)
+ void testDeleteFlinkApplicationOnYarnSessionMode() {
+ final ApplicationsPage applicationsPage = new ApplicationsPage(browser);
+ applicationsPage.deleteApplication(applicationName);
+ Awaitility.await()
+ .untilAsserted(
+ () -> {
+ browser.navigate().refresh();
+
+ assertThat(applicationsPage.applicationsList)
+ .noneMatch(it -> it.getText().contains(applicationName));
+ });
+ }
+}
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile
new file mode 100644
index 000000000..b1e093fd4
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/Dockerfile
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml
new file mode 100644
index 000000000..b16486e6b
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-remote/docker-compose.yaml
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+services:
+  jobmanager: # Flink JobManager; port 8081 is published to the host
+    image: flink:1.20.1
+    command: jobmanager
+    ports:
+      - "8081:8081"
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink # named volume, also mounted by taskmanager and streampark
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      test: [ "CMD", "curl", "http://localhost:8081" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+  taskmanager: # single TaskManager (scale: 1) with two task slots
+    image: flink:1.20.1
+    depends_on:
+      - jobmanager
+    command: taskmanager
+    scale: 1
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: jobmanager
+        taskmanager.numberOfTaskSlots: 2
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck: # NOTE(review): curls localhost:8081 in this container; only jobmanager maps 8081 - confirm
+      test: [ "CMD", "curl", "http://localhost:8081" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+  streampark: # StreamPark console, image built from ./Dockerfile in this directory
+    image: apache/streampark:ci
+    command: bash bin/streampark.sh start_docker
+    build:
+      context: ./
+      dockerfile: ./Dockerfile
+    ports:
+      - "10000:10000" # quoted like the jobmanager mapping so YAML always reads it as a string
+      - "10030:10030"
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+      - FLINK_JOBMANAGER_URL=http://jobmanager:8081
+    privileged: true
+    restart: unless-stopped
+    networks:
+      - e2e
+    volumes:
+      - flink_data:/opt/flink
+      - ${HOME}/streampark_build_logs:/tmp/streampark/logs/build_logs/
+      - /var/run/docker.sock:/var/run/docker.sock
+    healthcheck:
+      test: [ "CMD", "curl", "http://localhost:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+networks:
+  e2e:
+volumes:
+  flink_data:
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile
new file mode 100644
index 000000000..6b050d220
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/Dockerfile
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM apache/streampark:ci AS base-image
+FROM flink:1.20.1-scala_2.12-java8 AS flink-image
+
+# Install StreamPark on the Hadoop base image and set its hadoop-user-name to "hadoop"
+FROM sbloodys/hadoop:3.3.6
+COPY --from=base-image /streampark /streampark
+RUN sudo chown -R hadoop:hadoop /streampark \
+    && sed -i "s/hadoop-user-name: hdfs$/hadoop-user-name: hadoop/g" /streampark/conf/config.yaml
+
+# Install Flink 1.20.1, copied from /opt/flink of the flink-image stage
+COPY --from=flink-image /opt/flink /flink-1.20.1
+RUN sudo chown -R hadoop:hadoop /flink-1.20.1
+
+# Install javac (JDK 8 devel package); arm64 first swaps in the CentOS altarch yum repo
+ARG TARGETPLATFORM
+RUN echo "TARGETPLATFORM: $TARGETPLATFORM"
+RUN \
+    if [ "$TARGETPLATFORM" = "linux/arm64" ];then \
+        sudo rm -f /etc/yum.repos.d/*.repo; \
+        sudo wget http://mirrors.aliyun.com/repo/Centos-altarch-7.repo -O /etc/yum.repos.d/CentOS-Base.repo; \
+        sudo sed -i "s/http:\/\//https:\/\//g" /etc/yum.repos.d/CentOS-Base.repo; \
+        sudo yum install -y java-1.8.0-openjdk-devel; \
+    elif [ "$TARGETPLATFORM" = "linux/amd64" ];then \
+        sudo yum install -y java-1.8.0-openjdk-devel; \
+    else \
+        echo "unknown TARGETPLATFORM: $TARGETPLATFORM"; \
+        exit 2; \
+    fi
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
new file mode 100644
index 000000000..441fa1118
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.config
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+YARN-SITE.XML_yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage=99.9
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
new file mode 100644
index 000000000..73af8308b
--- /dev/null
+++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.20-on-yarn/docker-compose.yaml
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+services:
+  namenode: # HDFS NameNode; all five services use this same image, built from ./
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: namenode
+    command: [ "hdfs", "namenode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "19870:9870" # host 19870 -> container 9870, the port the healthcheck curls
+    env_file:
+      - docker-compose.config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://namenode:9870" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  datanode: # HDFS DataNode; started only once namenode reports healthy
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: datanode
+    command: [ "hdfs", "datanode" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://datanode:9864" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+    depends_on:
+      namenode:
+        condition: service_healthy
+  resourcemanager: # YARN ResourceManager
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: resourcemanager
+    command: [ "yarn", "resourcemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "18088:8088" # host 18088 -> container 8088, the port the healthcheck curls
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    healthcheck:
+      test: [ "CMD", "curl", "http://resourcemanager:8088" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  nodemanager: # YARN NodeManager; started only once resourcemanager reports healthy
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: nodemanager
+    command: [ "yarn", "nodemanager" ]
+    networks:
+      - e2e
+    build:
+      context: ./
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      resourcemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://nodemanager:8042" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+  streampark: # StreamPark console; waits for all four Hadoop services to be healthy
+    image: apache/streampark-flink-1.20.1-on-yarn:ci
+    hostname: streampark
+    command:
+      - sh
+      - -c
+      - /streampark/bin/streampark.sh start_docker
+    networks:
+      - e2e
+    build:
+      context: ./
+    ports:
+      - "20000:10000" # host 20000 -> container 10000, the port the healthcheck curls
+    environment:
+      - SPRING_PROFILES_ACTIVE=h2
+      - TZ=Asia/Shanghai
+    env_file:
+      - docker-compose.config
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200m"
+        max-file: "1"
+    tty: true
+    stdin_open: true
+    restart: always
+    depends_on:
+      namenode:
+        condition: service_healthy
+      datanode:
+        condition: service_healthy
+      resourcemanager:
+        condition: service_healthy
+      nodemanager:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "curl", "http://streampark:10000" ]
+      interval: 5s
+      timeout: 5s
+      retries: 120
+
+networks:
+  e2e:
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 5e8d6dfd0..d19ba8787 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -30,7 +30,6 @@ import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
import org.apache.hadoop.fs.{Path => HadoopPath}
import org.apache.hadoop.yarn.api.records.ApplicationId
-import java.io.File
import java.lang.{Boolean => JavaBool}
/** yarn PerJob mode submit */
@@ -60,7 +59,7 @@ object YarnPerJobClient extends YarnClientTrait {
getYarnClusterDeployDescriptor(flinkConfig, submitRequest.hadoopUser)
val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
- clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
+ clusterDescriptor.addShipFiles(List(new HadoopPath(s"$flinkHome/lib")))
var packagedProgram: PackagedProgram = null
val clusterClient = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 21b96b41a..d78517581 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -44,7 +44,7 @@ object KubernetesRetriever extends Logger {
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
- Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
+ Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue().toMillis)
private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index 964fcaca0..16e176448 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -40,6 +40,7 @@
<module>streampark-flink-shims_flink-1.17</module>
<module>streampark-flink-shims_flink-1.18</module>
<module>streampark-flink-shims_flink-1.19</module>
+ <module>streampark-flink-shims_flink-1.20</module>
</modules>
</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
index 95131c581..4aa7c87cf 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
@@ -29,7 +29,7 @@
<name>StreamPark : Flink Shims Test</name>
<properties>
- <flink.version>1.18.1</flink.version>
+ <flink.version>1.20.1</flink.version>
</properties>
<dependencies>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/pom.xml
similarity index 52%
copy from streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
copy to streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/pom.xml
index 95131c581..b666ac63c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/pom.xml
@@ -16,7 +16,7 @@
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -25,69 +25,54 @@
<version>2.2.0-SNAPSHOT</version>
</parent>
-
<artifactId>streampark-flink-shims-test_${scala.binary.version}</artifactId>
- <name>StreamPark : Flink Shims Test</name>
+
<artifactId>streampark-flink-shims_flink-1.20_${scala.binary.version}</artifactId>
+ <name>StreamPark : Flink Shims 1.20</name>
<properties>
- <flink.version>1.18.1</flink.version>
+ <flink.version>1.20.0</flink.version>
</properties>
<dependencies>
-
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
- <version>3.2.9</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-shims_flink-1.14_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
+ <!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-uber</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -99,34 +84,74 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
+ <artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <artifactId>flink-yarn</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb</artifactId>
+ <artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
-
</dependencies>
- <profiles>
- <profile>
- <id>apache-release</id>
- <properties>
- <maven.deploy.skip>true</maven.deploy.skip>
- </properties>
- </profile>
- </profiles>
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+
<createDependencyReducedPom>true</createDependencyReducedPom>
+
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <artifactSet>
+ <includes>
+
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..ecfc8dabb
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+/**
+ * Flink 1.20 shim over [[ClusterClient]]: each savepoint call is forwarded unchanged except
+ * that the boolean `nativeFormat` flag is translated into a [[SavepointFormatType]].
+ */
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+  extends FlinkClientTrait[T](clusterClient) {
+
+  /** Resolves to NATIVE when `nativeFormat` is set and to CANONICAL otherwise. */
+  private def resolveFormatType(nativeFormat: Boolean): SavepointFormatType =
+    if (nativeFormat) SavepointFormatType.NATIVE
+    else SavepointFormatType.CANONICAL
+
+  /** Forwards to `clusterClient.triggerSavepoint` with the resolved format type. */
+  override def triggerSavepoint(
+      jobID: JobID,
+      savepointDir: String,
+      nativeFormat: Boolean): CompletableFuture[String] =
+    clusterClient.triggerSavepoint(jobID, savepointDir, resolveFormatType(nativeFormat))
+
+  /** Forwards to `clusterClient.cancelWithSavepoint` with the resolved format type. */
+  override def cancelWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] =
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, resolveFormatType(nativeFormat))
+
+  /** Forwards to `clusterClient.stopWithSavepoint`; `advanceToEndOfEventTime` is passed as-is. */
+  override def stopWithSavepoint(
+      jobID: JobID,
+      advanceToEndOfEventTime: Boolean,
+      savepointDirectory: String,
+      nativeFormat: Boolean): CompletableFuture[String] =
+    clusterClient.stopWithSavepoint(
+      jobID,
+      advanceToEndOfEventTime,
+      savepointDirectory,
+      resolveFormatType(nativeFormat))
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 000000000..707ba43f0
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+/** Flink 1.20 shim that answers service lookups by delegating to the wrapped [[FlinkKubeClient]]. */
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+  extends FlinkKubernetesClientTrait(kubeClient) {
+
+  /** Returns whatever `kubeClient.getService` yields for `serviceName`. */
+  override def getService(serviceName: String): Optional[KubernetesService] =
+    kubeClient.getService(serviceName)
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..8d655abec
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.streampark.common.util.Implicits.JavaList
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+/**
+ * Flink 1.20 shim that fulfils [[FlinkStreamTableTrait]] by delegating every call to the wrapped
+ * [[StreamTableEnvironment]]; the Table-to-DataStream conversions additionally set
+ * `isConvertedToDataStream`.
+ */
+class StreamTableContext(
+    override val parameter: ParameterTool,
+    private val streamEnv: StreamExecutionEnvironment,
+    private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  /** Builds the context from a (parameter, streamEnv, tableEnv) triple. */
+  def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) =
+    this(args._1, args._2, args._3)
+
+  /** Passes the result of `FlinkTableInitializer.initialize(args)` on to another constructor. */
+  def this(args: StreamTableEnvConfig) =
+    this(FlinkTableInitializer.initialize(args))
+
+  /**
+   * Sets `isConvertedToDataStream` before evaluating the by-name `stream`, so the flag is
+   * raised even if the conversion throws, the same order as assigning it inline.
+   */
+  private def markConverted[S](stream: => S): S = {
+    isConvertedToDataStream = true
+    stream
+  }
+
+  // DataStream -> Table
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table =
+    tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+    tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table =
+    tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(
+      dataStream: DataStream[Row],
+      schema: Schema,
+      changelogMode: ChangelogMode): Table =
+    tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](
+      path: String,
+      dataStream: DataStream[T],
+      schema: Schema): Unit =
+    tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+  // Table -> DataStream: every conversion below goes through markConverted
+  override def toDataStream(table: Table): DataStream[Row] =
+    markConverted(tableEnv.toDataStream(table))
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] =
+    markConverted(tableEnv.toDataStream[T](table, targetClass))
+
+  override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] =
+    markConverted(tableEnv.toDataStream[T](table, targetDataType))
+
+  override def toChangelogStream(table: Table): DataStream[Row] =
+    markConverted(tableEnv.toChangelogStream(table))
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] =
+    markConverted(tableEnv.toChangelogStream(table, targetSchema))
+
+  override def toChangelogStream(
+      table: Table,
+      targetSchema: Schema,
+      changelogMode: ChangelogMode): DataStream[Row] =
+    markConverted(tableEnv.toChangelogStream(table, targetSchema, changelogMode))
+
+  // Plain one-to-one delegations to the table environment
+  override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit =
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..9d295b6b7
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.streampark.common.util.Implicits.JavaList
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api._
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+/**
+ * Table context of the Flink 1.20 shim module.
+ *
+ * Each override below hands its arguments, unchanged, to the wrapped `TableEnvironment`
+ * and returns whatever that call returns.
+ *
+ * @param parameter `ParameterTool` that is also passed to `FlinkTableTrait`
+ * @param tableEnv  environment that receives every delegated call
+ */
+class TableContext(
+    override val parameter: ParameterTool,
+    private val tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  /** Builds the context from a `(ParameterTool, TableEnvironment)` pair. */
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  /** Builds the context from the result of `FlinkTableInitializer.initialize(args)`. */
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  /** Forwards the given names to `tableEnv.useModules` as varargs. */
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  /** Forwards `path` and `descriptor` to `tableEnv.createTemporaryTable`. */
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  /** Forwards `path` and `descriptor` to `tableEnv.createTable`. */
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  /** Returns the `Table` produced by `tableEnv.from(tableDescriptor)`. */
+  override def from(tableDescriptor: TableDescriptor): Table = tableEnv.from(tableDescriptor)
+
+  /** Returns the `ModuleEntry` array from `tableEnv.listFullModules()`. */
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** Returns `tableEnv.listTables(catalogName, databaseName)` (since 1.15). */
+  override def listTables(catalogName: String, databaseName: String): Array[String] =
+    tableEnv.listTables(catalogName, databaseName)
+
+  /** Returns the `CompiledPlan` from `tableEnv.loadPlan(planReference)` (since 1.15). */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** Returns the `CompiledPlan` from `tableEnv.compilePlanSql(stmt)` (since 1.15). */
+  override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt)
+
+  /** Forwards its three arguments to `tableEnv.createFunction` (since 1.17). */
+  override def createFunction(
+      path: String, className: String, resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** Forwards its four arguments to `tableEnv.createFunction` (since 1.17). */
+  override def createFunction(
+      path: String, className: String,
+      resourceUris: JavaList[ResourceUri], ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** Forwards its arguments to `tableEnv.createTemporaryFunction` (since 1.17). */
+  override def createTemporaryFunction(
+      path: String, className: String, resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** Forwards its arguments to `tableEnv.createTemporarySystemFunction` (since 1.17). */
+  override def createTemporarySystemFunction(
+      name: String, className: String, resourceUris: JavaList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** Returns `tableEnv.explainSql(...)`, passing `extraDetails` on as varargs (since 1.17). */
+  override def explainSql(
+      statement: String, format: ExplainFormat, extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** Forwards both arguments to `tableEnv.createCatalog` (since 1.18). */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit =
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..1e36742e2
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.20/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions =>
FlinkTableConversions}
+import org.apache.flink.types.Row
+
+/** Operator-style aliases layered over Flink's `Table` and `TableConversions`. */
+object TableExt {
+  /** Wrapper exposing `->`, which forwards to `table.as(field, fields: _*)`. */
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+    /** Alias for `toDataStream`. */
+    def \\ : DataStream[Row] = toDataStream
+
+    /** Sets `context.isConvertedToDataStream` to true, then returns `toAppendStream[T]`. */
+    def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream[T]
+    }
+  }
+}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index a5e32e9a5..9d4e93b54 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -71,28 +71,32 @@ enumeratum-macros_2.12-1.6.1.jar
enumeratum_2.12-1.6.1.jar
failureaccess-1.0.1.jar
flatbuffers-java-1.9.0.jar
-flink-annotations-1.18.1.jar
+flink-annotations-1.20.1.jar
flink-cdc-cli-3.2.1.jar
flink-cdc-common-3.2.1.jar
flink-cdc-composer-3.2.1.jar
flink-cdc-runtime-3.2.1.jar
-flink-clients-1.18.1.jar
-flink-core-1.18.1.jar
-flink-hadoop-fs-1.18.1.jar
-flink-java-1.18.1.jar
-flink-kubernetes-1.18.1.jar
-flink-metrics-core-1.18.1.jar
-flink-optimizer-1.18.1.jar
-flink-rpc-akka-loader-1.18.1.jar
-flink-rpc-core-1.18.1.jar
-flink-runtime-1.18.1.jar
+flink-clients-1.20.1.jar
+flink-connector-datagen-1.20.1.jar
+flink-core-1.20.1.jar
+flink-core-api-1.20.1.jar
+flink-datastream-1.20.1.jar
+flink-datastream-api-1.20.1.jar
+flink-hadoop-fs-1.20.1.jar
+flink-java-1.20.1.jar
+flink-kubernetes-1.20.1.jar
+flink-metrics-core-1.20.1.jar
+flink-optimizer-1.20.1.jar
+flink-rpc-akka-loader-1.20.1.jar
+flink-rpc-core-1.20.1.jar
+flink-runtime-1.20.1.jar
flink-shaded-asm-9-9.5-17.0.jar
flink-shaded-guava-31.1-jre-17.0.jar
flink-shaded-jackson-2.14.2-17.0.jar
flink-shaded-netty-4.1.91.Final-17.0.jar
-flink-streaming-java-1.18.1.jar
-flink-table-common-1.18.1.jar
-flink-table-api-java-1.18.1.jar
+flink-streaming-java-1.20.1.jar
+flink-table-api-java-1.20.1.jar
+flink-table-common-1.20.1.jar
freemarker-2.3.30.jar
gson-2.9.1.jar
guava-30.0-jre.jar
@@ -391,4 +395,5 @@ xz-1.5.jar
zookeeper-3.6.3.jar
icu4j-67.1.jar
zookeeper-jute-3.6.3.jar
+async-profiler-2.9.jar
netty-transport-native-epoll-4.1.91.Final.jar