[ https://issues.apache.org/jira/browse/FLINK-8778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378332#comment-16378332 ]
ASF GitHub Bot commented on FLINK-8778:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5579#discussion_r170864042
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -577,6 +583,81 @@ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionExceptio
             printStatusDuringExecution);
     }
+    /**
+     * Requests the {@link JobStatus} of the job with the given {@link JobID}.
+     */
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        final ActorGateway jobManager;
+        try {
+            jobManager = getJobManagerGateway();
+        } catch (FlinkException e) {
+            throw new RuntimeException("Could not retrieve JobManage gateway.", e);
+        }
+
+        Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
+
+        CompletableFuture<Object> javaFuture = FutureUtils.toJava(response);
+
+        return javaFuture.thenApply((responseMessage) -> {
+            if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) {
+                return ((JobManagerMessages.CurrentJobStatus) responseMessage).status();
+            } else if (responseMessage instanceof JobManagerMessages.JobNotFound) {
+                throw new CompletionException(
+                    new IllegalStateException("Could not find job with JobId " + jobId));
+            } else {
+                throw new CompletionException(
+                    new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+            }
+        });
+    }
+
+    /**
+     * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple
+     * times to poll the {@link JobResult} before giving up.
+     *
+     * @param jobId specifying the job for which to retrieve the {@link JobResult}
+     * @return Future which is completed with the {@link JobResult} once the job has completed or
+     * with a failure if the {@link JobResult} could not be retrieved.
+     */
+    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+
+        CompletableFuture<JobResult> result = new CompletableFuture<>();
+
+        try {
+            JobExecutionResult jobExecutionResult = retrieveJob(jobId);
+            Map<String, Object> allAccumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+            Map<String, SerializedValue<Object>> allAccumulatorResultsSerialized = new HashMap<>();
+
+            for (Map.Entry<String, Object> acc : allAccumulatorResults.entrySet()) {
+                SerializedValue<Object> objectSerializedValue = null;
+                try {
+                    objectSerializedValue = new SerializedValue<>(acc.getValue());
+                } catch (IOException e) {
+                    throw new RuntimeException("Could not serialize accumulator result.", e);
--- End diff --
Let's fail the returned future exceptionally in this case.
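
For illustration, a minimal sketch of what failing the returned future could look like. It is not the PR's code: the class `AccumulatorFutureSketch`, the method `serializeAccumulators`, and the helper `serialize` are hypothetical, and plain Java serialization stands in for Flink's `SerializedValue` so that the example runs without Flink on the classpath. The point is only that the `IOException` is reported through `completeExceptionally` rather than thrown at the caller.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the method under review; not Flink's actual API.
class AccumulatorFutureSketch {

    // Hypothetical helper: plain Java serialization replaces Flink's SerializedValue here.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static CompletableFuture<Map<String, byte[]>> serializeAccumulators(Map<String, Object> accumulators) {
        CompletableFuture<Map<String, byte[]>> result = new CompletableFuture<>();
        Map<String, byte[]> serialized = new HashMap<>();

        for (Map.Entry<String, Object> acc : accumulators.entrySet()) {
            try {
                serialized.put(acc.getKey(), serialize(acc.getValue()));
            } catch (IOException e) {
                // Fail the returned future instead of throwing a RuntimeException.
                result.completeExceptionally(
                    new IOException("Could not serialize accumulator result " + acc.getKey(), e));
                return result;
            }
        }

        result.complete(serialized);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> accumulators = new HashMap<>();
        accumulators.put("count", 42L);
        accumulators.put("broken", new Object()); // java.lang.Object is not Serializable

        // The caller observes the failure on the future; nothing is thrown here.
        serializeAccumulators(accumulators)
            .whenComplete((value, error) ->
                System.out.println(error != null ? "failed: " + error.getMessage() : "ok"));
    }
}
```

With this shape, callers handle serialization problems in the same place as every other failure of the asynchronous call, for example via `whenComplete` or `exceptionally`, instead of needing a separate `try`/`catch` around the method invocation.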
> Migrate queryable state ITCases to use MiniClusterResource
> ----------------------------------------------------------
>
> Key: FLINK-8778
> URL: https://issues.apache.org/jira/browse/FLINK-8778
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.5.0
>
>