kl0u commented on a change in pull request #10311: [FLINK-14762][client] Enrich
JobClient API
URL: https://github.com/apache/flink/pull/10311#discussion_r351240806
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
##########
@@ -50,18 +50,20 @@ public AbstractSessionClusterExecutor(@Nonnull final
ClientFactory clusterClient
}
@Override
- public CompletableFuture<JobClient> execute(@Nonnull final Pipeline
pipeline, @Nonnull final Configuration configuration) throws Exception {
+ public CompletableFuture<? extends JobClient> execute(@Nonnull final
Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception
{
final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline,
configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration)) {
final ClusterID clusterID =
clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
final ClusterClient<ClusterID> clusterClient =
clusterDescriptor.retrieve(clusterID);
- return clusterClient
- .submitJob(jobGraph)
-
.thenApply(JobSubmissionResult::getJobID)
- .thenApply(jobID -> new
ClusterClientJobClientAdapter<>(clusterClient, jobID, false));
+ return
clusterClient.submitJob(jobGraph).thenApply(jobClient -> {
+ if (jobClient instanceof
ClusterClientJobClientAdapter) {
Review comment:
I am looking forward to your new commit later and then I will comment on
that! Thanks for the effort.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services