wolfboys commented on code in PR #1820:
URL:
https://github.com/apache/incubator-streampark/pull/1820#discussion_r993219187
##########
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala:
##########
@@ -127,50 +127,62 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
localTableEnv
}
-
/**
* In case of table SQL, the parameter conf is not required, it depends on
the developer.
*/
+
override def initParameter(): (ParameterTool, Configuration) = {
- val (userParameter: ParameterTool, flinkConf: Configuration) =
super.initParameter()
- (userParameter.get(KEY_FLINK_SQL()) match {
- case null => userParameter
+ val (appParameter: ParameterTool, flinkConf: Configuration) = {
+ val argsMap = ParameterTool.fromArgs(args)
+ argsMap.get(KEY_APP_CONF(), null) match {
+ case null | "" =>
+ logWarn("Usage:can't fond config,you can set \"--conf $path \" in
main arguments")
+ ParameterTool.fromSystemProperties().mergeWith(argsMap) -> new
Configuration()
+ case file => super.parseConfig(file)
+ }
+ }
+
+ val appParam = appParameter.get(KEY_FLINK_SQL()) match {
+ case null => appParameter
case param =>
// for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
appParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
appParameter.mergeWith(ParameterTool.fromMap(value))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- userParameter
+ appParameter
}
}
- }, flinkConf)
+ }
+
+ appParam -> flinkConf
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
- val plannerType =
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse
{
- logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by:
blinkPlanner")
- PlannerType.blink
- }
-
- plannerType match {
- case PlannerType.blink =>
- logInfo("blinkPlanner will be use.")
- builder.useBlinkPlanner()
- case PlannerType.old =>
- logInfo("oldPlanner will be use.")
- builder.useOldPlanner()
- case PlannerType.any =>
- logInfo("anyPlanner will be use.")
- builder.useAnyPlanner()
+ val plannerType =
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse(PlannerType.blink)
+
+ try {
+ plannerType match {
+ case PlannerType.blink =>
+ logInfo("blinkPlanner will be use.")
+ builder.useBlinkPlanner()
+ case PlannerType.old =>
+ logInfo("oldPlanner will be use.")
+ builder.useOldPlanner()
+ case PlannerType.any =>
+ logInfo("anyPlanner will be use.")
+ builder.useAnyPlanner()
+ }
+ } catch {
+ case e: IncompatibleClassChangeError =>
Review Comment:
The API related to the Flink table env is public and stable. In the short
term, only this part will be affected; in the long term, as you said, we need
a set of standard specifications to solve these problems. How about you take
care of this part of the job? Please discuss it fully by email. It can be done
in several steps.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]