GitHub user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5451#discussion_r169676080
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
---
@@ -123,214 +109,112 @@ public SavepointHandlers(@Nullable final String
defaultSavepointDir) {
/**
* HTTP handler to trigger savepoints.
*/
- public class SavepointTriggerHandler
- extends AbstractRestHandler<RestfulGateway,
SavepointTriggerRequestBody, SavepointTriggerResponseBody,
SavepointTriggerMessageParameters> {
+ public class SavepointTriggerHandler extends
TriggerHandler<RestfulGateway, SavepointTriggerRequestBody,
SavepointTriggerMessageParameters> {
public SavepointTriggerHandler(
- final CompletableFuture<String>
localRestAddress,
- final GatewayRetriever<? extends
RestfulGateway> leaderRetriever,
- final Time timeout,
- final Map<String, String> responseHeaders) {
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway>
leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders) {
super(localRestAddress, leaderRetriever, timeout,
responseHeaders, SavepointTriggerHeaders.getInstance());
}
@Override
- protected CompletableFuture<SavepointTriggerResponseBody>
handleRequest(
- @Nonnull final
HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters>
request,
- @Nonnull final RestfulGateway gateway) throws
RestHandlerException {
-
+ protected CompletableFuture<String>
triggerOperation(HandlerRequest<SavepointTriggerRequestBody,
SavepointTriggerMessageParameters> request, RestfulGateway gateway) throws
RestHandlerException {
final JobID jobId =
request.getPathParameter(JobIDPathParameter.class);
final String requestedTargetDirectory =
request.getRequestBody().getTargetDirectory();
if (requestedTargetDirectory == null &&
defaultSavepointDir == null) {
- return FutureUtils.completedExceptionally(
- new RestHandlerException(
+ throw new RestHandlerException(
String.format("Config key [%s]
is not set. Property [%s] must be provided.",
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY),
-
HttpResponseStatus.BAD_REQUEST));
+ HttpResponseStatus.BAD_REQUEST);
}
final String targetDirectory = requestedTargetDirectory
!= null ? requestedTargetDirectory : defaultSavepointDir;
- final CompletableFuture<String> savepointLocationFuture
=
- gateway.triggerSavepoint(jobId,
targetDirectory, RpcUtils.INF_TIMEOUT);
- final SavepointTriggerId savepointTriggerId = new
SavepointTriggerId();
- completedSavepointCache.registerOngoingSavepoint(
- SavepointKey.of(savepointTriggerId, jobId),
- savepointLocationFuture);
- return CompletableFuture.completedFuture(
- new
SavepointTriggerResponseBody(savepointTriggerId));
+ return gateway.triggerSavepoint(jobId, targetDirectory,
RpcUtils.INF_TIMEOUT);
+ }
+
+ @Override
+ protected SavepointKey
createOperationKey(HandlerRequest<SavepointTriggerRequestBody,
SavepointTriggerMessageParameters> request) {
+ final JobID jobId =
request.getPathParameter(JobIDPathParameter.class);
+ return SavepointKey.of(new TriggerId(), jobId);
}
}
/**
* HTTP handler to query for the status of the savepoint.
*/
- public class SavepointStatusHandler
- extends AbstractRestHandler<RestfulGateway,
EmptyRequestBody, SavepointResponseBody, SavepointStatusMessageParameters> {
+ public class SavepointStatusHandler extends
StatusHandler<RestfulGateway, SavepointInfo, SavepointStatusMessageParameters> {
public SavepointStatusHandler(
- final CompletableFuture<String>
localRestAddress,
- final GatewayRetriever<? extends
RestfulGateway> leaderRetriever,
- final Time timeout,
- final Map<String, String> responseHeaders) {
+ CompletableFuture<String> localRestAddress,
--- End diff --
Unnecessary changes: the `final` modifiers on the constructor parameters were fine as they were, and the indentation of the new parameter lines is off.
---