This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cade3664d50148474f9ad57802908e027b30b0f6 Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Sep 23 11:09:14 2021 +0200 [FLINK-24275][rest] Idempotent job cancellation --- .../flink/runtime/dispatcher/Dispatcher.java | 26 ++-- ...kJobTerminatedWithoutCancellationException.java | 52 ++++++++ .../rest/handler/job/JobCancellationHandler.java | 12 +- .../flink/runtime/dispatcher/DispatcherTest.java | 74 +++++++++++ .../apache/flink/runtime/rest/RestMatchers.java | 85 +++++++++++++ .../handler/job/JobCancellationHandlerTest.java | 138 +++++++++++++++++++++ 6 files changed, 379 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 5accdd8..aac1b7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; @@ -526,13 +527,24 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher @Override public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) { Optional<JobManagerRunner> maybeJob = getJobManagerRunner(jobId); - return maybeJob.map(job -> job.cancel(timeout)) - .orElseGet( - () -> { - log.debug("Dispatcher is unable to cancel job {}: not found", jobId); - return FutureUtils.completedExceptionally( - new FlinkJobNotFoundException(jobId)); - }); + + if (maybeJob.isPresent()) { + return maybeJob.get().cancel(timeout); + } + + final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); + if (executionGraphInfo != null) { + final JobStatus jobStatus = executionGraphInfo.getArchivedExecutionGraph().getState(); + if (jobStatus == JobStatus.CANCELED) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } else { + return FutureUtils.completedExceptionally( + new FlinkJobTerminatedWithoutCancellationException(jobId, jobStatus)); + } + } + + log.debug("Dispatcher is unable to cancel job {}: not found", jobId); + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobTerminatedWithoutCancellationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobTerminatedWithoutCancellationException.java new file mode 100644 index 0000000..e611f62 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobTerminatedWithoutCancellationException.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exception indicating that the Flink job with the given job ID has terminated without + * cancellation. + */ +public class FlinkJobTerminatedWithoutCancellationException extends FlinkException { + + private static final long serialVersionUID = 2294698055059659025L; + + private final JobStatus jobStatus; + + public FlinkJobTerminatedWithoutCancellationException(JobID jobId, JobStatus jobStatus) { + super( + String.format( + "Flink job (%s) was not canceled, but instead %s.", + jobId, assertNotCanceled(jobStatus))); + this.jobStatus = jobStatus; + } + + public JobStatus getJobStatus() { + return jobStatus; + } + + private static JobStatus assertNotCanceled(JobStatus jobStatus) { + Preconditions.checkState(jobStatus != JobStatus.CANCELED); + return jobStatus; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java index 934d235..9108ba9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -108,7 +109,16 @@ public class JobCancellationHandler if (throwable != null) { Throwable error = ExceptionUtils.stripCompletionException(throwable); - if (error instanceof TimeoutException) { + if (error instanceof FlinkJobTerminatedWithoutCancellationException) { + throw new CompletionException( + new RestHandlerException( + String.format( + "Job cancellation failed because the job has already reached another terminal state (%s).", + ((FlinkJobTerminatedWithoutCancellationException) + error) + .getJobStatus()), + HttpResponseStatus.CONFLICT)); + } else if (error instanceof TimeoutException) { throw new CompletionException( new RestHandlerException( "Job cancellation timed out.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 9c7ff51..2daacd2 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; @@ -61,6 +62,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -372,6 +374,78 @@ public class DispatcherTest extends AbstractDispatcherTest { } @Test + public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exception { + final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture = + new CompletableFuture<>(); + dispatcher = + createAndStartDispatcher( + heartbeatServices, + haServices, + new FinishingJobManagerRunnerFactory(jobTerminationFuture, () -> {})); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + JobID jobId = jobGraph.getJobID(); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + jobTerminationFuture.complete( + JobManagerRunnerResult.forSuccess( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setJobID(jobId) + .setState(JobStatus.CANCELED) + .build()))); + + // wait for job to finish + dispatcherGateway.requestJobResult(jobId, TIMEOUT).get(); + // sanity check + assertThat( + dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.CANCELED)); + + dispatcherGateway.cancelJob(jobId, TIMEOUT).get(); + } + + @Test + public void testCancellationOfNonCanceledTerminalJobFailsWithAppropriateException() + throws Exception { + + final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture = + new CompletableFuture<>(); + dispatcher = + createAndStartDispatcher( + heartbeatServices, + haServices, + new FinishingJobManagerRunnerFactory(jobTerminationFuture, () -> {})); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + JobID jobId = jobGraph.getJobID(); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + jobTerminationFuture.complete( + JobManagerRunnerResult.forSuccess( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setJobID(jobId) + .setState(JobStatus.FINISHED) + .build()))); + + // wait for job to finish + dispatcherGateway.requestJobResult(jobId, TIMEOUT).get(); + // sanity check + assertThat( + dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.FINISHED)); + + final CompletableFuture<Acknowledge> cancelFuture = + dispatcherGateway.cancelJob(jobId, TIMEOUT); + + assertThat( + cancelFuture, + FlinkMatchers.futureFailedWith( + FlinkJobTerminatedWithoutCancellationException.class)); + } + + @Test public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception { final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestMatchers.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestMatchers.java new file mode 100644 index 0000000..ad0f54c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestMatchers.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.runtime.rest.handler.RestHandlerException; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** Hamcrest matchers for REST handlers. */ +public class RestMatchers { + + public static <T> Matcher<CompletableFuture<T>> respondsWithError( + HttpResponseStatus responseStatus) { + return new ErrorResponseStatusCodeMatcher<>(responseStatus); + } + + private static final class ErrorResponseStatusCodeMatcher<T> + extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> { + + private final HttpResponseStatus expectedErrorResponse; + + ErrorResponseStatusCodeMatcher(HttpResponseStatus expectedErrorResponse) { + this.expectedErrorResponse = expectedErrorResponse; + } + + @Override + protected boolean matchesSafely( + CompletableFuture<T> future, Description mismatchDescription) { + try { + future.get(); + + mismatchDescription.appendText("The request succeeded"); + return false; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new Error("interrupted test"); + } catch (ExecutionException e) { + if (e.getCause() != null) { + if (RestHandlerException.class.isAssignableFrom(e.getCause().getClass())) { + final RestHandlerException rhe = (RestHandlerException) e.getCause(); + if (rhe.getHttpResponseStatus() == expectedErrorResponse) { + return true; + } else { + mismatchDescription.appendText( + "Error response had different status code: " + + rhe.getHttpResponseStatus()); + return false; + } + } + } + + mismatchDescription.appendText( + "Future completed with different exception: " + e.getCause()); + return false; + } + } + + @Override + public void describeTo(Description description) { + description.appendText("An error response with status code: " + expectedErrorResponse); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandlerTest.java new file mode 100644 index 0000000..93efa4f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandlerTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException; +import org.apache.flink.runtime.rest.RestMatchers; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobCancellationHeaders; +import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import static org.hamcrest.MatcherAssert.assertThat; + +/** Tests for the {@link JobCancellationHandler}. */ +public class JobCancellationHandlerTest extends TestLogger { + @Test + public void testSuccessfulCancellation() throws Exception { + testResponse( + jobId -> CompletableFuture.completedFuture(Acknowledge.get()), + CompletableFuture::get); + } + + @Test + public void testErrorCodeForNonCanceledTerminalJob() throws Exception { + testResponseCodeOnFailedDispatcherCancellationResponse( + jobId -> + FutureUtils.completedExceptionally( + new FlinkJobTerminatedWithoutCancellationException( + jobId, JobStatus.FINISHED)), + HttpResponseStatus.CONFLICT); + } + + @Test + public void testErrorCodeForTimeout() throws Exception { + testResponseCodeOnFailedDispatcherCancellationResponse( + jobId -> FutureUtils.completedExceptionally(new TimeoutException()), + HttpResponseStatus.REQUEST_TIMEOUT); + } + + @Test + public void testErrorCodeForUnknownJob() throws Exception { + testResponseCodeOnFailedDispatcherCancellationResponse( + jobId -> FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)), + HttpResponseStatus.NOT_FOUND); + } + + @Test + public void testErrorCodeForUnknownError() throws Exception { + testResponseCodeOnFailedDispatcherCancellationResponse( + jobId -> FutureUtils.completedExceptionally(new RuntimeException()), + HttpResponseStatus.INTERNAL_SERVER_ERROR); + } + + private static void testResponseCodeOnFailedDispatcherCancellationResponse( + Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction, + HttpResponseStatus expectedErrorCode) + throws Exception { + + testResponse( + cancelJobFunction, + cancellationFuture -> + assertThat( + cancellationFuture, + RestMatchers.respondsWithError(expectedErrorCode))); + } + + private static void testResponse( + Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction, + ThrowingConsumer<CompletableFuture<EmptyResponseBody>, Exception> assertion) + throws Exception { + final RestfulGateway gateway = createGateway(cancelJobFunction); + + final JobCancellationHandler jobCancellationHandler = createHandler(gateway); + + final JobCancellationMessageParameters messageParameters = + jobCancellationHandler + .getMessageHeaders() + .getUnresolvedMessageParameters() + .resolveJobId(new JobID()); + + final CompletableFuture<EmptyResponseBody> cancellationFuture = + jobCancellationHandler.handleRequest( + HandlerRequest.create(EmptyRequestBody.getInstance(), messageParameters), + gateway); + + assertion.accept(cancellationFuture); + } + + private static RestfulGateway createGateway( + Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction) { + return new TestingRestfulGateway.Builder().setCancelJobFunction(cancelJobFunction).build(); + } + + private static JobCancellationHandler createHandler(RestfulGateway gateway) { + return new JobCancellationHandler( + () -> CompletableFuture.completedFuture(gateway), + Time.hours(1), + Collections.emptyMap(), + JobCancellationHeaders.getInstance(), + TerminationModeQueryParameter.TerminationMode.CANCEL); + } +}
