hackergin commented on code in PR #25988:
URL: https://github.com/apache/flink/pull/25988#discussion_r1919507579
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData>
fetchAllResults(ResultFetcher resultFetcher) {
}
return results;
}
+
+ private JobExecutionResult executeJob(
+ String script,
+ Configuration executionConfig,
+ OperationExecutor operationExecutor,
+ OperationHandle operationHandle) {
+ if
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+ return executeApplicationJob(script, executionConfig,
operationExecutor);
+ } else {
+ return executeSessionJob(script, executionConfig,
operationExecutor, operationHandle);
+ }
+ }
+
+ private boolean isApplicationMode(Configuration configuration) {
+ String target = configuration.get(TARGET);
+ return target.endsWith("application");
+ }
+
+ private JobExecutionResult executeSessionJob(
+ String script,
+ Configuration executionConfig,
+ OperationExecutor operationExecutor,
+ OperationHandle operationHandle) {
+ Configuration mergedConfig =
+ new
Configuration(operationExecutor.getSessionContext().getSessionConf());
+ mergedConfig.addAll(executionConfig);
+
+ // submit flink streaming job
+ ResultFetcher resultFetcher =
+ operationExecutor.executeStatement(operationHandle,
mergedConfig, script);
+
+ // get execution.target and jobId, clusterId
+ List<RowData> results = fetchAllResults(resultFetcher);
+ String jobId = results.get(0).getString(0).toString();
+ String executeTarget =
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+ Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+ return new JobExecutionResult(jobId, executeTarget,
clusterId.orElse(null));
+ }
+
+ private JobExecutionResult executeApplicationJob(
+ String script, Configuration executionConfig, OperationExecutor
operationExecutor) {
+ List<String> arguments = new ArrayList<>();
+ arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+ arguments.add(script);
+
+ Configuration mergedConfig =
+ new
Configuration(operationExecutor.getSessionContext().getSessionConf());
+ mergedConfig.addAll(executionConfig);
+ JobID jobId = new JobID();
+ mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
Review Comment:
If the jobId is not set here, it will only be generated after the job has
been submitted to the application cluster. That submission is asynchronous,
and there is no good way to obtain the generated jobId afterwards.
Therefore, we need to generate the jobId in advance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]