TisonKun commented on a change in pull request #10311: [FLINK-14762][client] Enrich JobClient API
URL: https://github.com/apache/flink/pull/10311#discussion_r351868403
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
##########
@@ -48,25 +54,45 @@
private final JobID jobID;
- private final Thread shutdownHook;
+ private final AtomicBoolean running = new AtomicBoolean(true);
- public ClusterClientJobClientAdapter(final ClusterClient<ClusterID> clusterClient, final JobID jobID, final boolean withShutdownHook) {
+ public ClusterClientJobClientAdapter(
+ final ClusterClient<ClusterID> clusterClient,
+ final JobID jobID) {
this.jobID = checkNotNull(jobID);
this.clusterClient = checkNotNull(clusterClient);
-
- if (withShutdownHook) {
- shutdownHook = ShutdownHookUtil.addShutdownHook(
- clusterClient::shutDownCluster, clusterClient.getClass().getSimpleName(), LOG);
- } else {
- shutdownHook = null;
- }
}
@Override
public JobID getJobID() {
return jobID;
}
+ @Override
+ public CompletableFuture<JobStatus> getJobStatus() {
+ return clusterClient.getJobStatus(jobID).thenApply(org.apache.flink.runtime.jobgraph.JobStatus::toCoreJobStatus);
+ }
+
+ @Override
+ public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(ClassLoader classLoader) {
Review comment:
We already use `Map<String, OptionalFailure<Object>>` as the accumulator type
in `JobExecutionResult` and `ClusterClient`. How we should handle unpacking
these values is an independent question and, in my opinion, out of scope here.
Shall we open a separate ticket to consider it, so that this one does not
become too complex?