[
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964784#comment-16964784
]
Jark Wu commented on FLINK-14591:
---------------------------------
I think we can put the merge logic in {{StreamExecutor#execute()}}. This is the
last step to submit a job.
> Execute PlannerBase#mergeParameters every time of calling
> PlannerBase#translate method
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-14591
> URL: https://issues.apache.org/jira/browse/FLINK-14591
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Wei Zhong
> Priority: Minor
>
> In current implementation of blink planner, the method
> "PlannerBase#mergeParameter" will be called by "PlannerBase#translate" method
> to merge the configuration inside TableConfig into global job parameters:
> {code:scala}
> override def translate(
> modifyOperations: util.List[ModifyOperation]):
> util.List[Transformation[_]] = {
> if (modifyOperations.isEmpty) {
> return List.empty[Transformation[_]]
> }
> mergeParameters()
> val relNodes = modifyOperations.map(translateToRel)
> val optimizedRelNodes = optimize(relNodes)
> val execNodes = translateToExecNodePlan(optimizedRelNodes)
> translateToPlan(execNodes)
> }
> {code}
> This translate method is called in every important moment, e.g. execute,
> toDataStream, insertInto, etc.
> But as shown above, there is a chance that the method return directly and not
> call the "mergeParameters".
> In fact if we set some configurations between the "Table#insertInto" method
> and "TableEnvironment#execute" method, these configurations will not be
> merged into global job parameters because the "mergeParameters" method is not
> called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env,
> EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
>
> // the "jobparam2" configuration will loss
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>
> assertTrue(jobConfig.get("jobparam1")=="value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2")=="value2"){code}
> This may bring some confusion to the user. It will be great if we can fix
> this problem.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)