This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bafbcefa7697e4f69ae7bb1a28e0cee379ad417a Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Sep 23 13:02:38 2021 +0200 [FLINK-24275][rest] Streamline JobCancellationMessageParameters --- .../flink/client/program/rest/RestClusterClient.java | 9 +++++---- .../rest/messages/JobCancellationMessageParameters.java | 17 +++++++++++++++-- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index dd9f167..09c6de5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -436,10 +436,11 @@ public class RestClusterClient<T> implements ClusterClient<T> { @Override public CompletableFuture<Acknowledge> cancel(JobID jobID) { - JobCancellationMessageParameters params = new JobCancellationMessageParameters(); - params.jobPathParameter.resolve(jobID); - params.terminationModeQueryParameter.resolve( - Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); + JobCancellationMessageParameters params = + new JobCancellationMessageParameters() + .resolveJobId(jobID) + .resolveTerminationMode( + TerminationModeQueryParameter.TerminationMode.CANCEL); CompletableFuture<EmptyResponseBody> responseFuture = sendRequest(JobCancellationHeaders.getInstance(), params); return responseFuture.thenApply(ignore -> Acknowledge.get()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java index 59a2136..2902f77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.rest.messages; +import org.apache.flink.api.common.JobID; + import java.util.Collection; import java.util.Collections; @@ -28,8 +30,8 @@ import java.util.Collections; */ public class JobCancellationMessageParameters extends MessageParameters { - public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - public final TerminationModeQueryParameter terminationModeQueryParameter = + private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); @Override @@ -41,4 +43,15 @@ public class JobCancellationMessageParameters extends MessageParameters { public Collection<MessageQueryParameter<?>> getQueryParameters() { return Collections.singleton(terminationModeQueryParameter); } + + public JobCancellationMessageParameters resolveJobId(JobID jobId) { + jobPathParameter.resolve(jobId); + return this; + } + + public JobCancellationMessageParameters resolveTerminationMode( + TerminationModeQueryParameter.TerminationMode terminationMode) { + terminationModeQueryParameter.resolve(Collections.singletonList(terminationMode)); + return this; + } }
