This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch flink-119 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit a39950c2ed2734c944a6686de16dbf2bb6d58003 Author: benjobs <[email protected]> AuthorDate: Fri Apr 19 15:17:34 2024 +0800 [Improve] apache flink 1.19 support --- .../streampark/common/conf/FlinkVersion.scala | 2 +- .../streampark-console-service/pom.xml | 7 + .../streampark/console/core/entity/FlinkEnv.java | 26 +++- .../console/core/runner/EnvInitializer.java | 2 +- .../core/service/impl/FlinkEnvServiceImpl.java | 2 +- streampark-flink/streampark-flink-shims/pom.xml | 1 + .../streampark-flink-shims_flink-1.19/pom.xml | 154 ++++++++++++++++++++ .../streampark/flink/core/FlinkClusterClient.scala | 49 +++++++ .../flink/core/FlinkKubernetesClient.scala | 31 ++++ .../streampark/flink/core/StreamTableContext.scala | 161 +++++++++++++++++++++ .../streampark/flink/core/TableContext.scala | 103 +++++++++++++ .../apache/streampark/flink/core/TableExt.scala | 42 ++++++ 12 files changed, 575 insertions(+), 5 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala index b6e5e6a90..6dec3b29b 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala @@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg def checkVersion(throwException: Boolean = true): Boolean = { version.split("\\.").map(_.trim.toInt) match { - case Array(1, v, _) if v >= 12 && v <= 18 => true + case Array(1, v, _) if v >= 12 && v <= 19 => true case _ => if (throwException) { throw new UnsupportedOperationException(s"Unsupported flink version: $version") diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index 2d7a7a520..aa85f698f 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -618,6 +618,13 @@ <version>${project.version}</version> <outputDirectory>${project.build.directory}/shims</outputDirectory> </dependency> + <!-- flink 1.19 support--> + <dependency> + <groupId>org.apache.streampark</groupId> + <artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <outputDirectory>${project.build.directory}/shims</outputDirectory> + </dependency> <!-- flink-submit-core --> <dependency> <groupId>org.apache.streampark</groupId> diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java index 6decd5de5..b1ed53da8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java @@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.conf.FlinkVersion; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApiDetailException; import org.apache.commons.io.FileUtils; @@ -33,6 +34,7 @@ import lombok.Setter; import java.io.File; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.Map; import java.util.Properties; @@ -67,9 +69,29 @@ public class FlinkEnv implements Serializable { private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString(); public void doSetFlinkConf() throws ApiDetailException { + File yaml; + float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle())); + if (ver < 1.19f) { + yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml")); + if (!yaml.exists()) { + throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf "); + } + } else if (ver == 1.19f) { + yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml")); + if (!yaml.exists()) { + yaml = new File(this.flinkHome.concat("/conf/config.yaml")); + } + if (!yaml.exists()) { + throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf "); + } + } else { + yaml = new File(this.flinkHome.concat("/conf/config.yaml")); + if (!yaml.exists()) { + throw new ApiAlertException("cannot find config.yaml in flink/conf "); + } + } try { - File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml")); - String flinkConf = FileUtils.readFileToString(yaml); + String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8); this.flinkConf = DeflaterUtils.zipString(flinkConf); } catch (Exception e) { throw new ApiDetailException(e); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index 841f954fa..0e87a9361 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -68,7 +68,7 @@ public class EnvInitializer implements ApplicationRunner { private static final Pattern PATTERN_FLINK_SHIMS_JAR = Pattern.compile( - "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$", + "^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL); @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java index e83bd65bb..5a53e587e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java @@ -84,8 +84,8 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv> long count = this.baseMapper.selectCount(null); version.setIsDefault(count == 0); version.setCreateTime(new Date()); - version.doSetFlinkConf(); version.doSetVersion(); + version.doSetFlinkConf(); return save(version); } diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml index 587c5bd4e..1b53bef49 100644 --- a/streampark-flink/streampark-flink-shims/pom.xml +++ b/streampark-flink/streampark-flink-shims/pom.xml @@ -45,6 +45,7 @@ <module>streampark-flink-shims_flink-1.16</module> <module>streampark-flink-shims_flink-1.17</module> <module>streampark-flink-shims_flink-1.18</module> + <module>streampark-flink-shims_flink-1.19</module> </modules> </profile> </profiles> diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml new file mode 100644 index 000000000..2e0751ecb --- /dev/null +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml @@ -0,0 +1,154 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.streampark</groupId> + <artifactId>streampark-flink-shims</artifactId> + <version>2.1.4</version> + </parent> + + <artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId> + <name>StreamPark : Flink Shims 1.19</name> + + <properties> + <flink.version>1.19.0</flink.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.streampark</groupId> + <artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!--flink--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-uber</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-yarn</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-api</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-runtime</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-kubernetes</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation> + <artifactSet> + <includes> + <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala new file mode 100644 index 000000000..4f6336f5a --- /dev/null +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.streampark.flink.core + +import org.apache.flink.api.common.JobID +import org.apache.flink.client.program.ClusterClient +import org.apache.flink.core.execution.SavepointFormatType + +import java.util.concurrent.CompletableFuture + +class FlinkClusterClient[T](clusterClient: ClusterClient[T]) + extends FlinkClientTrait[T](clusterClient) { + + override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = { + clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT) + } + + override def cancelWithSavepoint( + jobID: JobID, + savepointDirectory: String): CompletableFuture[String] = { + clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT) + } + + override def stopWithSavepoint( + jobID: JobID, + advanceToEndOfEventTime: Boolean, + savepointDirectory: String): CompletableFuture[String] = { + clusterClient.stopWithSavepoint( + jobID, + advanceToEndOfEventTime, + savepointDirectory, + SavepointFormatType.DEFAULT) + } + +} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala new file mode 100644 index 000000000..f388c8e9f --- /dev/null +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.streampark.flink.core + +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService + +import java.util.Optional + +class FlinkKubernetesClient(kubeClient: FlinkKubeClient) + extends FlinkKubernetesClientTrait(kubeClient) { + + override def getService(serviceName: String): Optional[KubernetesService] = { + kubeClient.getService(serviceName) + } + +} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala new file mode 100644 index 000000000..65f715c75 --- /dev/null +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.core + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult} +import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment} +import org.apache.flink.table.catalog.CatalogDescriptor +import org.apache.flink.table.connector.ChangelogMode +import org.apache.flink.table.module.ModuleEntry +import org.apache.flink.table.resource.ResourceUri +import org.apache.flink.table.types.AbstractDataType +import org.apache.flink.types.Row + +import java.util.{List => JList} + +class StreamTableContext( + override val parameter: ParameterTool, + private val streamEnv: StreamExecutionEnvironment, + private val tableEnv: StreamTableEnvironment) + extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) { + + def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) = + this(args._1, args._2, args._3) + + def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args)) + + override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = + tableEnv.fromDataStream[T](dataStream, schema) + + override def fromChangelogStream(dataStream: DataStream[Row]): Table = + tableEnv.fromChangelogStream(dataStream) + + override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table = + tableEnv.fromChangelogStream(dataStream, schema) + + override def fromChangelogStream( + dataStream: DataStream[Row], + schema: Schema, + changelogMode: ChangelogMode): Table = + tableEnv.fromChangelogStream(dataStream, schema, changelogMode) + + override def createTemporaryView[T]( + path: String, + dataStream: DataStream[T], + schema: Schema): Unit = tableEnv.createTemporaryView[T](path, dataStream, schema) + + override def toDataStream(table: Table): DataStream[Row] = { + isConvertedToDataStream = true + tableEnv.toDataStream(table) + } + + override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = { + isConvertedToDataStream = true + tableEnv.toDataStream[T](table, targetClass) + } + + override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = { + isConvertedToDataStream = true + tableEnv.toDataStream[T](table, targetDataType) + } + + override def toChangelogStream(table: Table): DataStream[Row] = { + isConvertedToDataStream = true + tableEnv.toChangelogStream(table) + } + + override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = { + isConvertedToDataStream = true + tableEnv.toChangelogStream(table, targetSchema) + } + + override def toChangelogStream( + table: Table, + targetSchema: Schema, + changelogMode: ChangelogMode): DataStream[Row] = { + isConvertedToDataStream = true + tableEnv.toChangelogStream(table, targetSchema, changelogMode) + } + + override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet() + + override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*) + + override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = + tableEnv.createTemporaryTable(path, descriptor) + + override def createTable(path: String, descriptor: TableDescriptor): Unit = + tableEnv.createTable(path, descriptor) + + override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor) + + override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules() + + /** @since 1.15 */ + override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1) + + /** @since 1.15 */ + override def loadPlan(planReference: PlanReference): CompiledPlan = + tableEnv.loadPlan(planReference) + + /** @since 1.15 */ + override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s) + + /** @since 1.17 */ + override def createFunction( + path: String, + className: String, + resourceUris: JList[ResourceUri]): Unit = + tableEnv.createFunction(path, className, resourceUris) + + /** @since 1.17 */ + override def createFunction( + path: String, + className: String, + resourceUris: JList[ResourceUri], + ignoreIfExists: Boolean): Unit = + tableEnv.createFunction(path, className, resourceUris, ignoreIfExists) + + /** @since 1.17 */ + override def createTemporaryFunction( + path: String, + className: String, + resourceUris: JList[ResourceUri]): Unit = + tableEnv.createTemporaryFunction(path, className, resourceUris) + + /** @since 1.17 */ + override def createTemporarySystemFunction( + name: String, + className: String, + resourceUris: JList[ResourceUri]): Unit = + tableEnv.createTemporarySystemFunction(name, className, resourceUris) + + /** @since 1.17 */ + override def explainSql( + statement: String, + format: ExplainFormat, + extraDetails: ExplainDetail*): String = + tableEnv.explainSql(statement, format, extraDetails: _*) + + /** @since 1.18 */ + override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = { + tableEnv.createCatalog(catalog, catalogDescriptor) + } +} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala new file mode 100644 index 000000000..e8f704f39 --- /dev/null +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.core + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult} +import org.apache.flink.table.catalog.CatalogDescriptor +import org.apache.flink.table.module.ModuleEntry +import org.apache.flink.table.resource.ResourceUri + +import java.util.{List => JList} + +class TableContext(override val parameter: ParameterTool, private val tableEnv: TableEnvironment) + extends FlinkTableTrait(parameter, tableEnv) { + + def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2) + + def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args)) + + override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*) + + override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = { + tableEnv.createTemporaryTable(path, descriptor) + } + + override def createTable(path: String, descriptor: TableDescriptor): Unit = { + tableEnv.createTable(path, descriptor) + } + + override def from(tableDescriptor: TableDescriptor): Table = { + tableEnv.from(tableDescriptor) + } + + override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules() + + /** @since 1.15 */ + override def listTables(catalogName: String, databaseName: String): Array[String] = + tableEnv.listTables(catalogName, databaseName) + + /** @since 1.15 */ + override def loadPlan(planReference: PlanReference): CompiledPlan = + tableEnv.loadPlan(planReference) + + /** @since 1.15 */ + override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt) + + /** @since 1.17 */ + override def createFunction( + path: String, + className: String, + resourceUris: JList[ResourceUri]): Unit = + tableEnv.createFunction(path, className, resourceUris) + + /** @since 1.17 */ + override def createFunction( + path: String, + className: String, + resourceUris: JList[ResourceUri], + ignoreIfExists: Boolean): Unit = + tableEnv.createFunction(path, className, resourceUris, ignoreIfExists) + + /** @since 1.17 */ + override def createTemporaryFunction( + path: String, + className: String, + resourceUris: JList[ResourceUri]): Unit = + tableEnv.createTemporaryFunction(path, className, resourceUris) + + /** @since 1.17 */ + override def createTemporarySystemFunction( + name: String, + className: String, + resourceUris: JList[ResourceUri]): Unit = + tableEnv.createTemporarySystemFunction(name, className, resourceUris) + + /** @since 1.17 */ + override def explainSql( + statement: String, + format: ExplainFormat, + extraDetails: ExplainDetail*): String = + tableEnv.explainSql(statement, format, extraDetails: _*) + + /** @since 1.18 */ + override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = { + tableEnv.createCatalog(catalog, catalogDescriptor) + } + +} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala new file mode 100644 index 000000000..cab368e36 --- /dev/null +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.core + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.table.api.{Table => FlinkTable} +import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions} +import org.apache.flink.types.Row + +object TableExt { + + class Table(val table: FlinkTable) { + def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*) + } + + class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) { + + def \\ : DataStream[Row] = toDataStream + + def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = { + context.isConvertedToDataStream = true + super.toAppendStream + } + } + +}
