wuchong commented on a change in pull request #14962:
URL: https://github.com/apache/flink/pull/14962#discussion_r584552037
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
##########
@@ -751,17 +751,28 @@ private void initializeCatalogs() {
}
private StreamExecutionEnvironment createStreamExecutionEnvironment() {
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ // We need not different StreamExecutionEnvironments to build and
submit flink job,
+ // instead we just use
StreamExecutionEnvironment#executeAsync(StreamGraph) method
+ // to execute existing StreamGraph.
+ // This requires StreamExecutionEnvironment to have a full flink
configuration.
+ final StreamExecutionEnvironment env =
+ new StreamExecutionEnvironment(new Configuration(flinkConfig),
classLoader);
Review comment:
I suggest to use
`StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig)` instead.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
##########
@@ -661,7 +661,7 @@ private void createTableEnvironment(
classLoader);
} else if (environment.getExecution().isBatchPlanner()) {
streamExecEnv = null;
- execEnv = ExecutionEnvironment.getExecutionEnvironment();
+ execEnv = createExecutionEnvironment();
Review comment:
Use `StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig)`
instead.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -694,13 +694,18 @@ private boolean callInsert(SqlCommandCall cmdCall) {
printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
try {
- final ProgramTargetDescriptor programTarget =
- executor.executeUpdate(sessionId, cmdCall.operands[0]);
+ final TableResult tableResult = executor.executeSql(sessionId,
cmdCall.operands[0]);
+ checkState(tableResult.getJobClient().isPresent());
terminal.writer()
.println(
CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED)
.toAnsi());
- terminal.writer().println(programTarget.toString());
+ // keep compatibility with before
+ terminal.writer()
+ .println(
+ String.format(
+ "Job ID: %s",
Review comment:
We may need the "\n" at the end?
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -1769,32 +1762,14 @@ private void verifySinkResult(String path) throws
IOException {
}
private void executeAndVerifySinkResult(
- Executor executor,
- String sessionId,
- String statement,
- RunnableWithException verifyResult)
+ Executor executor, String sessionId, String statement, String
resultPath)
throws Exception {
- final ProgramTargetDescriptor targetDescriptor =
- executor.executeUpdate(sessionId, statement);
-
- // wait for job completion and verify result
- boolean isRunning = true;
- while (isRunning) {
- Thread.sleep(50); // slow the processing down
- final JobStatus jobStatus =
-
clusterClient.getJobStatus(targetDescriptor.getJobId()).get();
- switch (jobStatus) {
- case CREATED:
- case RUNNING:
- continue;
- case FINISHED:
- isRunning = false;
- verifyResult.run();
- break;
- default:
- fail("Unexpected job status.");
- }
- }
+ final TableResult tableResult = executor.executeSql(sessionId,
statement);
+ checkState(tableResult.getJobClient().isPresent());
+ // wait for job completion
+ tableResult.getJobClient().get().getJobExecutionResult().get();
Review comment:
Can be simplified to `tableResult.await()`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]