1996fanrui commented on code in PR #1820:
URL:
https://github.com/apache/incubator-streampark/pull/1820#discussion_r993108365
##########
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala:
##########
@@ -127,50 +127,62 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
localTableEnv
}
-
/**
* In case of table SQL, the parameter conf is not required, it depends on
the developer.
*/
+
override def initParameter(): (ParameterTool, Configuration) = {
- val (userParameter: ParameterTool, flinkConf: Configuration) =
super.initParameter()
- (userParameter.get(KEY_FLINK_SQL()) match {
- case null => userParameter
+ val (appParameter: ParameterTool, flinkConf: Configuration) = {
+ val argsMap = ParameterTool.fromArgs(args)
+ argsMap.get(KEY_APP_CONF(), null) match {
+ case null | "" =>
+ logWarn("Usage:can't fond config,you can set \"--conf $path \" in
main arguments")
+ ParameterTool.fromSystemProperties().mergeWith(argsMap) -> new
Configuration()
+ case file => super.parseConfig(file)
+ }
+ }
+
+ val appParam = appParameter.get(KEY_FLINK_SQL()) match {
+ case null => appParameter
case param =>
// for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
appParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
appParameter.mergeWith(ParameterTool.fromMap(value))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- userParameter
+ appParameter
}
}
- }, flinkConf)
+ }
+
+ appParam -> flinkConf
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
- val plannerType =
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse
{
- logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by:
blinkPlanner")
- PlannerType.blink
- }
-
- plannerType match {
- case PlannerType.blink =>
- logInfo("blinkPlanner will be use.")
- builder.useBlinkPlanner()
- case PlannerType.old =>
- logInfo("oldPlanner will be use.")
- builder.useOldPlanner()
- case PlannerType.any =>
- logInfo("anyPlanner will be use.")
- builder.useAnyPlanner()
+ val plannerType =
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse(PlannerType.blink)
+
+ try {
+ plannerType match {
+ case PlannerType.blink =>
+ logInfo("blinkPlanner will be use.")
+ builder.useBlinkPlanner()
Review Comment:
As far as I know, Flink 1.15 removed this method (`useBlinkPlanner()`); the Blink
planner is now the default planner.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]