This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cade3664d50148474f9ad57802908e027b30b0f6
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Sep 23 11:09:14 2021 +0200

    [FLINK-24275][rest] Idempotent job cancellation
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  26 ++--
 ...kJobTerminatedWithoutCancellationException.java |  52 ++++++++
 .../rest/handler/job/JobCancellationHandler.java   |  12 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  74 +++++++++++
 .../apache/flink/runtime/rest/RestMatchers.java    |  85 +++++++++++++
 .../handler/job/JobCancellationHandlerTest.java    | 138 +++++++++++++++++++++
 6 files changed, 379 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5accdd8..aac1b7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import 
org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
@@ -526,13 +527,24 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     @Override
     public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) 
{
         Optional<JobManagerRunner> maybeJob = getJobManagerRunner(jobId);
-        return maybeJob.map(job -> job.cancel(timeout))
-                .orElseGet(
-                        () -> {
-                            log.debug("Dispatcher is unable to cancel job {}: 
not found", jobId);
-                            return FutureUtils.completedExceptionally(
-                                    new FlinkJobNotFoundException(jobId));
-                        });
+
+        if (maybeJob.isPresent()) {
+            return maybeJob.get().cancel(timeout);
+        }
+
+        final ExecutionGraphInfo executionGraphInfo = 
executionGraphInfoStore.get(jobId);
+        if (executionGraphInfo != null) {
+            final JobStatus jobStatus = 
executionGraphInfo.getArchivedExecutionGraph().getState();
+            if (jobStatus == JobStatus.CANCELED) {
+                return CompletableFuture.completedFuture(Acknowledge.get());
+            } else {
+                return FutureUtils.completedExceptionally(
+                        new 
FlinkJobTerminatedWithoutCancellationException(jobId, jobStatus));
+            }
+        }
+
+        log.debug("Dispatcher is unable to cancel job {}: not found", jobId);
+        return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobTerminatedWithoutCancellationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobTerminatedWithoutCancellationException.java
new file mode 100644
index 0000000..e611f62
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobTerminatedWithoutCancellationException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exception indicating that the Flink job with the given job ID has 
terminated without
+ * cancellation.
+ */
+public class FlinkJobTerminatedWithoutCancellationException extends 
FlinkException {
+
+    private static final long serialVersionUID = 2294698055059659025L;
+
+    private final JobStatus jobStatus;
+
+    public FlinkJobTerminatedWithoutCancellationException(JobID jobId, 
JobStatus jobStatus) {
+        super(
+                String.format(
+                        "Flink job (%s) was not canceled, but instead %s.",
+                        jobId, assertNotCanceled(jobStatus)));
+        this.jobStatus = jobStatus;
+    }
+
+    public JobStatus getJobStatus() {
+        return jobStatus;
+    }
+
+    private static JobStatus assertNotCanceled(JobStatus jobStatus) {
+        Preconditions.checkState(jobStatus != JobStatus.CANCELED);
+        return jobStatus;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java
index 934d235..9108ba9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -108,7 +109,16 @@ public class JobCancellationHandler
                     if (throwable != null) {
                         Throwable error = 
ExceptionUtils.stripCompletionException(throwable);
 
-                        if (error instanceof TimeoutException) {
+                        if (error instanceof 
FlinkJobTerminatedWithoutCancellationException) {
+                            throw new CompletionException(
+                                    new RestHandlerException(
+                                            String.format(
+                                                    "Job cancellation failed 
because the job has already reached another terminal state (%s).",
+                                                    
((FlinkJobTerminatedWithoutCancellationException)
+                                                                    error)
+                                                            .getJobStatus()),
+                                            HttpResponseStatus.CONFLICT));
+                        } else if (error instanceof TimeoutException) {
                             throw new CompletionException(
                                     new RestHandlerException(
                                             "Job cancellation timed out.",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 9c7ff51..2daacd2 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
@@ -61,6 +62,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -372,6 +374,78 @@ public class DispatcherTest extends AbstractDispatcherTest {
     }
 
     @Test
+    public void testCancellationOfCanceledTerminalDoesNotThrowException() 
throws Exception {
+        final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
+                new CompletableFuture<>();
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        new 
FinishingJobManagerRunnerFactory(jobTerminationFuture, () -> {}));
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        JobID jobId = jobGraph.getJobID();
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        jobTerminationFuture.complete(
+                JobManagerRunnerResult.forSuccess(
+                        new ExecutionGraphInfo(
+                                new ArchivedExecutionGraphBuilder()
+                                        .setJobID(jobId)
+                                        .setState(JobStatus.CANCELED)
+                                        .build())));
+
+        // wait for job to finish
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        // sanity check
+        assertThat(
+                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), 
is(JobStatus.CANCELED));
+
+        dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
+    }
+
+    @Test
+    public void 
testCancellationOfNonCanceledTerminalJobFailsWithAppropriateException()
+            throws Exception {
+
+        final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
+                new CompletableFuture<>();
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        new 
FinishingJobManagerRunnerFactory(jobTerminationFuture, () -> {}));
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        JobID jobId = jobGraph.getJobID();
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        jobTerminationFuture.complete(
+                JobManagerRunnerResult.forSuccess(
+                        new ExecutionGraphInfo(
+                                new ArchivedExecutionGraphBuilder()
+                                        .setJobID(jobId)
+                                        .setState(JobStatus.FINISHED)
+                                        .build())));
+
+        // wait for job to finish
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        // sanity check
+        assertThat(
+                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), 
is(JobStatus.FINISHED));
+
+        final CompletableFuture<Acknowledge> cancelFuture =
+                dispatcherGateway.cancelJob(jobId, TIMEOUT);
+
+        assertThat(
+                cancelFuture,
+                FlinkMatchers.futureFailedWith(
+                        FlinkJobTerminatedWithoutCancellationException.class));
+    }
+
+    @Test
     public void testJobManagerRunnerInitializationFailureFailsJob() throws 
Exception {
         final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory =
                 new TestingJobManagerRunnerFactory();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestMatchers.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestMatchers.java
new file mode 100644
index 0000000..ad0f54c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestMatchers.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/** Hamcrest matchers for REST handlers. */
+public class RestMatchers {
+
+    public static <T> Matcher<CompletableFuture<T>> respondsWithError(
+            HttpResponseStatus responseStatus) {
+        return new ErrorResponseStatusCodeMatcher<>(responseStatus);
+    }
+
+    private static final class ErrorResponseStatusCodeMatcher<T>
+            extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
+
+        private final HttpResponseStatus expectedErrorResponse;
+
+        ErrorResponseStatusCodeMatcher(HttpResponseStatus 
expectedErrorResponse) {
+            this.expectedErrorResponse = expectedErrorResponse;
+        }
+
+        @Override
+        protected boolean matchesSafely(
+                CompletableFuture<T> future, Description mismatchDescription) {
+            try {
+                future.get();
+
+                mismatchDescription.appendText("The request succeeded");
+                return false;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new Error("interrupted test");
+            } catch (ExecutionException e) {
+                if (e.getCause() != null) {
+                    if 
(RestHandlerException.class.isAssignableFrom(e.getCause().getClass())) {
+                        final RestHandlerException rhe = 
(RestHandlerException) e.getCause();
+                        if (rhe.getHttpResponseStatus() == 
expectedErrorResponse) {
+                            return true;
+                        } else {
+                            mismatchDescription.appendText(
+                                    "Error response had different status code: 
"
+                                            + rhe.getHttpResponseStatus());
+                            return false;
+                        }
+                    }
+                }
+
+                mismatchDescription.appendText(
+                        "Future completed with different exception: " + 
e.getCause());
+                return false;
+            }
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description.appendText("An error response with status code: " + 
expectedErrorResponse);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandlerTest.java
new file mode 100644
index 0000000..93efa4f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandlerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
+import org.apache.flink.runtime.rest.RestMatchers;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
+import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Tests for the {@link JobCancellationHandler}. */
+public class JobCancellationHandlerTest extends TestLogger {
+    @Test
+    public void testSuccessfulCancellation() throws Exception {
+        testResponse(
+                jobId -> CompletableFuture.completedFuture(Acknowledge.get()),
+                CompletableFuture::get);
+    }
+
+    @Test
+    public void testErrorCodeForNonCanceledTerminalJob() throws Exception {
+        testResponseCodeOnFailedDispatcherCancellationResponse(
+                jobId ->
+                        FutureUtils.completedExceptionally(
+                                new 
FlinkJobTerminatedWithoutCancellationException(
+                                        jobId, JobStatus.FINISHED)),
+                HttpResponseStatus.CONFLICT);
+    }
+
+    @Test
+    public void testErrorCodeForTimeout() throws Exception {
+        testResponseCodeOnFailedDispatcherCancellationResponse(
+                jobId -> FutureUtils.completedExceptionally(new 
TimeoutException()),
+                HttpResponseStatus.REQUEST_TIMEOUT);
+    }
+
+    @Test
+    public void testErrorCodeForUnknownJob() throws Exception {
+        testResponseCodeOnFailedDispatcherCancellationResponse(
+                jobId -> FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId)),
+                HttpResponseStatus.NOT_FOUND);
+    }
+
+    @Test
+    public void testErrorCodeForUnknownError() throws Exception {
+        testResponseCodeOnFailedDispatcherCancellationResponse(
+                jobId -> FutureUtils.completedExceptionally(new 
RuntimeException()),
+                HttpResponseStatus.INTERNAL_SERVER_ERROR);
+    }
+
+    private static void testResponseCodeOnFailedDispatcherCancellationResponse(
+            Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
+            HttpResponseStatus expectedErrorCode)
+            throws Exception {
+
+        testResponse(
+                cancelJobFunction,
+                cancellationFuture ->
+                        assertThat(
+                                cancellationFuture,
+                                
RestMatchers.respondsWithError(expectedErrorCode)));
+    }
+
+    private static void testResponse(
+            Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
+            ThrowingConsumer<CompletableFuture<EmptyResponseBody>, Exception> 
assertion)
+            throws Exception {
+        final RestfulGateway gateway = createGateway(cancelJobFunction);
+
+        final JobCancellationHandler jobCancellationHandler = 
createHandler(gateway);
+
+        final JobCancellationMessageParameters messageParameters =
+                jobCancellationHandler
+                        .getMessageHeaders()
+                        .getUnresolvedMessageParameters()
+                        .resolveJobId(new JobID());
+
+        final CompletableFuture<EmptyResponseBody> cancellationFuture =
+                jobCancellationHandler.handleRequest(
+                        HandlerRequest.create(EmptyRequestBody.getInstance(), 
messageParameters),
+                        gateway);
+
+        assertion.accept(cancellationFuture);
+    }
+
+    private static RestfulGateway createGateway(
+            Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction) 
{
+        return new 
TestingRestfulGateway.Builder().setCancelJobFunction(cancelJobFunction).build();
+    }
+
+    private static JobCancellationHandler createHandler(RestfulGateway 
gateway) {
+        return new JobCancellationHandler(
+                () -> CompletableFuture.completedFuture(gateway),
+                Time.hours(1),
+                Collections.emptyMap(),
+                JobCancellationHeaders.getInstance(),
+                TerminationModeQueryParameter.TerminationMode.CANCEL);
+    }
+}

Reply via email to