This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new 3e12163d5 flink job batchMode bug fixed.
3e12163d5 is described below

commit 3e12163d55da04a774d1c877785a4c3836587606
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 25 22:18:25 2023 +0800

    flink job batchMode bug fixed.
---
 .../streampark/flink/core/scala/FlinkStreamTable.scala    | 15 ++++++---------
 .../apache/streampark/flink/core/scala/FlinkTable.scala   | 14 ++++++--------
 .../apache/streampark/flink/core/FlinkTableTrait.scala    |  8 +-------
 .../org/apache/streampark/flink/core/TableContext.scala   |  2 +-
 .../org/apache/streampark/flink/core/TableContext.scala   |  2 +-
 .../org/apache/streampark/flink/core/TableContext.scala   |  2 +-
 6 files changed, 16 insertions(+), 27 deletions(-)

diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
index c7bfbd155..01da09b1b 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
@@ -22,7 +22,6 @@ import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
 import org.apache.streampark.flink.core.{FlinkTableInitializer, StreamTableContext}
 import org.apache.streampark.flink.core.TableExt
 
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.{Table, TableConfig}
@@ -40,22 +39,20 @@ trait FlinkStreamTable extends Logger {
 
   implicit var context: StreamTableContext = _
 
-  var jobExecutionResult: JobExecutionResult = _
+  private[this] def init(args: Array[String]): Unit = {
+    SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreamTable])
+    context = new StreamTableContext(
+      FlinkTableInitializer.initialize(args, configStream, configTable))
+  }
 
   def main(args: Array[String]): Unit = {
     init(args)
     ready()
     handle()
-    jobExecutionResult = context.start()
+    context.start()
     destroy()
   }
 
-  private[this] def init(args: Array[String]): Unit = {
-    SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreamTable])
-    context = new StreamTableContext(
-      FlinkTableInitializer.initialize(args, configStream, configTable))
-  }
-
   def configStream(env: StreamExecutionEnvironment, parameter: ParameterTool): Unit = {}
 
   def configTable(tableConfig: TableConfig, parameter: ParameterTool): Unit = {}
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 78ead4fb2..2df1c6443 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -29,25 +29,23 @@ import scala.language.implicitConversions
 
 trait FlinkTable extends Logger {
 
-  var jobExecutionResult: JobExecutionResult = _
-
   implicit final lazy val parameter: ParameterTool = context.parameter
 
   implicit var context: TableContext = _
 
+  private[this] def init(args: Array[String]): Unit = {
+    SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkTable])
+    context = new TableContext(FlinkTableInitializer.initialize(args, config))
+  }
+
   def main(args: Array[String]): Unit = {
     init(args)
     ready()
     handle()
-    jobExecutionResult = context.start()
+    context.start()
     destroy()
   }
 
-  private[this] def init(args: Array[String]): Unit = {
-    SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkTable])
-    context = new TableContext(FlinkTableInitializer.initialize(args, config))
-  }
-
   def ready(): Unit = {}
 
   def config(tableConfig: TableConfig, parameter: ParameterTool): Unit = {}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 1d0873341..659d974a3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -19,7 +19,6 @@ package org.apache.streampark.flink.core
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.flink.core.EnhancerImplicit._
 
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.table.api._
 import org.apache.flink.table.catalog.Catalog
@@ -34,16 +33,11 @@ import java.util.Optional
 abstract class FlinkTableTrait(val parameter: ParameterTool, private val tableEnv: TableEnvironment)
   extends TableEnvironment {
 
-  def start(): JobExecutionResult = {
+  def start(): Unit = {
     val appName = parameter.getAppName(required = true)
     execute(appName)
   }
 
-  def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
-    null
-  }
-
   def sql(sql: String = null): Unit = FlinkSqlExecutor.executeSql(sql, parameter, this)
 
   private[flink] def sqlWithCallBack(sql: String = null)(implicit
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index c512a24ea..ae4edadad 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -47,7 +47,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   override def execute(jobName: String): JobExecutionResult = {
     printLogo(s"FlinkTable $jobName Starting...")
-    tableEnv.execute(jobName)
+    null
   }
 
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index e9c950815..f23e4c9c5 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -52,7 +52,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   override def execute(jobName: String): JobExecutionResult = {
     printLogo(s"FlinkTable $jobName Starting...")
-    tableEnv.execute(jobName)
+    null
   }
 
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 7e4daf539..abbe13822 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -65,7 +65,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   @deprecated override def execute(jobName: String): JobExecutionResult = {
     printLogo(s"FlinkTable $jobName Starting...")
-    tableEnv.execute(jobName)
+    null
   }
 
   @deprecated override def fromTableSource(source: TableSource[_]): Table =

Reply via email to