1996fanrui commented on code in PR #1820:
URL:
https://github.com/apache/incubator-streampark/pull/1820#discussion_r993131873
##########
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala:
##########
@@ -127,50 +127,62 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
localTableEnv
}
-
/**
* In case of table SQL, the parameter conf is not required, it depends on
the developer.
*/
+
override def initParameter(): (ParameterTool, Configuration) = {
- val (userParameter: ParameterTool, flinkConf: Configuration) =
super.initParameter()
- (userParameter.get(KEY_FLINK_SQL()) match {
- case null => userParameter
+ val (appParameter: ParameterTool, flinkConf: Configuration) = {
+ val argsMap = ParameterTool.fromArgs(args)
+ argsMap.get(KEY_APP_CONF(), null) match {
+ case null | "" =>
+ logWarn("Usage:can't fond config,you can set \"--conf $path \" in
main arguments")
+ ParameterTool.fromSystemProperties().mergeWith(argsMap) -> new
Configuration()
+ case file => super.parseConfig(file)
+ }
+ }
+
+ val appParam = appParameter.get(KEY_FLINK_SQL()) match {
+ case null => appParameter
case param =>
// for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
appParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) =>
userParameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
appParameter.mergeWith(ParameterTool.fromMap(value))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- userParameter
+ appParameter
}
}
- }, flinkConf)
+ }
+
+ appParam -> flinkConf
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
- val plannerType =
Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse
{
- logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by:
blinkPlanner")
- PlannerType.blink
- }
-
- plannerType match {
- case PlannerType.blink =>
- logInfo("blinkPlanner will be use.")
- builder.useBlinkPlanner()
- case PlannerType.old =>
- logInfo("oldPlanner will be use.")
- builder.useOldPlanner()
- case PlannerType.any =>
- logInfo("anyPlanner will be use.")
- builder.useAnyPlanner()
+ val plannerType =
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse(PlannerType.blink)
+
+ try {
+ plannerType match {
+ case PlannerType.blink =>
+ logInfo("blinkPlanner will be use.")
+ builder.useBlinkPlanner()
+ case PlannerType.old =>
+ logInfo("oldPlanner will be use.")
+ builder.useOldPlanner()
+ case PlannerType.any =>
+ logInfo("anyPlanner will be use.")
+ builder.useAnyPlanner()
+ }
+ } catch {
+ case e: IncompatibleClassChangeError =>
Review Comment:
Since Flink 1.14, the blink planner is the default planner. So Flink 1.12 and
1.13 have the `useBlinkPlanner` method, but Flink 1.14 and 1.15 do not have
this method.
Currently, this PR uses a `try`/`catch` on `IncompatibleClassChangeError` to
work around this incompatibility. From a coding-standards point of view, this
is not reasonable, and in the long term it will be very difficult to maintain.
We need a general coding convention: how should we adapt methods that are
incompatible across Flink versions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]