This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 0eaf7de63 [Improve] flink batch job execute method improvement
0eaf7de63 is described below
commit 0eaf7de63c17eefc7fa035c0f40c36c65265bb51
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 4 21:59:40 2023 +0800
[Improve] flink batch job execute method improvement
---
.../scala/org/apache/streampark/flink/core/FlinkTableTrait.scala | 8 +++++++-
.../scala/org/apache/streampark/flink/core/TableContext.scala | 8 --------
.../scala/org/apache/streampark/flink/core/TableContext.scala | 8 --------
.../scala/org/apache/streampark/flink/core/TableContext.scala | 8 --------
.../scala/org/apache/streampark/flink/core/TableContext.scala | 8 --------
5 files changed, 7 insertions(+), 33 deletions(-)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 659d974a3..755c40103 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -16,9 +16,10 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigConst.printLogo
import org.apache.streampark.flink.core.EnhancerImplicit._
+import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api._
import org.apache.flink.table.catalog.Catalog
@@ -38,6 +39,11 @@ abstract class FlinkTableTrait(val parameter: ParameterTool, private val tableEn
execute(appName)
}
+ @deprecated override def execute(jobName: String): JobExecutionResult = {
+ printLogo(s"Flink Table batch job: $jobName Starting...")
+ null
+ }
+
def sql(sql: String = null): Unit = FlinkSqlExecutor.executeSql(sql,
parameter, this)
private[flink] def sqlWithCallBack(sql: String = null)(implicit
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index ae4edadad..5d099522a 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,9 +16,6 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.descriptors.{ConnectorDescriptor,
ConnectTableDescriptor}
@@ -45,11 +42,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor =
tableEnv.connect(connectorDescriptor)
- override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
- null
- }
-
@deprecated override def fromTableSource(source: TableSource[_]): Table =
tableEnv.fromTableSource(source)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index f23e4c9c5..69990991d 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,9 +16,6 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.descriptors.{ConnectorDescriptor,
ConnectTableDescriptor}
@@ -50,11 +47,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor =
tableEnv.connect(connectorDescriptor)
- override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
- null
- }
-
@deprecated override def fromTableSource(source: TableSource[_]): Table =
tableEnv.fromTableSource(source)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index abbe13822..92e61a70b 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,9 +16,6 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api.{Table, TableDescriptor, TableEnvironment}
import org.apache.flink.table.module.ModuleEntry
@@ -63,11 +60,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
- @deprecated override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
- null
- }
-
@deprecated override def fromTableSource(source: TableSource[_]): Table =
tableEnv.fromTableSource(source)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 64337d26b..5acf1ba51 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -17,9 +17,6 @@
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
-
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table,
TableDescriptor, TableEnvironment}
import org.apache.flink.table.module.ModuleEntry
@@ -45,11 +42,6 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
override def listFullModules(): Array[ModuleEntry] =
tableEnv.listFullModules()
- override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
- null
- }
-
/** @since 1.15 */
override def listTables(catalogName: String, databaseName: String):
Array[String] =
tableEnv.listTables(catalogName, databaseName)