This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch flink-119
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit a39950c2ed2734c944a6686de16dbf2bb6d58003
Author: benjobs <[email protected]>
AuthorDate: Fri Apr 19 15:17:34 2024 +0800

    [Improve] apache flink 1.19 support
---
 .../streampark/common/conf/FlinkVersion.scala      |   2 +-
 .../streampark-console-service/pom.xml             |   7 +
 .../streampark/console/core/entity/FlinkEnv.java   |  26 +++-
 .../console/core/runner/EnvInitializer.java        |   2 +-
 .../core/service/impl/FlinkEnvServiceImpl.java     |   2 +-
 streampark-flink/streampark-flink-shims/pom.xml    |   1 +
 .../streampark-flink-shims_flink-1.19/pom.xml      | 154 ++++++++++++++++++++
 .../streampark/flink/core/FlinkClusterClient.scala |  49 +++++++
 .../flink/core/FlinkKubernetesClient.scala         |  31 ++++
 .../streampark/flink/core/StreamTableContext.scala | 161 +++++++++++++++++++++
 .../streampark/flink/core/TableContext.scala       | 103 +++++++++++++
 .../apache/streampark/flink/core/TableExt.scala    |  42 ++++++
 12 files changed, 575 insertions(+), 5 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index b6e5e6a90..6dec3b29b 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends 
java.io.Serializable with Logg
 
   def checkVersion(throwException: Boolean = true): Boolean = {
     version.split("\\.").map(_.trim.toInt) match {
-      case Array(1, v, _) if v >= 12 && v <= 18 => true
+      case Array(1, v, _) if v >= 12 && v <= 19 => true
       case _ =>
         if (throwException) {
           throw new UnsupportedOperationException(s"Unsupported flink version: $version")
diff --git a/streampark-console/streampark-console-service/pom.xml 
b/streampark-console/streampark-console-service/pom.xml
index 2d7a7a520..aa85f698f 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -618,6 +618,13 @@
                                     <version>${project.version}</version>
                                     
<outputDirectory>${project.build.directory}/shims</outputDirectory>
                                 </dependency>
+                                <!-- flink 1.19 support-->
+                                <dependency>
+                                    <groupId>org.apache.streampark</groupId>
+                                    
<artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+                                    <version>${project.version}</version>
+                                    
<outputDirectory>${project.build.directory}/shims</outputDirectory>
+                                </dependency>
                                 <!-- flink-submit-core -->
                                 <dependency>
                                     <groupId>org.apache.streampark</groupId>
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 6decd5de5..b1ed53da8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
 import org.apache.commons.io.FileUtils;
@@ -33,6 +34,7 @@ import lombok.Setter;
 
 import java.io.File;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.Map;
 import java.util.Properties;
@@ -67,9 +69,29 @@ public class FlinkEnv implements Serializable {
   private transient String streamParkScalaVersion = 
scala.util.Properties.versionNumberString();
 
   public void doSetFlinkConf() throws ApiDetailException {
+    File yaml;
+    float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+    if (ver < 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf ");
+      }
+    } else if (ver == 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      }
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf ");
+      }
+    } else {
+      yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+      }
+    }
     try {
-      File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
-      String flinkConf = FileUtils.readFileToString(yaml);
+      String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
       this.flinkConf = DeflaterUtils.zipString(flinkConf);
     } catch (Exception e) {
       throw new ApiDetailException(e);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 841f954fa..0e87a9361 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -68,7 +68,7 @@ public class EnvInitializer implements ApplicationRunner {
 
   private static final Pattern PATTERN_FLINK_SHIMS_JAR =
       Pattern.compile(
-          "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
+          "^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$",
           Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
   @Override
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index e83bd65bb..5a53e587e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -84,8 +84,8 @@ public class FlinkEnvServiceImpl extends 
ServiceImpl<FlinkEnvMapper, FlinkEnv>
     long count = this.baseMapper.selectCount(null);
     version.setIsDefault(count == 0);
     version.setCreateTime(new Date());
-    version.doSetFlinkConf();
     version.doSetVersion();
+    version.doSetFlinkConf();
     return save(version);
   }
 
diff --git a/streampark-flink/streampark-flink-shims/pom.xml 
b/streampark-flink/streampark-flink-shims/pom.xml
index 587c5bd4e..1b53bef49 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -45,6 +45,7 @@
                 <module>streampark-flink-shims_flink-1.16</module>
                 <module>streampark-flink-shims_flink-1.17</module>
                 <module>streampark-flink-shims_flink-1.18</module>
+                <module>streampark-flink-shims_flink-1.19</module>
             </modules>
         </profile>
     </profiles>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
new file mode 100644
index 000000000..2e0751ecb
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-shims</artifactId>
+        <version>2.1.4</version>
+    </parent>
+
+    
<artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+    <name>StreamPark : Flink Shims 1.19</name>
+
+    <properties>
+        <flink.version>1.19.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!--flink-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-uber</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            
<createDependencyReducedPom>true</createDependencyReducedPom>
+                            
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <artifactSet>
+                                <includes>
+                                    
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 000000000..4f6336f5a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+  extends FlinkClientTrait[T](clusterClient) {
+
+  override def triggerSavepoint(jobID: JobID, savepointDir: String): 
CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir, 
SavepointFormatType.DEFAULT)
+  }
+
+  override def cancelWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, 
SavepointFormatType.DEFAULT)
+  }
+
+  override def stopWithSavepoint(
+      jobID: JobID,
+      advanceToEndOfEventTime: Boolean,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(
+      jobID,
+      advanceToEndOfEventTime,
+      savepointDirectory,
+      SavepointFormatType.DEFAULT)
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 000000000..f388c8e9f
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+  extends FlinkKubernetesClientTrait(kubeClient) {
+
+  override def getService(serviceName: String): Optional[KubernetesService] = {
+    kubeClient.getService(serviceName)
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 000000000..65f715c75
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, 
PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, 
StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+import java.util.{List => JList}
+
+class StreamTableContext(
+    override val parameter: ParameterTool,
+    private val streamEnv: StreamExecutionEnvironment,
+    private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  def this(args: (ParameterTool, StreamExecutionEnvironment, 
StreamTableEnvironment)) =
+    this(args._1, args._2, args._3)
+
+  def this(args: StreamTableEnvConfig) = 
this(FlinkTableInitializer.initialize(args))
+
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): 
Table =
+    tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+    tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: 
Schema): Table =
+    tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(
+      dataStream: DataStream[Row],
+      schema: Schema,
+      changelogMode: ChangelogMode): Table =
+    tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](
+      path: String,
+      dataStream: DataStream[T],
+      schema: Schema): Unit = tableEnv.createTemporaryView[T](path, 
dataStream, schema)
+
+  override def toDataStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream(table)
+  }
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): 
DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetClass)
+  }
+
+  override def toDataStream[T](table: Table, targetDataType: 
AbstractDataType[_]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetDataType)
+  }
+
+  override def toChangelogStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): 
DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema)
+  }
+
+  override def toChangelogStream(
+      table: Table,
+      targetSchema: Schema,
+      changelogMode: ChangelogMode): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+  }
+
+  override def createStatementSet(): StreamStatementSet = 
tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit = 
tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: 
TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table = 
tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] = 
tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(s: String, s1: String): Array[String] = 
tableEnv.listTables(s, s1)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(s: String): CompiledPlan = 
tableEnv.compilePlanSql(s)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: 
CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 000000000..e8f704f39
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, 
PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+import java.util.{List => JList}
+
+class TableContext(override val parameter: ParameterTool, private val 
tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def useModules(strings: String*): Unit = 
tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: 
TableDescriptor): Unit = {
+    tableEnv.createTemporaryTable(path, descriptor)
+  }
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTable(path, descriptor)
+  }
+
+  override def from(tableDescriptor: TableDescriptor): Table = {
+    tableEnv.from(tableDescriptor)
+  }
+
+  override def listFullModules(): Array[ModuleEntry] = 
tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(catalogName: String, databaseName: String): 
Array[String] =
+    tableEnv.listTables(catalogName, databaseName)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(stmt: String): CompiledPlan = 
tableEnv.compilePlanSql(stmt)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: 
CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 000000000..cab368e36
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions => 
FlinkTableConversions}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable = table.as(field, 
fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends 
FlinkTableConversions(table) {
+
+    def \\ : DataStream[Row] = toDataStream
+
+    def >>[T: TypeInformation](implicit context: StreamTableContext): 
DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream
+    }
+  }
+
+}

Reply via email to