TisonKun commented on a change in pull request #10392: [FLINK-14854][client] Add executeAsync() method to execution environments URL: https://github.com/apache/flink/pull/10392#discussion_r353138722
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ########## @@ -1619,23 +1620,74 @@ public JobExecutionResult execute(String jobName) throws Exception { */ @Internal public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { + try (final JobClient jobClient = executeAsync(streamGraph).get()) { + + return configuration.getBoolean(DeploymentOptions.ATTACHED) + ? jobClient.getJobExecutionResult(userClassloader).get() + : new DetachedJobExecutionResult(jobClient.getJobID()); + } + } + + /** + * Triggers the program asynchronously. The environment will execute all parts of + * the program that have resulted in a "sink" operation. Sink operations are + * for example printing results or forwarding them to a message queue. + * + * <p>The program execution will be logged and displayed with a generated + * default name. + * + * @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded. + * @throws Exception which occurs during job execution. + */ + @PublicEvolving + public final CompletableFuture<JobClient> executeAsync() throws Exception { + return executeAsync(DEFAULT_JOB_NAME); + } + + /** + * Triggers the program execution asynchronously. The environment will execute all parts of + * the program that have resulted in a "sink" operation. Sink operations are + * for example printing results or forwarding them to a message queue. + * + * <p>The program execution will be logged and displayed with the provided name + * + * @param jobName desired name of the job + * @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded. + * @throws Exception which occurs during job execution. + */ + @PublicEvolving + public CompletableFuture<JobClient> executeAsync(String jobName) throws Exception { + return executeAsync(getStreamGraph(checkNotNull(jobName))); + } + + /** + * Triggers the program execution asynchronously. The environment will execute all parts of + * the program that have resulted in a "sink" operation. Sink operations are + * for example printing results or forwarding them to a message queue. + * + * @param streamGraph the stream graph representing the transformations + * @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded. + * @throws Exception which occurs during job execution. + */ + @Internal + public CompletableFuture<JobClient> executeAsync(StreamGraph streamGraph) throws Exception { if (configuration.get(DeploymentOptions.TARGET) == null) { Review comment: Integrated. Thanks! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services