[ 
https://issues.apache.org/jira/browse/FLINK-8632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371620#comment-16371620
 ] 

ASF GitHub Bot commented on FLINK-8632:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169676080
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
 ---
    @@ -123,214 +109,112 @@ public SavepointHandlers(@Nullable final String 
defaultSavepointDir) {
        /**
         * HTTP handler to trigger savepoints.
         */
    -   public class SavepointTriggerHandler
    -                   extends AbstractRestHandler<RestfulGateway, 
SavepointTriggerRequestBody, SavepointTriggerResponseBody, 
SavepointTriggerMessageParameters> {
    +   public class SavepointTriggerHandler extends 
TriggerHandler<RestfulGateway, SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> {
     
                public SavepointTriggerHandler(
    -                           final CompletableFuture<String> 
localRestAddress,
    -                           final GatewayRetriever<? extends 
RestfulGateway> leaderRetriever,
    -                           final Time timeout,
    -                           final Map<String, String> responseHeaders) {
    +                   CompletableFuture<String> localRestAddress,
    +                   GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
    +                   Time timeout,
    +                   Map<String, String> responseHeaders) {
                        super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, SavepointTriggerHeaders.getInstance());
                }
     
                @Override
    -           protected CompletableFuture<SavepointTriggerResponseBody> 
handleRequest(
    -                           @Nonnull final 
HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> 
request,
    -                           @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
    -
    +           protected CompletableFuture<String> 
triggerOperation(HandlerRequest<SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> request, RestfulGateway gateway) throws 
RestHandlerException {
                        final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
                        final String requestedTargetDirectory = 
request.getRequestBody().getTargetDirectory();
     
                        if (requestedTargetDirectory == null && 
defaultSavepointDir == null) {
    -                           return FutureUtils.completedExceptionally(
    -                                   new RestHandlerException(
    +                           throw new RestHandlerException(
                                                String.format("Config key [%s] 
is not set. Property [%s] must be provided.",
                                                        
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                                                        
SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY),
    -                                           
HttpResponseStatus.BAD_REQUEST));
    +                                           HttpResponseStatus.BAD_REQUEST);
                        }
     
                        final String targetDirectory = requestedTargetDirectory 
!= null ? requestedTargetDirectory : defaultSavepointDir;
    -                   final CompletableFuture<String> savepointLocationFuture 
=
    -                           gateway.triggerSavepoint(jobId, 
targetDirectory, RpcUtils.INF_TIMEOUT);
    -                   final SavepointTriggerId savepointTriggerId = new 
SavepointTriggerId();
    -                   completedSavepointCache.registerOngoingSavepoint(
    -                           SavepointKey.of(savepointTriggerId, jobId),
    -                           savepointLocationFuture);
    -                   return CompletableFuture.completedFuture(
    -                           new 
SavepointTriggerResponseBody(savepointTriggerId));
    +                   return gateway.triggerSavepoint(jobId, targetDirectory, 
RpcUtils.INF_TIMEOUT);
    +           }
    +
    +           @Override
    +           protected SavepointKey 
createOperationKey(HandlerRequest<SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> request) {
    +                   final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
    +                   return SavepointKey.of(new TriggerId(), jobId);
                }
        }
     
        /**
         * HTTP handler to query for the status of the savepoint.
         */
    -   public class SavepointStatusHandler
    -                   extends AbstractRestHandler<RestfulGateway, 
EmptyRequestBody, SavepointResponseBody, SavepointStatusMessageParameters> {
    +   public class SavepointStatusHandler extends 
StatusHandler<RestfulGateway, SavepointInfo, SavepointStatusMessageParameters> {
     
                public SavepointStatusHandler(
    -                           final CompletableFuture<String> 
localRestAddress,
    -                           final GatewayRetriever<? extends 
RestfulGateway> leaderRetriever,
    -                           final Time timeout,
    -                           final Map<String, String> responseHeaders) {
    +                   CompletableFuture<String> localRestAddress,
    --- End diff --
    
    These changes are unnecessary: the `final` modifiers were fine, and the new indentation is off.


> Generalize SavepointHandlers to be used for other asynchronous operations
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8632
>                 URL: https://issues.apache.org/jira/browse/FLINK-8632
>             Project: Flink
>          Issue Type: Improvement
>          Components: REST
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> We should generalize the {{SavepointHandlers}} so that they can be used for 
> other asynchronous operations as well. The basic idea is to have a trigger 
> handler that starts an asynchronous operation. This operation returns a 
> future that is completed once the operation is done. The trigger handler 
> returns a trigger id, which can then be used to check the status of the 
> operation by querying the status handler.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to