Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5451#discussion_r169676080
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---
    @@ -123,214 +109,112 @@ public SavepointHandlers(@Nullable final String 
defaultSavepointDir) {
        /**
         * HTTP handler to trigger savepoints.
         */
    -   public class SavepointTriggerHandler
    -                   extends AbstractRestHandler<RestfulGateway, 
SavepointTriggerRequestBody, SavepointTriggerResponseBody, 
SavepointTriggerMessageParameters> {
    +   public class SavepointTriggerHandler extends 
TriggerHandler<RestfulGateway, SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> {
     
                public SavepointTriggerHandler(
    -                           final CompletableFuture<String> 
localRestAddress,
    -                           final GatewayRetriever<? extends 
RestfulGateway> leaderRetriever,
    -                           final Time timeout,
    -                           final Map<String, String> responseHeaders) {
    +                   CompletableFuture<String> localRestAddress,
    +                   GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
    +                   Time timeout,
    +                   Map<String, String> responseHeaders) {
                        super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, SavepointTriggerHeaders.getInstance());
                }
     
                @Override
    -           protected CompletableFuture<SavepointTriggerResponseBody> 
handleRequest(
    -                           @Nonnull final 
HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> 
request,
    -                           @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
    -
    +           protected CompletableFuture<String> 
triggerOperation(HandlerRequest<SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> request, RestfulGateway gateway) throws 
RestHandlerException {
                        final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
                        final String requestedTargetDirectory = 
request.getRequestBody().getTargetDirectory();
     
                        if (requestedTargetDirectory == null && 
defaultSavepointDir == null) {
    -                           return FutureUtils.completedExceptionally(
    -                                   new RestHandlerException(
    +                           throw new RestHandlerException(
                                                String.format("Config key [%s] 
is not set. Property [%s] must be provided.",
                                                        
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                                                        
SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY),
    -                                           
HttpResponseStatus.BAD_REQUEST));
    +                                           HttpResponseStatus.BAD_REQUEST);
                        }
     
                        final String targetDirectory = requestedTargetDirectory 
!= null ? requestedTargetDirectory : defaultSavepointDir;
    -                   final CompletableFuture<String> savepointLocationFuture 
=
    -                           gateway.triggerSavepoint(jobId, 
targetDirectory, RpcUtils.INF_TIMEOUT);
    -                   final SavepointTriggerId savepointTriggerId = new 
SavepointTriggerId();
    -                   completedSavepointCache.registerOngoingSavepoint(
    -                           SavepointKey.of(savepointTriggerId, jobId),
    -                           savepointLocationFuture);
    -                   return CompletableFuture.completedFuture(
    -                           new 
SavepointTriggerResponseBody(savepointTriggerId));
    +                   return gateway.triggerSavepoint(jobId, targetDirectory, 
RpcUtils.INF_TIMEOUT);
    +           }
    +
    +           @Override
    +           protected SavepointKey 
createOperationKey(HandlerRequest<SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters> request) {
    +                   final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
    +                   return SavepointKey.of(new TriggerId(), jobId);
                }
        }
     
        /**
         * HTTP handler to query for the status of the savepoint.
         */
    -   public class SavepointStatusHandler
    -                   extends AbstractRestHandler<RestfulGateway, 
EmptyRequestBody, SavepointResponseBody, SavepointStatusMessageParameters> {
    +   public class SavepointStatusHandler extends 
StatusHandler<RestfulGateway, SavepointInfo, SavepointStatusMessageParameters> {
     
                public SavepointStatusHandler(
    -                           final CompletableFuture<String> 
localRestAddress,
    -                           final GatewayRetriever<? extends 
RestfulGateway> leaderRetriever,
    -                           final Time timeout,
    -                           final Map<String, String> responseHeaders) {
    +                   CompletableFuture<String> localRestAddress,
    --- End diff --
    
    Unnecessary changes: the `final` modifiers on the constructor parameters were fine as they were, and the indentation of the new parameter lines is off.


---

Reply via email to