[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16324078#comment-16324078 ]
ASF GitHub Bot commented on FLINK-8317: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242267 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } - private JobResult waitForJobExecutionResult( - final JobID jobId) throws ProgramInvocationException { - - final JobMessageParameters messageParameters = new JobMessageParameters(); - messageParameters.jobPathParameter.resolve(jobId); - JobExecutionResultResponseBody jobExecutionResultResponseBody; - try { - long attempt = 0; - do { - final CompletableFuture<JobExecutionResultResponseBody> responseFuture = - restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - JobExecutionResultHeaders.getInstance(), - messageParameters); - jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - Thread.sleep(waitStrategy.sleepTime(attempt)); - attempt++; - } - while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); - } catch (IOException | TimeoutException | ExecutionException e) { - throw new ProgramInvocationException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProgramInvocationException(e); + private <R, T extends AsynchronouslyCreatedResource<R>> R waitForResource( + final SupplierWithException<CompletableFuture<T>, IOException> resourceFutureSupplier) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + T asynchronouslyCreatedResource; + long attempt = 0; + do { + final CompletableFuture<T> responseFuture = resourceFutureSupplier.get(); + asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + Thread.sleep(waitStrategy.sleepTime(attempt)); --- End diff -- fixed > Enable Triggering of Savepoints via RestfulGateway > -------------------------------------------------- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST > Affects Versions: 1.5.0 > Reporter: Gary Yao > Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints in FLIP-6 mode via RestfulGateway: > * Add method to {{CompletableFuture<CompletedCheckpoint> > triggerSavepoint(long timestamp, String targetDirectory)}} to > {{RestfulGateway}} interface > * Implement method in {{Dispatcher}} and {{JobMaster}} > * Implement a new {{AbstractRestHandler}} which allows asynchronous > triggering of savepoints -- This message was sent by Atlassian JIRA (v6.4.14#64029)