This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new abec8c849 [Feature] dev-2.1.2 Support Flink 1.18 (#3279)
abec8c849 is described below

commit abec8c849522aa12283ad943079713576f913690
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed Oct 25 11:11:52 2023 +0800

    [Feature] dev-2.1.2 Support Flink 1.18 (#3279)
---
 .../streampark/common/conf/FlinkVersion.scala      |   2 +-
 .../console/core/runner/EnvInitializer.java        |   2 +-
 streampark-flink/streampark-flink-shims/pom.xml    |   1 +
 .../streampark/flink/core/FlinkSqlValidator.scala  |   9 +-
 .../streampark-flink-shims_flink-1.18/pom.xml      | 154 ++++++++++++++++++++
 .../streampark/flink/core/FlinkClusterClient.scala |  49 +++++++
 .../flink/core/FlinkKubernetesClient.scala         |  31 ++++
 .../streampark/flink/core/StreamTableContext.scala | 161 +++++++++++++++++++++
 .../streampark/flink/core/TableContext.scala       | 103 +++++++++++++
 .../apache/streampark/flink/core/TableExt.scala    |  42 ++++++
 10 files changed, 551 insertions(+), 3 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 3dd78d51e..b6e5e6a90 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends 
java.io.Serializable with Logg
 
   def checkVersion(throwException: Boolean = true): Boolean = {
     version.split("\\.").map(_.trim.toInt) match {
-      case Array(1, v, _) if v >= 12 && v <= 17 => true
+      case Array(1, v, _) if v >= 12 && v <= 18 => true
       case _ =>
         if (throwException) {
           throw new UnsupportedOperationException(s"Unsupported flink version: 
$version")
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c2f405343..5707e8e64 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -69,7 +69,7 @@ public class EnvInitializer implements ApplicationRunner {
 
   private static final Pattern PATTERN_FLINK_SHIMS_JAR =
       Pattern.compile(
-          "^streampark-flink-shims_flink-(1.1[2-7])_(2.11|2.12)-(.*).jar$",
+          "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
           Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
   @Override
diff --git a/streampark-flink/streampark-flink-shims/pom.xml 
b/streampark-flink/streampark-flink-shims/pom.xml
index 47dd29be6..b67ab2483 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -44,6 +44,7 @@
                 <module>streampark-flink-shims_flink-1.15</module>
                 <module>streampark-flink-shims_flink-1.16</module>
                 <module>streampark-flink-shims_flink-1.17</module>
+                <module>streampark-flink-shims_flink-1.18</module>
             </modules>
         </profile>
     </profiles>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 721a143de..70101672e 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -53,7 +53,14 @@ object FlinkSqlValidator extends Logger {
         TableConfigOptions.TABLE_SQL_DIALECT,
         sqlDialect.name().toLowerCase())
       val conformance = sqlDialect match {
-        case HIVE => FlinkSqlConformance.HIVE
+        case HIVE =>
+          try {
+            FlinkSqlConformance.HIVE
+          } catch {
+            // for flink 1.18+
+            case _: NoSuchFieldError => FlinkSqlConformance.DEFAULT
+            case e => throw new IllegalArgumentException("Init Flink sql 
Dialect error: ", e)
+          }
         case DEFAULT => FlinkSqlConformance.DEFAULT
         case _ => throw new UnsupportedOperationException(s"Unsupported 
sqlDialect: $sqlDialect")
       }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
new file mode 100644
index 000000000..1c8ee265a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-shims</artifactId>
+        <version>2.1.2</version>
+    </parent>
+
+    
<artifactId>streampark-flink-shims_flink-1.18_${scala.binary.version}</artifactId>
+    <name>StreamPark : Flink Shims 1.18</name>
+
+    <properties>
+        <flink.version>1.18.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!--flink-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-uber</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            
<createDependencyReducedPom>true</createDependencyReducedPom>
+                            
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <artifactSet>
+                                <includes>
+                                    
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..4f6336f5a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+  extends FlinkClientTrait[T](clusterClient) {
+
+  override def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir, 
SavepointFormatType.DEFAULT)
+  }
+
+  override def cancelWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, 
SavepointFormatType.DEFAULT)
+  }
+
+  override def stopWithSavepoint(
+      jobID: JobID,
+      advanceToEndOfEventTime: Boolean,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(
+      jobID,
+      advanceToEndOfEventTime,
+      savepointDirectory,
+      SavepointFormatType.DEFAULT)
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 000000000..f388c8e9f
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+  extends FlinkKubernetesClientTrait(kubeClient) {
+
+  override def getService(serviceName: String): Optional[KubernetesService] = {
+    kubeClient.getService(serviceName)
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..65f715c75
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, 
PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, 
StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+import java.util.{List => JList}
+
+class StreamTableContext(
+    override val parameter: ParameterTool,
+    private val streamEnv: StreamExecutionEnvironment,
+    private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  def this(args: (ParameterTool, StreamExecutionEnvironment, 
StreamTableEnvironment)) =
+    this(args._1, args._2, args._3)
+
+  def this(args: StreamTableEnvConfig) = 
this(FlinkTableInitializer.initialize(args))
+
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): 
Table =
+    tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+    tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: 
Schema): Table =
+    tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(
+      dataStream: DataStream[Row],
+      schema: Schema,
+      changelogMode: ChangelogMode): Table =
+    tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](
+      path: String,
+      dataStream: DataStream[T],
+      schema: Schema): Unit = tableEnv.createTemporaryView[T](path, 
dataStream, schema)
+
+  override def toDataStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream(table)
+  }
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): 
DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetClass)
+  }
+
+  override def toDataStream[T](table: Table, targetDataType: 
AbstractDataType[_]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetDataType)
+  }
+
+  override def toChangelogStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): 
DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema)
+  }
+
+  override def toChangelogStream(
+      table: Table,
+      targetSchema: Schema,
+      changelogMode: ChangelogMode): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+  }
+
+  override def createStatementSet(): StreamStatementSet = 
tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit = 
tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: 
TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table = 
tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] = 
tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(s: String, s1: String): Array[String] = 
tableEnv.listTables(s, s1)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(s: String): CompiledPlan = 
tableEnv.compilePlanSql(s)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: 
CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..e8f704f39
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, 
PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+import java.util.{List => JList}
+
+class TableContext(override val parameter: ParameterTool, private val 
tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def useModules(strings: String*): Unit = 
tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: 
TableDescriptor): Unit = {
+    tableEnv.createTemporaryTable(path, descriptor)
+  }
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTable(path, descriptor)
+  }
+
+  override def from(tableDescriptor: TableDescriptor): Table = {
+    tableEnv.from(tableDescriptor)
+  }
+
+  override def listFullModules(): Array[ModuleEntry] = 
tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(catalogName: String, databaseName: String): 
Array[String] =
+    tableEnv.listTables(catalogName, databaseName)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(stmt: String): CompiledPlan = 
tableEnv.compilePlanSql(stmt)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: 
CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..cab368e36
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions => 
FlinkTableConversions}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable = table.as(field, 
fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends 
FlinkTableConversions(table) {
+
+    def \\ : DataStream[Row] = toDataStream
+
+    def >>[T: TypeInformation](implicit context: StreamTableContext): 
DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream
+    }
+  }
+
+}

Reply via email to