This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 087302ed6 [Feature] Distinguish between Flink parameters and user
parameters when starting a Flink job (#1774)
087302ed6 is described below
commit 087302ed675505d5bd021f18cedf0111d466713b
Author: 1996fanrui <[email protected]>
AuthorDate: Sat Oct 8 22:50:55 2022 +0800
[Feature] Distinguish between Flink parameters and user parameters when
starting a Flink job (#1774)
---
.../src/main/resources/flink-application.conf | 36 ++++++--------
.../flink/core/FlinkStreamingInitializer.scala | 54 ++++++++++-----------
.../flink/core/FlinkTableInitializer.scala | 55 +++++++++------------
.../flink/core/FlinkTableInitializer.scala | 56 +++++++++-------------
.../flink/core/FlinkTableInitializer.scala | 56 +++++++++-------------
.../flink/core/FlinkTableInitializer.scala | 56 +++++++++-------------
6 files changed, 131 insertions(+), 182 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
index 4f26c689c..bde9400de 100644
---
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
+++
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
@@ -44,6 +44,21 @@ flink:
managed.fraction: 0.4
pipeline:
auto-watermark-interval: 200ms
+ # checkpoint
+ execution:
+ checkpointing:
+ mode: EXACTLY_ONCE
+ interval: 30s
+ timeout: 10min
+ unaligned: false
+ externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+ # state backend
+ state:
+ backend: hashmap # Special note: flink1.12 optional configuration
('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration
('hashmap', 'rocksdb'),
+ backend.incremental: true
+ checkpoint-storage: filesystem
+ savepoints.dir: file:///tmp/chkdir
+ checkpoints.dir: file:///tmp/chkdir
# restart strategy
restart-strategy: fixed-delay # Restart strategy
[(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
restart-strategy.fixed-delay:
@@ -53,27 +68,6 @@ flink:
max-failures-per-interval:
failure-rate-interval:
delay:
- checkpoints:
- enable: true
- interval: 30000
- mode: EXACTLY_ONCE
- timeout: 300000
- unaligned: true
- # state backend
- state:
- # Note that the configurations of flink1.12 and later are different, and
the combined configuration should be selected reasonably
- backend: # see
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
- value: filesystem # Special note: flink1.12 optional configuration
('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration
('hashmap', 'rocksdb'),
- memory: 5242880 # Effective for jobmanager, maximum memory
- async: false # Valid for (jobmanager, filesystem), whether to enable
asynchronous
- incremental: true # Valid for rocksdb, whether to enable incremental
- # Configuration reference of rocksdb
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
- # Remove the prefix of rocksdb configuration key: state.backend
- #rocksdb.block.blocksize:
- checkpoint-storage: filesystem # Special note: This parameter is only
valid in flink 1.12+, and the state.backend.value is hashmap, optional:
(jobmanager | filesystem)
- checkpoints.dir: file:///tmp/chkdir
- savepoints.dir: file:///tmp/chkdir
- checkpoints.num-retained: 1
# table
table:
planner: blink # (blink|old|any)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 16e2c9d21..c41d0c0be 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -20,9 +20,10 @@ import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util._
-
import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment
=> JavaStreamEnv}
import org.apache.flink.table.api.TableConfig
import java.io.File
@@ -44,7 +45,7 @@ private[flink] object FlinkStreamingInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment)
}
def initialize(args: StreamEnvConfig): (ParameterTool,
StreamExecutionEnvironment) = {
@@ -57,7 +58,7 @@ private[flink] object FlinkStreamingInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment)
}
}
@@ -72,29 +73,26 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
var javaTableEnvConfFunc: TableEnvConfigFunction = _
- lazy val parameter: ParameterTool = initParameter()
+ lazy val (userParameter: ParameterTool, flinkConf: Configuration) =
initParameter()
private[this] var localStreamEnv: StreamExecutionEnvironment = _
- private[this] lazy val defaultFlinkConf: Map[String, String] = {
- parameter.get(KEY_FLINK_CONF(), null) match {
- case null =>
- // start with script
- val flinkHome = System.getenv("FLINK_HOME")
- require(flinkHome != null, "FLINK_HOME not found.")
- logInfo(s"flinkHome: $flinkHome")
- val yaml = new File(s"$flinkHome/conf/flink-conf.yaml")
- PropertiesUtils.loadFlinkConfYaml(yaml)
- case flinkConf =>
- // passed in from the streampark console backend
- PropertiesUtils.loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf))
- }
+ def readUserAndFlinkConf(config: String): (Map[String, String], Map[String,
String]) = {
+ val allConf = readConf(config)
+ val userConf = allConf
+ .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
+ .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
+
+ val flinkConf = allConf
+ .filter(_._1.startsWith(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX))
+ .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
+ (userConf, flinkConf)
}
- def readFlinkConf(config: String): Map[String, String] = {
+ def readConf(config: String): Map[String, String] = {
val extension = config.split("\\.").last.toLowerCase
- val map = config match {
+ config match {
case x if x.startsWith("yaml://") =>
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
case x if x.startsWith("prop://") =>
@@ -119,13 +117,9 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
case _ => throw new IllegalArgumentException("[StreamPark]
Usage:flink.conf file error,must be properties or yml")
}
}
-
- map
- .filter(!_._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX))
- .map(x => x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2)
}
- def initParameter(): ParameterTool = {
+ def initParameter(): (ParameterTool, Configuration) = {
val argsMap = ParameterTool.fromArgs(args)
val config = argsMap.get(KEY_APP_CONF(), null) match {
// scalastyle:off throwerror
@@ -133,9 +127,9 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
// scalastyle:on throwerror
case file => file
}
- val configArgs = readFlinkConf(config)
+ val (userConf, flinkConf) = readUserAndFlinkConf(config)
// config priority: explicitly specified priority > project profiles >
system profiles
-
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
+
(ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(userConf)).mergeWith(argsMap),
Configuration.fromMap(flinkConf))
}
def streamEnvironment: StreamExecutionEnvironment = {
@@ -150,14 +144,14 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
}
def initEnvironment(): Unit = {
- localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ localStreamEnv = new
StreamExecutionEnvironment(JavaStreamEnv.getExecutionEnvironment(flinkConf))
apiType match {
- case ApiType.java if javaStreamEnvConfFunc != null =>
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
- case ApiType.scala if streamEnvConfFunc != null =>
streamEnvConfFunc(localStreamEnv, parameter)
+ case ApiType.java if javaStreamEnvConfFunc != null =>
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, userParameter)
+ case ApiType.scala if streamEnvConfFunc != null =>
streamEnvConfFunc(localStreamEnv, userParameter)
case _ =>
}
- localStreamEnv.getConfig.setGlobalJobParameters(parameter)
+ localStreamEnv.getConfig.setGlobalJobParameters(userParameter)
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index f0d097004..7838710ad 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableEnvironment}
@@ -48,7 +48,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +61,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: Array[String],
@@ -78,7 +78,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
def initialize(args: StreamTableEnvConfig):
@@ -93,7 +93,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
}
@@ -131,38 +131,29 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
/**
* In case of table SQL, the parameter conf is not required, it depends on
the developer.
*/
- override def initParameter(): ParameterTool = {
- val argsMap = ParameterTool.fromArgs(args)
- val parameter = argsMap.get(KEY_APP_CONF(), null) match {
- case null | "" =>
- logWarn("Usage:can't fond config,you can set \"--conf $path \" in main
arguments")
- ParameterTool.fromSystemProperties().mergeWith(argsMap)
- case file =>
- val configArgs = super.readFlinkConf(file)
- // config priority: explicitly specified priority > project profiles >
system profiles
-
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
- }
- parameter.get(KEY_FLINK_SQL()) match {
- case null => parameter
+ override def initParameter(): (ParameterTool, Configuration) = {
+ val (userParameter: ParameterTool, flinkConf: Configuration) =
super.initParameter()
+ (userParameter.get(KEY_FLINK_SQL()) match {
+ case null => userParameter
case param =>
// for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(value))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- parameter
+ userParameter
}
}
- }
+ }, flinkConf)
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
- val plannerType =
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
+ val plannerType =
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse
{
logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by:
blinkPlanner")
PlannerType.blink
}
@@ -179,7 +170,7 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
builder.useAnyPlanner()
}
- val mode =
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+ val mode =
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
mode match {
case TableMode.batch =>
logInfo(s"components should work in $tableMode mode")
@@ -189,7 +180,7 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
builder.inStreamingMode()
}
- val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
+ val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG),
userParameter.get(KEY_FLINK_TABLE_DATABASE))
buildWith match {
case (x: String, y: String) if x != null && y != null =>
logInfo(s"with built in catalog: $x")
@@ -210,14 +201,14 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case TableMode.streaming =>
initEnvironment()
if (streamEnvConfFunc != null) {
- streamEnvConfFunc(streamEnvironment, parameter)
+ streamEnvConfFunc(streamEnvironment, userParameter)
}
if (javaStreamEnvConfFunc != null) {
- javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
parameter)
+ javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
userParameter)
}
localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment,
setting)
}
- val appName = (parameter.get(KEY_APP_NAME(), null),
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+ val appName = (userParameter.get(KEY_APP_NAME(), null),
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
case (appName: String, _) => appName
case (null, appName: String) => appName
case _ => null
@@ -233,15 +224,15 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case ApiType.java =>
if (javaTableEnvConfFunc != null) {
tableMode match {
- case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
- case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+ case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
}
}
case ApiType.scala =>
if (tableConfFunc != null) {
tableMode match {
- case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
userParameter)
+ case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
}
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 3f5b01dc9..9bcc1bc69 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableEnvironment}
@@ -48,7 +48,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +61,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: Array[String],
@@ -78,7 +78,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
def initialize(args: StreamTableEnvConfig):
@@ -93,7 +93,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
}
@@ -130,39 +130,29 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
/**
* In case of table SQL, the parameter conf is not required, it depends on
the developer.
*/
- override def initParameter(): ParameterTool = {
- val argsMap = ParameterTool.fromArgs(args)
- val parameter = argsMap.get(KEY_APP_CONF(), null) match {
- case null | "" =>
- logWarn("Usage:can't fond config,you can set \"--conf $path \" in main
arguments")
- ParameterTool.fromSystemProperties().mergeWith(argsMap)
- case file =>
- val configArgs = super.readFlinkConf(file)
- // config priority: explicitly specified priority > project profiles >
system profiles
-
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
- }
- parameter.get(KEY_FLINK_SQL()) match {
- case null => parameter
+ override def initParameter(): (ParameterTool, Configuration) = {
+ val (userParameter: ParameterTool, flinkConf: Configuration) =
super.initParameter()
+ (userParameter.get(KEY_FLINK_SQL()) match {
+ case null => userParameter
case param =>
// for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(value))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- parameter
+ userParameter
}
}
- }
-
+ }, flinkConf)
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
- val plannerType =
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
+ val plannerType =
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse
{
logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by:
blinkPlanner")
PlannerType.blink
}
@@ -179,7 +169,7 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
builder.useAnyPlanner()
}
- val mode =
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+ val mode =
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
mode match {
case TableMode.batch =>
logInfo(s"components should work in $tableMode mode")
@@ -189,7 +179,7 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
builder.inStreamingMode()
}
- val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
+ val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG),
userParameter.get(KEY_FLINK_TABLE_DATABASE))
buildWith match {
case (x: String, y: String) if x != null && y != null =>
logInfo(s"with built in catalog: $x")
@@ -210,14 +200,14 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case TableMode.streaming =>
initEnvironment()
if (streamEnvConfFunc != null) {
- streamEnvConfFunc(streamEnvironment, parameter)
+ streamEnvConfFunc(streamEnvironment, userParameter)
}
if (javaStreamEnvConfFunc != null) {
- javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
parameter)
+ javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
userParameter)
}
localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment,
setting)
}
- val appName = (parameter.get(KEY_APP_NAME(), null),
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+ val appName = (userParameter.get(KEY_APP_NAME(), null),
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
case (appName: String, _) => appName
case (null, appName: String) => appName
case _ => null
@@ -233,15 +223,15 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case ApiType.java =>
if (javaTableEnvConfFunc != null) {
tableMode match {
- case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
- case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+ case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
}
}
case ApiType.scala =>
if (tableConfFunc != null) {
tableMode match {
- case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
userParameter)
+ case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
}
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 5e132d1fa..635a491e4 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.enums.{ApiType, TableMode}
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableEnvironment}
@@ -48,7 +48,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +61,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: Array[String],
@@ -78,7 +78,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
def initialize(args: StreamTableEnvConfig):
@@ -93,7 +93,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
}
@@ -130,39 +130,29 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
/**
* In case of table SQL, the parameter conf is not required, it depends on
the developer.
*/
- override def initParameter(): ParameterTool = {
- val argsMap = ParameterTool.fromArgs(args)
- val parameter = argsMap.get(KEY_APP_CONF(), null) match {
- case null | "" =>
- logWarn("Usage:can't fond config,you can set \"--conf $path \" in main
arguments")
- ParameterTool.fromSystemProperties().mergeWith(argsMap)
- case file =>
- val configArgs = super.readFlinkConf(file)
- // config priority: explicitly specified priority > project profiles >
system profiles
-
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
- }
- parameter.get(KEY_FLINK_SQL()) match {
- case null => parameter
+ override def initParameter(): (ParameterTool, Configuration) = {
+ val (userParameter: ParameterTool, flinkConf: Configuration) =
super.initParameter()
+ (userParameter.get(KEY_FLINK_SQL()) match {
+ case null => userParameter
case param =>
- //for streampark-console
+ // for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(value))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- parameter
+ userParameter
}
}
- }
-
+ }, flinkConf)
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
- val mode =
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+ val mode =
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
mode match {
case TableMode.batch =>
logInfo(s"components should work in $tableMode mode")
@@ -172,7 +162,7 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
builder.inStreamingMode()
}
- val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
+ val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG),
userParameter.get(KEY_FLINK_TABLE_DATABASE))
buildWith match {
case (x: String, y: String) if x != null && y != null =>
logInfo(s"with built in catalog: $x")
@@ -193,14 +183,14 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case TableMode.streaming =>
initEnvironment()
if (streamEnvConfFunc != null) {
- streamEnvConfFunc(streamEnvironment, parameter)
+ streamEnvConfFunc(streamEnvironment, userParameter)
}
if (javaStreamEnvConfFunc != null) {
- javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
parameter)
+ javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
userParameter)
}
localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment,
setting)
}
- val appName = (parameter.get(KEY_APP_NAME(), null),
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+ val appName = (userParameter.get(KEY_APP_NAME(), null),
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
case (appName: String, _) => appName
case (null, appName: String) => appName
case _ => null
@@ -216,15 +206,15 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case ApiType.java =>
if (javaTableEnvConfFunc != null) {
tableMode match {
- case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
- case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+ case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
}
}
case ApiType.scala =>
if (tableConfFunc != null) {
tableMode match {
- case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
userParameter)
+ case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
}
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 6a8207f5d..e37f4d21d 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,13 +17,13 @@
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF,
KEY_APP_NAME, KEY_FLINK_APP_NAME, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG,
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME,
KEY_FLINK_APP_NAME, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG,
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
import org.apache.streampark.common.enums.{ApiType, TableMode}
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.configuration.{Configuration, PipelineOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableEnvironment}
@@ -49,7 +49,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -62,7 +62,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.tableEnvironment)
}
def initialize(args: Array[String],
@@ -79,7 +79,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
def initialize(args: StreamTableEnvConfig):
@@ -94,7 +94,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.userParameter, flinkInitializer.streamEnvironment,
flinkInitializer.streamTableEnvironment)
}
}
@@ -130,39 +130,29 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
/**
* In case of table SQL, the parameter conf is not required, it depends on
the developer.
*/
- override def initParameter(): ParameterTool = {
- val argsMap = ParameterTool.fromArgs(args)
- val parameter = argsMap.get(KEY_APP_CONF(), null) match {
- case null | "" =>
- logWarn("Usage:can't fond config,you can set \"--conf $path \" in main
arguments")
- ParameterTool.fromSystemProperties().mergeWith(argsMap)
- case file =>
- val configArgs = super.readFlinkConf(file)
- // config priority: explicitly specified priority > project profiles >
system profiles
-
ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(configArgs)).mergeWith(argsMap)
- }
- parameter.get(KEY_FLINK_SQL()) match {
- case null => parameter
+ override def initParameter(): (ParameterTool, Configuration) = {
+ val (userParameter: ParameterTool, flinkConf: Configuration) =
super.initParameter()
+ (userParameter.get(KEY_FLINK_SQL()) match {
+ case null => userParameter
case param =>
// for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
parameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(value))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- parameter
+ userParameter
}
}
- }
-
+ }, flinkConf)
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
- val mode =
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
+ val mode =
Try(TableMode.withName(userParameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
mode match {
case TableMode.batch =>
logInfo(s"components should work in $tableMode mode")
@@ -172,7 +162,7 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
builder.inStreamingMode()
}
- val buildWith = (parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
+ val buildWith = (userParameter.get(KEY_FLINK_TABLE_CATALOG),
userParameter.get(KEY_FLINK_TABLE_DATABASE))
buildWith match {
case (x: String, y: String) if x != null && y != null =>
logInfo(s"with built in catalog: $x")
@@ -193,14 +183,14 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case TableMode.streaming =>
initEnvironment()
if (streamEnvConfFunc != null) {
- streamEnvConfFunc(streamEnvironment, parameter)
+ streamEnvConfFunc(streamEnvironment, userParameter)
}
if (javaStreamEnvConfFunc != null) {
- javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
parameter)
+ javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
userParameter)
}
localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment,
setting)
}
- val appName = (parameter.get(KEY_APP_NAME(), null),
parameter.get(KEY_FLINK_APP_NAME, null)) match {
+ val appName = (userParameter.get(KEY_APP_NAME(), null),
userParameter.get(KEY_FLINK_APP_NAME, null)) match {
case (appName: String, _) => appName
case (null, appName: String) => appName
case _ => null
@@ -216,15 +206,15 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
case ApiType.java =>
if (javaTableEnvConfFunc != null) {
tableMode match {
- case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
- case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, userParameter)
+ case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, userParameter)
}
}
case ApiType.scala =>
if (tableConfFunc != null) {
tableMode match {
- case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, parameter)
+ case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
userParameter)
+ case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, userParameter)
}
}
}