This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new abec8c849 [Feature] dev-2.1.2 Support Flink 1.18 (#3279)
abec8c849 is described below
commit abec8c849522aa12283ad943079713576f913690
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed Oct 25 11:11:52 2023 +0800
[Feature] dev-2.1.2 Support Flink 1.18 (#3279)
---
.../streampark/common/conf/FlinkVersion.scala | 2 +-
.../console/core/runner/EnvInitializer.java | 2 +-
streampark-flink/streampark-flink-shims/pom.xml | 1 +
.../streampark/flink/core/FlinkSqlValidator.scala | 9 +-
.../streampark-flink-shims_flink-1.18/pom.xml | 154 ++++++++++++++++++++
.../streampark/flink/core/FlinkClusterClient.scala | 49 +++++++
.../flink/core/FlinkKubernetesClient.scala | 31 ++++
.../streampark/flink/core/StreamTableContext.scala | 161 +++++++++++++++++++++
.../streampark/flink/core/TableContext.scala | 103 +++++++++++++
.../apache/streampark/flink/core/TableExt.scala | 42 ++++++
10 files changed, 551 insertions(+), 3 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 3dd78d51e..b6e5e6a90 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
- case Array(1, v, _) if v >= 12 && v <= 17 => true
+ case Array(1, v, _) if v >= 12 && v <= 18 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported flink version: $version")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c2f405343..5707e8e64 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -69,7 +69,7 @@ public class EnvInitializer implements ApplicationRunner {
private static final Pattern PATTERN_FLINK_SHIMS_JAR =
Pattern.compile(
- "^streampark-flink-shims_flink-(1.1[2-7])_(2.11|2.12)-(.*).jar$",
+ "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
@Override
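
The widened character class now admits shims jars built for Flink 1.12 through 1.18. A quick illustrative check in Scala (the jar file names below are made up):

    import java.util.regex.Pattern

    object ShimsJarPatternSketch {
      // Same pattern as EnvInitializer above.
      val shimsJar: Pattern = Pattern.compile(
        "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
        Pattern.CASE_INSENSITIVE | Pattern.DOTALL)

      def main(args: Array[String]): Unit = {
        // 1.18 now matches; 1.19 still falls outside [2-8].
        println(shimsJar.matcher("streampark-flink-shims_flink-1.18_2.12-2.1.2.jar").matches()) // true
        println(shimsJar.matcher("streampark-flink-shims_flink-1.19_2.12-2.1.2.jar").matches()) // false
      }
    }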
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index 47dd29be6..b67ab2483 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -44,6 +44,7 @@
<module>streampark-flink-shims_flink-1.15</module>
<module>streampark-flink-shims_flink-1.16</module>
<module>streampark-flink-shims_flink-1.17</module>
+ <module>streampark-flink-shims_flink-1.18</module>
</modules>
</profile>
</profiles>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 721a143de..70101672e 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -53,7 +53,14 @@ object FlinkSqlValidator extends Logger {
TableConfigOptions.TABLE_SQL_DIALECT,
sqlDialect.name().toLowerCase())
val conformance = sqlDialect match {
- case HIVE => FlinkSqlConformance.HIVE
+ case HIVE =>
+ try {
+ FlinkSqlConformance.HIVE
+ } catch {
+ // for flink 1.18+
+ case _: NoSuchFieldError => FlinkSqlConformance.DEFAULT
+ case e => throw new IllegalArgumentException("Init Flink sql Dialect error: ", e)
+ }
case DEFAULT => FlinkSqlConformance.DEFAULT
case _ => throw new UnsupportedOperationException(s"Unsupported sqlDialect: $sqlDialect")
}
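
Per the code comment, Flink 1.18 removes the HIVE constant from FlinkSqlConformance, so merely evaluating the field fails at link time with NoSuchFieldError; the catch above maps that case to DEFAULT conformance instead of failing validation. The general shape of the fallback, as a standalone sketch (the helper name is hypothetical):

    // Evaluate `primary` lazily; if the field it references no longer exists
    // in the library on the classpath, return the fallback instead of crashing.
    def orDefault[A](primary: => A, fallback: => A): A =
      try primary
      catch { case _: NoSuchFieldError => fallback }

    // e.g. orDefault(FlinkSqlConformance.HIVE, FlinkSqlConformance.DEFAULT)
    // would yield DEFAULT when the 1.18 planner is on the classpath.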
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
new file mode 100644
index 000000000..1c8ee265a
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-shims</artifactId>
+ <version>2.1.2</version>
+ </parent>
+
+ <artifactId>streampark-flink-shims_flink-1.18_${scala.binary.version}</artifactId>
+ <name>StreamPark : Flink Shims 1.18</name>
+
+ <properties>
+ <flink.version>1.18.0</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!--flink-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-uber</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-yarn</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <artifactSet>
+ <includes>
+ <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..4f6336f5a
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+ extends FlinkClientTrait[T](clusterClient) {
+
+ override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
+ clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
+ }
+
+ override def cancelWithSavepoint(
+ jobID: JobID,
+ savepointDirectory: String): CompletableFuture[String] = {
+ clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
+ }
+
+ override def stopWithSavepoint(
+ jobID: JobID,
+ advanceToEndOfEventTime: Boolean,
+ savepointDirectory: String): CompletableFuture[String] = {
+ clusterClient.stopWithSavepoint(
+ jobID,
+ advanceToEndOfEventTime,
+ savepointDirectory,
+ SavepointFormatType.DEFAULT)
+ }
+
+}
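
Each savepoint operation delegates to the three-argument ClusterClient signature, pinning SavepointFormatType.DEFAULT. A hypothetical call site (assumes a running cluster, a valid job id, and a writable savepoint directory):

    import org.apache.flink.api.common.JobID
    import org.apache.flink.client.program.ClusterClient

    def savepointExample[T](client: ClusterClient[T], jobId: JobID, dir: String): Unit = {
      val shim = new FlinkClusterClient[T](client)
      // triggerSavepoint returns a CompletableFuture[String]; get() blocks
      // until the savepoint completes and yields its path.
      val path = shim.triggerSavepoint(jobId, dir).get()
      println(s"savepoint written to: $path")
    }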
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 000000000..f388c8e9f
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+ extends FlinkKubernetesClientTrait(kubeClient) {
+
+ override def getService(serviceName: String): Optional[KubernetesService] = {
+ kubeClient.getService(serviceName)
+ }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..65f715c75
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+import java.util.{List => JList}
+
+class StreamTableContext(
+ override val parameter: ParameterTool,
+ private val streamEnv: StreamExecutionEnvironment,
+ private val tableEnv: StreamTableEnvironment)
+ extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+ def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) =
+ this(args._1, args._2, args._3)
+
+ def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+ override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table =
+ tableEnv.fromDataStream[T](dataStream, schema)
+
+ override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+ tableEnv.fromChangelogStream(dataStream)
+
+ override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table =
+ tableEnv.fromChangelogStream(dataStream, schema)
+
+ override def fromChangelogStream(
+ dataStream: DataStream[Row],
+ schema: Schema,
+ changelogMode: ChangelogMode): Table =
+ tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+ override def createTemporaryView[T](
+ path: String,
+ dataStream: DataStream[T],
+ schema: Schema): Unit = tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+ override def toDataStream(table: Table): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream(table)
+ }
+
+ override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream[T](table, targetClass)
+ }
+
+ override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = {
+ isConvertedToDataStream = true
+ tableEnv.toDataStream[T](table, targetDataType)
+ }
+
+ override def toChangelogStream(table: Table): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table)
+ }
+
+ override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table, targetSchema)
+ }
+
+ override def toChangelogStream(
+ table: Table,
+ targetSchema: Schema,
+ changelogMode: ChangelogMode): DataStream[Row] = {
+ isConvertedToDataStream = true
+ tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+ }
+
+ override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet()
+
+ override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+ override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit =
+ tableEnv.createTemporaryTable(path, descriptor)
+
+ override def createTable(path: String, descriptor: TableDescriptor): Unit =
+ tableEnv.createTable(path, descriptor)
+
+ override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
+
+ override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+ /** @since 1.15 */
+ override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1)
+
+ /** @since 1.15 */
+ override def loadPlan(planReference: PlanReference): CompiledPlan =
+ tableEnv.loadPlan(planReference)
+
+ /** @since 1.15 */
+ override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri],
+ ignoreIfExists: Boolean): Unit =
+ tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+ /** @since 1.17 */
+ override def createTemporaryFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createTemporarySystemFunction(
+ name: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+ /** @since 1.17 */
+ override def explainSql(
+ statement: String,
+ format: ExplainFormat,
+ extraDetails: ExplainDetail*): String =
+ tableEnv.explainSql(statement, format, extraDetails: _*)
+
+ /** @since 1.18 */
+ override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+ tableEnv.createCatalog(catalog, catalogDescriptor)
+ }
+}
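
Most of the overrides above are straight delegation to the underlying StreamTableEnvironment; the only 1.18-specific surface is createCatalog(String, CatalogDescriptor). A hypothetical usage (the catalog name and options are made up; assumes an initialized StreamTableContext):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.table.catalog.CatalogDescriptor

    def registerCatalog(ctx: StreamTableContext): Unit = {
      val options = new Configuration()
      options.setString("type", "generic_in_memory") // illustrative catalog type
      ctx.createCatalog("demo_catalog", CatalogDescriptor.of("demo_catalog", options))
    }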
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..e8f704f39
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+import java.util.{List => JList}
+
+class TableContext(override val parameter: ParameterTool, private val tableEnv: TableEnvironment)
+ extends FlinkTableTrait(parameter, tableEnv) {
+
+ def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+ def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+ override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+ override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = {
+ tableEnv.createTemporaryTable(path, descriptor)
+ }
+
+ override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+ tableEnv.createTable(path, descriptor)
+ }
+
+ override def from(tableDescriptor: TableDescriptor): Table = {
+ tableEnv.from(tableDescriptor)
+ }
+
+ override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+ /** @since 1.15 */
+ override def listTables(catalogName: String, databaseName: String): Array[String] =
+ tableEnv.listTables(catalogName, databaseName)
+
+ /** @since 1.15 */
+ override def loadPlan(planReference: PlanReference): CompiledPlan =
+ tableEnv.loadPlan(planReference)
+
+ /** @since 1.15 */
+ override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri],
+ ignoreIfExists: Boolean): Unit =
+ tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+ /** @since 1.17 */
+ override def createTemporaryFunction(
+ path: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+ /** @since 1.17 */
+ override def createTemporarySystemFunction(
+ name: String,
+ className: String,
+ resourceUris: JList[ResourceUri]): Unit =
+ tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+ /** @since 1.17 */
+ override def explainSql(
+ statement: String,
+ format: ExplainFormat,
+ extraDetails: ExplainDetail*): String =
+ tableEnv.explainSql(statement, format, extraDetails: _*)
+
+ /** @since 1.18 */
+ override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+ tableEnv.createCatalog(catalog, catalogDescriptor)
+ }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..cab368e36
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+ class Table(val table: FlinkTable) {
+ def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*)
+ }
+
+ class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+
+ def \\ : DataStream[Row] = toDataStream
+
+ def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+ context.isConvertedToDataStream = true
+ super.toAppendStream
+ }
+ }
+
+}
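
These operators are thin sugar: -> renames columns via Table.as, \\ converts a Table to a DataStream[Row], and >> produces an append-only stream while flagging the context as converted. A call-site sketch (illustrative; assumes a Table obtained from an initialized context):

    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.table.api.{Table => FlinkTable}
    import org.apache.flink.types.Row

    def renameAndConvert(users: FlinkTable): DataStream[Row] = {
      val renamed = (new TableExt.Table(users)) -> ("id", "name") // Table.as("id", "name")
      (new TableExt.TableConversions(renamed)).\\ // toDataStream
    }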