This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 3e12163d5 flink job batchMode bug fixed.
3e12163d5 is described below
commit 3e12163d55da04a774d1c877785a4c3836587606
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 25 22:18:25 2023 +0800
flink job batchMode bug fixed.
---
.../streampark/flink/core/scala/FlinkStreamTable.scala | 15 ++++++---------
.../apache/streampark/flink/core/scala/FlinkTable.scala | 14 ++++++--------
.../apache/streampark/flink/core/FlinkTableTrait.scala | 8 +-------
.../org/apache/streampark/flink/core/TableContext.scala | 2 +-
.../org/apache/streampark/flink/core/TableContext.scala | 2 +-
.../org/apache/streampark/flink/core/TableContext.scala | 2 +-
6 files changed, 16 insertions(+), 27 deletions(-)
diff --git
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
index c7bfbd155..01da09b1b 100644
---
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
+++
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
@@ -22,7 +22,6 @@ import org.apache.streampark.common.util.{Logger,
SystemPropertyUtils}
import org.apache.streampark.flink.core.{FlinkTableInitializer,
StreamTableContext}
import org.apache.streampark.flink.core.TableExt
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{Table, TableConfig}
@@ -40,22 +39,20 @@ trait FlinkStreamTable extends Logger {
implicit var context: StreamTableContext = _
- var jobExecutionResult: JobExecutionResult = _
+ private[this] def init(args: Array[String]): Unit = {
+ SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreamTable])
+ context = new StreamTableContext(
+ FlinkTableInitializer.initialize(args, configStream, configTable))
+ }
def main(args: Array[String]): Unit = {
init(args)
ready()
handle()
- jobExecutionResult = context.start()
+ context.start()
destroy()
}
- private[this] def init(args: Array[String]): Unit = {
- SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreamTable])
- context = new StreamTableContext(
- FlinkTableInitializer.initialize(args, configStream, configTable))
- }
-
def configStream(env: StreamExecutionEnvironment, parameter: ParameterTool):
Unit = {}
def configTable(tableConfig: TableConfig, parameter: ParameterTool): Unit =
{}
diff --git
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 78ead4fb2..2df1c6443 100644
---
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -29,25 +29,23 @@ import scala.language.implicitConversions
trait FlinkTable extends Logger {
- var jobExecutionResult: JobExecutionResult = _
-
implicit final lazy val parameter: ParameterTool = context.parameter
implicit var context: TableContext = _
+ private[this] def init(args: Array[String]): Unit = {
+ SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkTable])
+ context = new TableContext(FlinkTableInitializer.initialize(args, config))
+ }
+
def main(args: Array[String]): Unit = {
init(args)
ready()
handle()
- jobExecutionResult = context.start()
+ context.start()
destroy()
}
- private[this] def init(args: Array[String]): Unit = {
- SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkTable])
- context = new TableContext(FlinkTableInitializer.initialize(args, config))
- }
-
def ready(): Unit = {}
def config(tableConfig: TableConfig, parameter: ParameterTool): Unit = {}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 1d0873341..659d974a3 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -19,7 +19,6 @@ package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.flink.core.EnhancerImplicit._
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api._
import org.apache.flink.table.catalog.Catalog
@@ -34,16 +33,11 @@ import java.util.Optional
abstract class FlinkTableTrait(val parameter: ParameterTool, private val
tableEnv: TableEnvironment)
extends TableEnvironment {
- def start(): JobExecutionResult = {
+ def start(): Unit = {
val appName = parameter.getAppName(required = true)
execute(appName)
}
- def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
- null
- }
-
def sql(sql: String = null): Unit = FlinkSqlExecutor.executeSql(sql,
parameter, this)
private[flink] def sqlWithCallBack(sql: String = null)(implicit
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index c512a24ea..ae4edadad 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -47,7 +47,7 @@ class TableContext(override val parameter: ParameterTool,
private val tableEnv:
override def execute(jobName: String): JobExecutionResult = {
printLogo(s"FlinkTable $jobName Starting...")
- tableEnv.execute(jobName)
+ null
}
@deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index e9c950815..f23e4c9c5 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -52,7 +52,7 @@ class TableContext(override val parameter: ParameterTool,
private val tableEnv:
override def execute(jobName: String): JobExecutionResult = {
printLogo(s"FlinkTable $jobName Starting...")
- tableEnv.execute(jobName)
+ null
}
@deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 7e4daf539..abbe13822 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -65,7 +65,7 @@ class TableContext(override val parameter: ParameterTool,
private val tableEnv:
@deprecated override def execute(jobName: String): JobExecutionResult = {
printLogo(s"FlinkTable $jobName Starting...")
- tableEnv.execute(jobName)
+ null
}
@deprecated override def fromTableSource(source: TableSource[_]): Table =