This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new e03ae9226 flink job batchMode improvement
e03ae9226 is described below
commit e03ae92265c21a673b0a92a082143a8c6a3a5f16
Author: benjobs <[email protected]>
AuthorDate: Sat Aug 26 16:21:04 2023 +0800
flink job batchMode improvement
---
.../flink/client/trait/FlinkClientTrait.scala | 8 +++++
.../apache/streampark/flink/cli/SqlClient.scala | 40 ++++++++++++++--------
2 files changed, 34 insertions(+), 14 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 325bc9ff3..074d9fad5 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -474,6 +474,14 @@ trait FlinkClientTrait extends Logger {
         programArgs += submitRequest.appConf
       }
     }
+
+    // execution.runtime-mode
+    if (submitRequest.properties.nonEmpty) {
+      if (submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
+        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+        programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key())
+      }
+    }
 
     programArgs.toList.asJava
   }
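
For readers skimming the patch: the net effect of the FlinkClientTrait change is that when the submit request's properties contain execution.runtime-mode, the value is forwarded to the user program as a pair of CLI arguments. A minimal standalone sketch of that behavior follows; the props/RuntimeModeArgsSketch names are illustrative, not the actual fields or types in FlinkClientTrait:

    import java.util.{HashMap => JHashMap}

    import org.apache.flink.configuration.ExecutionOptions

    import scala.collection.mutable.ArrayBuffer

    object RuntimeModeArgsSketch extends App {
      // Hypothetical properties attached to a submit request (not the real SubmitRequest type).
      val props = new JHashMap[String, String]()
      props.put(ExecutionOptions.RUNTIME_MODE.key(), "BATCH")

      val programArgs = ArrayBuffer[String]()
      if (!props.isEmpty && props.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"    // --execution.runtime-mode
        programArgs += props.get(ExecutionOptions.RUNTIME_MODE.key()) // BATCH
      }

      println(programArgs.mkString(" ")) // prints: --execution.runtime-mode BATCH
    }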
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 2edf338c1..76239c6cb 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -26,30 +26,42 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.ExecutionOptions
 
 import scala.language.implicitConversions
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 object SqlClient extends App {
 
-  val parameterTool = ParameterTool.fromArgs(args)
+  private[this] val parameterTool = ParameterTool.fromArgs(args)
 
-  val flinkSql = {
+  private[this] val flinkSql = {
     val sql = parameterTool.get(KEY_FLINK_SQL())
     require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
-    Try(DeflaterUtils.unzipString(sql)).getOrElse(
-      throw new IllegalArgumentException("Usage: flink sql is invalid or null, please check"))
+    Try(DeflaterUtils.unzipString(sql)) match {
+      case Success(value) => value
+      case Failure(_) =>
+        throw new IllegalArgumentException("Usage: flink sql is invalid or null, please check")
+    }
   }
 
-  val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
+  private[this] val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
 
-  val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
-    case Some(e) => e.operands(1)
+  private[this] val defaultMode = "streaming"
+
+  private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
+    case Some(e) =>
+      // 1) flink sql execution.runtime-mode has highest priority
+      e.operands(1)
     case None =>
-      val appConf = parameterTool.get(KEY_APP_CONF(), null)
-      val defaultMode = "streaming"
-      if (appConf == null) defaultMode
-      else {
-        val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(appConf.drop(7)))
-        parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+      // 2) dynamic properties execution.runtime-mode
+      parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
+        case null =>
+          parameterTool.get(KEY_APP_CONF(), null) match {
+            case null => defaultMode
+            case f =>
+              val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
+              // 3) application conf execution.runtime-mode
+              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+          }
+        case m => m
       }
   }
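
In short, SqlClient now resolves execution.runtime-mode with an explicit precedence: a SET statement in the Flink SQL wins, then a dynamic property, then the application conf, and finally the "streaming" default. A rough sketch of that precedence is below; the Option-based helper is illustrative only, not the actual ParameterTool/PropertiesUtils code path:

    object RuntimeModeResolutionSketch extends App {
      // Hypothetical sketch of the resolution order introduced by this commit.
      def resolveRuntimeMode(
          sqlSetMode: Option[String],      // from a SET 'execution.runtime-mode' = ... statement
          dynamicPropMode: Option[String], // from dynamic properties
          appConfMode: Option[String]      // from the application conf
      ): String = {
        val defaultMode = "streaming"
        sqlSetMode                    // 1) SQL SET has the highest priority
          .orElse(dynamicPropMode)    // 2) then dynamic properties
          .orElse(appConfMode)        // 3) then the application conf
          .getOrElse(defaultMode)     // otherwise default to streaming
      }

      println(resolveRuntimeMode(None, Some("batch"), None)) // prints: batch
    }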