This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new b13a3f095 [Feature] apache flink 1.19 support (#3673)
b13a3f095 is described below
commit b13a3f095e852600dc2a127095d2a3a179881d8e
Author: benjobs <[email protected]>
AuthorDate: Sat Apr 20 00:19:02 2024 +0800
[Feature] apache flink 1.19 support (#3673)
* [Improve] apache flink 1.19 support
* [Improve] 2.1.4 upgrade sql bug fixed.
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/conf/FlinkVersion.scala | 2 +-
.../streampark-console-service/pom.xml | 7 +
.../main/assembly/script/upgrade/mysql/2.1.4.sql | 2 +-
.../main/assembly/script/upgrade/pgsql/2.1.4.sql | 2 +-
.../streampark/console/core/entity/FlinkEnv.java | 26 +++-
.../console/core/runner/EnvInitializer.java | 2 +-
.../core/service/impl/FlinkEnvServiceImpl.java | 2 +-
streampark-flink/streampark-flink-shims/pom.xml | 1 +
.../streampark-flink-shims_flink-1.19/pom.xml | 154 ++++++++++++++++++++
.../streampark/flink/core/FlinkClusterClient.scala | 49 +++++++
.../flink/core/FlinkKubernetesClient.scala | 23 ++-
.../streampark/flink/core/StreamTableContext.scala | 161 +++++++++++++++++++++
.../streampark/flink/core/TableContext.scala | 103 +++++++++++++
.../apache/streampark/flink/core/TableExt.scala | 32 ++--
14 files changed, 534 insertions(+), 32 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index b6e5e6a90..6dec3b29b 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
- case Array(1, v, _) if v >= 12 && v <= 18 => true
+ case Array(1, v, _) if v >= 12 && v <= 19 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported flink version:
$version")
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index 2d7a7a520..aa85f698f 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -618,6 +618,13 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
+ <!-- flink 1.19 support-->
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+
<outputDirectory>${project.build.directory}/shims</outputDirectory>
+ </dependency>
<!-- flink-submit-core -->
<dependency>
<groupId>org.apache.streampark</groupId>
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
index 04b7f6ea4..33607f396 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
@@ -27,7 +27,7 @@ SET a.`flink_cluster_id` = c.`id`;
UPDATE `t_flink_app`
SET `cluster_id` = `app_id`
-WHERE `execution_mode` IN (2,3,5);
+WHERE `execution_mode` IN (2,3,4);
ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
index 6b216e6ec..6e6668214 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
@@ -23,6 +23,6 @@ WHERE t_flink_app.cluster_id = t_flink_cluster.cluster_id
UPDATE t_flink_app
SET cluster_id = app_id
-WHERE execution_mode IN (2,3,5);
+WHERE execution_mode IN (2,3,4);
ALTER TABLE t_flink_app DROP COLUMN app_id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 6decd5de5..b1ed53da8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.commons.io.FileUtils;
@@ -33,6 +34,7 @@ import lombok.Setter;
import java.io.File;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
@@ -67,9 +69,29 @@ public class FlinkEnv implements Serializable {
private transient String streamParkScalaVersion =
scala.util.Properties.versionNumberString();
public void doSetFlinkConf() throws ApiDetailException {
+ File yaml;
+ float ver =
Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+ if (ver < 1.19f) {
+ yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf
");
+ }
+ } else if (ver == 1.19f) {
+ yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+ if (!yaml.exists()) {
+ yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+ }
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml
in flink/conf ");
+ }
+ } else {
+ yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+ }
+ }
try {
- File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
- String flinkConf = FileUtils.readFileToString(yaml);
+ String flinkConf = FileUtils.readFileToString(yaml,
StandardCharsets.UTF_8);
this.flinkConf = DeflaterUtils.zipString(flinkConf);
} catch (Exception e) {
throw new ApiDetailException(e);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 841f954fa..0e87a9361 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -68,7 +68,7 @@ public class EnvInitializer implements ApplicationRunner {
private static final Pattern PATTERN_FLINK_SHIMS_JAR =
Pattern.compile(
- "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
+ "^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index e83bd65bb..5a53e587e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -84,8 +84,8 @@ public class FlinkEnvServiceImpl extends
ServiceImpl<FlinkEnvMapper, FlinkEnv>
long count = this.baseMapper.selectCount(null);
version.setIsDefault(count == 0);
version.setCreateTime(new Date());
- version.doSetFlinkConf();
version.doSetVersion();
+ version.doSetFlinkConf();
return save(version);
}
diff --git a/streampark-flink/streampark-flink-shims/pom.xml
b/streampark-flink/streampark-flink-shims/pom.xml
index 587c5bd4e..1b53bef49 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -45,6 +45,7 @@
<module>streampark-flink-shims_flink-1.16</module>
<module>streampark-flink-shims_flink-1.17</module>
<module>streampark-flink-shims_flink-1.18</module>
+ <module>streampark-flink-shims_flink-1.19</module>
</modules>
</profile>
</profiles>
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
new file mode 100644
index 000000000..2e0751ecb
--- /dev/null
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-shims</artifactId>
+ <version>2.1.4</version>
+ </parent>
+
+
<artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+ <name>StreamPark : Flink Shims 1.19</name>
+
+ <properties>
+ <flink.version>1.19.0</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!--flink-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-uber</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-yarn</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+
<createDependencyReducedPom>true</createDependencyReducedPom>
+
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <artifactSet>
+ <includes>
+
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..4f6336f5a
--- /dev/null
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+ extends FlinkClientTrait[T](clusterClient) {
+
+ override def triggerSavepoint(jobID: JobID, savepointDir: String):
CompletableFuture[String] = {
+ clusterClient.triggerSavepoint(jobID, savepointDir,
SavepointFormatType.DEFAULT)
+ }
+
+ override def cancelWithSavepoint(
+ jobID: JobID,
+ savepointDirectory: String): CompletableFuture[String] = {
+ clusterClient.cancelWithSavepoint(jobID, savepointDirectory,
SavepointFormatType.DEFAULT)
+ }
+
+ override def stopWithSavepoint(
+ jobID: JobID,
+ advanceToEndOfEventTime: Boolean,
+ savepointDirectory: String): CompletableFuture[String] = {
+ clusterClient.stopWithSavepoint(
+ jobID,
+ advanceToEndOfEventTime,
+ savepointDirectory,
+ SavepointFormatType.DEFAULT)
+ }
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
similarity index 64%
copy from
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
copy to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
index 04b7f6ea4..f388c8e9f 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -14,21 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.streampark.flink.core
-use streampark;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
-SET NAMES utf8mb4;
-SET foreign_key_checks = 0;
+import java.util.Optional
-UPDATE `t_flink_app` a INNER JOIN `t_flink_cluster` c
-ON a.`cluster_id` = c.`cluster_id`
-AND a.`execution_mode` = 5
-SET a.`flink_cluster_id` = c.`id`;
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+ extends FlinkKubernetesClientTrait(kubeClient) {
-UPDATE `t_flink_app`
-SET `cluster_id` = `app_id`
-WHERE `execution_mode` IN (2,3,5);
+ override def getService(serviceName: String): Optional[KubernetesService] = {
+ kubeClient.getService(serviceName)
+ }
-ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;
-
-SET foreign_key_checks = 1;
+}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..65f715c75
--- /dev/null
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat,
PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet,
StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+import java.util.{List => JList}
+
+class StreamTableContext(
+ override val parameter: ParameterTool,
+ private val streamEnv: StreamExecutionEnvironment,
+ private val tableEnv: StreamTableEnvironment)
+ extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+ def this(args: (ParameterTool, StreamExecutionEnvironment,
StreamTableEnvironment)) =
+ this(args._1, args._2, args._3)
+
+ def this(args: StreamTableEnvConfig) =
this(FlinkTableInitializer.initialize(args))
+
+ override def fromDataStream[T](dataStream: DataStream[T], schema: Schema):
Table =
+ tableEnv.fromDataStream[T](dataStream, schema)
+
+ override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+ tableEnv.fromChangelogStream(dataStream)
+
+ override def fromChangelogStream(dataStream: DataStream[Row], schema:
Schema): Table =
+ tableEnv.fromChangelogStream(dataStream, schema)
+
+ override def fromChangelogStream(
+ dataStream: DataStream[Row],
+ schema: Schema,
+ changelogMode: ChangelogMode): Table =
+ tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+ override def createTemporaryView[T](
+ path: String,
+ dataStream: DataStream[T],
+ schema: Schema): Unit = tableEnv.createTemporaryView[T](path,
dataStream, schema)
+
+ override def toDataStream(table: Table): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream(table)
+ }
+
+ override def toDataStream[T](table: Table, targetClass: Class[T]):
DataStream[T] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream[T](table, targetClass)
+ }
+
+ override def toDataStream[T](table: Table, targetDataType:
AbstractDataType[_]): DataStream[T] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream[T](table, targetDataType)
+ }
+
+ override def toChangelogStream(table: Table): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table)
+ }
+
+ override def toChangelogStream(table: Table, targetSchema: Schema):
DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table, targetSchema)
+ }
+
+ override def toChangelogStream(
+ table: Table,
+ targetSchema: Schema,
+ changelogMode: ChangelogMode): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+ }
+
+ override def createStatementSet(): StreamStatementSet =
tableEnv.createStatementSet()
+
+ override def useModules(strings: String*): Unit =
tableEnv.useModules(strings: _*)
+
+ override def createTemporaryTable(path: String, descriptor:
TableDescriptor): Unit =
+ tableEnv.createTemporaryTable(path, descriptor)
+
+ override def createTable(path: String, descriptor: TableDescriptor): Unit =
+ tableEnv.createTable(path, descriptor)
+
+ override def from(descriptor: TableDescriptor): Table =
tableEnv.from(descriptor)
+
+ override def listFullModules(): Array[ModuleEntry] =
tableEnv.listFullModules()
+
+ /** @since 1.15 */
+ override def listTables(s: String, s1: String): Array[String] =
tableEnv.listTables(s, s1)
+
+ /** @since 1.15 */
+ override def loadPlan(planReference: PlanReference): CompiledPlan =
+ tableEnv.loadPlan(planReference)
+
+ /** @since 1.15 */
+ override def compilePlanSql(s: String): CompiledPlan =
tableEnv.compilePlanSql(s)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri],
+ ignoreIfExists: Boolean): Unit =
+ tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+ /** @since 1.17 */
+ override def createTemporaryFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createTemporarySystemFunction(
+ name: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+ /** @since 1.17 */
+ override def explainSql(
+ statement: String,
+ format: ExplainFormat,
+ extraDetails: ExplainDetail*): String =
+ tableEnv.explainSql(statement, format, extraDetails: _*)
+
+ /** @since 1.18 */
+ override def createCatalog(catalog: String, catalogDescriptor:
CatalogDescriptor): Unit = {
+ tableEnv.createCatalog(catalog, catalogDescriptor)
+ }
+}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..e8f704f39
--- /dev/null
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat,
PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+import java.util.{List => JList}
+
+class TableContext(override val parameter: ParameterTool, private val
tableEnv: TableEnvironment)
+ extends FlinkTableTrait(parameter, tableEnv) {
+
+ def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+ def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+ override def useModules(strings: String*): Unit =
tableEnv.useModules(strings: _*)
+
+ override def createTemporaryTable(path: String, descriptor:
TableDescriptor): Unit = {
+ tableEnv.createTemporaryTable(path, descriptor)
+ }
+
+ override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+ tableEnv.createTable(path, descriptor)
+ }
+
+ override def from(tableDescriptor: TableDescriptor): Table = {
+ tableEnv.from(tableDescriptor)
+ }
+
+ override def listFullModules(): Array[ModuleEntry] =
tableEnv.listFullModules()
+
+ /** @since 1.15 */
+ override def listTables(catalogName: String, databaseName: String):
Array[String] =
+ tableEnv.listTables(catalogName, databaseName)
+
+ /** @since 1.15 */
+ override def loadPlan(planReference: PlanReference): CompiledPlan =
+ tableEnv.loadPlan(planReference)
+
+ /** @since 1.15 */
+ override def compilePlanSql(stmt: String): CompiledPlan =
tableEnv.compilePlanSql(stmt)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri],
+ ignoreIfExists: Boolean): Unit =
+ tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+ /** @since 1.17 */
+ override def createTemporaryFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createTemporarySystemFunction(
+ name: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+ /** @since 1.17 */
+ override def explainSql(
+ statement: String,
+ format: ExplainFormat,
+ extraDetails: ExplainDetail*): String =
+ tableEnv.explainSql(statement, format, extraDetails: _*)
+
+ /** @since 1.18 */
+ override def createCatalog(catalog: String, catalogDescriptor:
CatalogDescriptor): Unit = {
+ tableEnv.createCatalog(catalog, catalogDescriptor)
+ }
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
similarity index 50%
copy from
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
copy to
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
index 04b7f6ea4..cab368e36 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -15,20 +15,28 @@
* limitations under the License.
*/
-use streampark;
+package org.apache.streampark.flink.core
-SET NAMES utf8mb4;
-SET foreign_key_checks = 0;
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions =>
FlinkTableConversions}
+import org.apache.flink.types.Row
-UPDATE `t_flink_app` a INNER JOIN `t_flink_cluster` c
-ON a.`cluster_id` = c.`cluster_id`
-AND a.`execution_mode` = 5
-SET a.`flink_cluster_id` = c.`id`;
+object TableExt {
-UPDATE `t_flink_app`
-SET `cluster_id` = `app_id`
-WHERE `execution_mode` IN (2,3,5);
+ class Table(val table: FlinkTable) {
+ def ->(field: String, fields: String*): FlinkTable = table.as(field,
fields: _*)
+ }
-ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;
+ class TableConversions(table: FlinkTable) extends
FlinkTableConversions(table) {
-SET foreign_key_checks = 1;
+ def \\ : DataStream[Row] = toDataStream
+
+ def >>[T: TypeInformation](implicit context: StreamTableContext):
DataStream[T] = {
+ context.isConvertedToDataStream = true
+ super.toAppendStream
+ }
+ }
+
+}