This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new 0eaf7de63 [Improve] flink batch job execute method improvement
0eaf7de63 is described below

commit 0eaf7de63c17eefc7fa035c0f40c36c65265bb51
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 4 21:59:40 2023 +0800

    [Improve] flink batch job execute method improvement
---
 .../scala/org/apache/streampark/flink/core/FlinkTableTrait.scala  | 8 +++++++-
 .../scala/org/apache/streampark/flink/core/TableContext.scala     | 8 --------
 .../scala/org/apache/streampark/flink/core/TableContext.scala     | 8 --------
 .../scala/org/apache/streampark/flink/core/TableContext.scala     | 8 --------
 .../scala/org/apache/streampark/flink/core/TableContext.scala     | 8 --------
 5 files changed, 7 insertions(+), 33 deletions(-)

diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 659d974a3..755c40103 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -16,9 +16,10 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigConst.printLogo
 import org.apache.streampark.flink.core.EnhancerImplicit._
 
+import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api._
 import org.apache.flink.table.catalog.Catalog
@@ -38,6 +39,11 @@ abstract class FlinkTableTrait(val parameter: ParameterTool, private val tableEn
     execute(appName)
   }
 
+  @deprecated override def execute(jobName: String): JobExecutionResult = {
+    printLogo(s"Flink Table batch job: $jobName Starting...")
+    null
+  }
+
   def sql(sql: String = null): Unit = FlinkSqlExecutor.executeSql(sql, parameter, this)
 
   private[flink] def sqlWithCallBack(sql: String = null)(implicit
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index ae4edadad..5d099522a 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,9 +16,6 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api.{Table, TableEnvironment}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, ConnectTableDescriptor}
@@ -45,11 +42,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
       connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor =
     tableEnv.connect(connectorDescriptor)
 
-  override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
-    null
-  }
-
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
     tableEnv.fromTableSource(source)
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index f23e4c9c5..69990991d 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,9 +16,6 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api.{Table, TableEnvironment}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, ConnectTableDescriptor}
@@ -50,11 +47,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
       connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor =
     tableEnv.connect(connectorDescriptor)
 
-  override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
-    null
-  }
-
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
     tableEnv.fromTableSource(source)
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index abbe13822..92e61a70b 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,9 +16,6 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api.{Table, TableDescriptor, TableEnvironment}
 import org.apache.flink.table.module.ModuleEntry
@@ -63,11 +60,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
 
-  @deprecated override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
-    null
-  }
-
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
     tableEnv.fromTableSource(source)
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 64337d26b..5acf1ba51 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -17,9 +17,6 @@
 
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table, TableDescriptor, TableEnvironment}
 import org.apache.flink.table.module.ModuleEntry
@@ -45,11 +42,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
 
-  override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
-    null
-  }
-
   /** @since 1.15 */
   override def listTables(catalogName: String, databaseName: String): Array[String] =
     tableEnv.listTables(catalogName, databaseName)

Reply via email to