This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 55fa1cf56dd8633aa0a04fc19e2c86d9ae2c2617 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Tue Apr 12 13:57:15 2022 +0200 [FLINK-26977] Remove no longer valid tests in JobMasterStopWithSavepointITCase Tests in JobMasterStopWithSavepointITCase are no longer valid. In particular, the way the number of restarts is calculated is wrong. Moreover, the cases that were supposed to be tested in the class are already covered in SavepointITCase. --- .../JobMasterStopWithSavepointITCase.java | 410 --------------------- 1 file changed, 410 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java deleted file mode 100644 index b763cad5295..00000000000 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java +++ /dev/null @@ -1,410 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.jobmaster; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.client.program.MiniClusterClient; -import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.checkpoint.CheckpointException; -import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; -import org.apache.flink.runtime.checkpoint.SavepointType; -import org.apache.flink.runtime.checkpoint.SnapshotType; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobGraphBuilder; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; -import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; -import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; -import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.util.ExceptionUtils; - -import org.junit.Assume; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.IOException; -import java.nio.file.Files; -import 
java.nio.file.Path; -import java.time.Duration; -import java.util.Collections; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.either; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * ITCases testing the stop with savepoint functionality. This includes checking both SUSPEND and - * TERMINATE. - */ -public class JobMasterStopWithSavepointITCase extends AbstractTestBase { - - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - private static final long CHECKPOINT_INTERVAL = 10; - private static final int PARALLELISM = 2; - - private static OneShotLatch finishingLatch; - - private static CountDownLatch invokeLatch; - - private static CountDownLatch numberOfRestarts; - private static final AtomicLong syncSavepointId = new AtomicLong(); - private static volatile CountDownLatch checkpointsToWaitFor; - - private Path savepointDirectory; - private MiniClusterClient clusterClient; - - private JobGraph jobGraph; - - @Test - public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheSuspend() throws Exception { - throwingExceptionOnCallbackWithoutRestartsHelper(false); - } - - @Test - public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate() throws Exception { - throwingExceptionOnCallbackWithoutRestartsHelper(true); - } - - private void throwingExceptionOnCallbackWithoutRestartsHelper(final boolean terminate) - throws Exception { - setUpJobGraph(ExceptionOnCallbackStreamTask.class, RestartStrategies.noRestart()); - - assertThat(getJobStatus(), equalTo(JobStatus.RUNNING)); - - try { - 
stopWithSavepoint(terminate).get(); - fail(); - } catch (Exception e) { - } - - // verifying that we actually received a synchronous checkpoint - assertTrue(syncSavepointId.get() > 0); - assertThat( - getJobStatus(), either(equalTo(JobStatus.FAILED)).or(equalTo(JobStatus.FAILING))); - } - - @Test - public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInSuspend() - throws Exception { - throwingExceptionOnCallbackWithRestartsHelper(false); - } - - @Test - public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInTerminate() - throws Exception { - throwingExceptionOnCallbackWithRestartsHelper(true); - } - - private void throwingExceptionOnCallbackWithRestartsHelper(final boolean terminate) - throws Exception { - final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(15)); - final int numberOfCheckpointsToExpect = 10; - - numberOfRestarts = new CountDownLatch(2); - checkpointsToWaitFor = new CountDownLatch(numberOfCheckpointsToExpect); - - setUpJobGraph( - ExceptionOnCallbackStreamTask.class, - RestartStrategies.fixedDelayRestart(15, Time.milliseconds(10))); - assertThat(getJobStatus(), equalTo(JobStatus.RUNNING)); - try { - stopWithSavepoint(terminate).get(50, TimeUnit.MILLISECONDS); - fail(); - } catch (Exception e) { - // expected - } - - // wait until we restart at least 2 times and until we see at least 10 checkpoints. - assertTrue(numberOfRestarts.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); - assertTrue( - checkpointsToWaitFor.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); - - // verifying that we actually received a synchronous checkpoint - assertTrue(syncSavepointId.get() > 0); - - assertThat(getJobStatus(), equalTo(JobStatus.RUNNING)); - - // make sure that we saw the synchronous savepoint and - // that after that we saw more checkpoints due to restarts. 
- final long syncSavepoint = syncSavepointId.get(); - assertTrue(syncSavepoint > 0 && syncSavepoint < numberOfCheckpointsToExpect); - - clusterClient.cancel(jobGraph.getJobID()).get(); - assertThat( - getJobStatus(), - either(equalTo(JobStatus.CANCELLING)).or(equalTo(JobStatus.CANCELED))); - } - - @Test - public void testRestartCheckpointCoordinatorIfStopWithSavepointFails() throws Exception { - setUpJobGraph(CheckpointCountingTask.class, RestartStrategies.noRestart()); - - try { - Files.setPosixFilePermissions(savepointDirectory, Collections.emptySet()); - } catch (IOException e) { - Assume.assumeNoException(e); - } - - try { - stopWithSavepoint(true).get(); - fail(); - } catch (Exception e) { - Optional<CheckpointException> checkpointExceptionOptional = - ExceptionUtils.findThrowable(e, CheckpointException.class); - if (!checkpointExceptionOptional.isPresent()) { - throw e; - } - String exceptionMessage = checkpointExceptionOptional.get().getMessage(); - assertTrue( - "Stop with savepoint failed because of another cause " + exceptionMessage, - exceptionMessage.contains(CheckpointFailureReason.IO_EXCEPTION.message())); - } - - final JobStatus jobStatus = - clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS); - assertThat(jobStatus, equalTo(JobStatus.RUNNING)); - // assert that checkpoints are continued to be triggered - checkpointsToWaitFor = new CountDownLatch(1); - assertTrue(checkpointsToWaitFor.await(60L, TimeUnit.SECONDS)); - } - - private CompletableFuture<String> stopWithSavepoint(boolean terminate) { - return MINI_CLUSTER_RESOURCE - .getMiniCluster() - .stopWithSavepoint( - jobGraph.getJobID(), - savepointDirectory.toAbsolutePath().toString(), - terminate, - SavepointFormatType.CANONICAL); - } - - private JobStatus getJobStatus() throws InterruptedException, ExecutionException { - return clusterClient.getJobStatus(jobGraph.getJobID()).get(); - } - - private void setUpJobGraph( - final Class<? 
extends TaskInvokable> invokable, - final RestartStrategies.RestartStrategyConfiguration restartStrategy) - throws Exception { - - finishingLatch = new OneShotLatch(); - - invokeLatch = new CountDownLatch(PARALLELISM); - - numberOfRestarts = new CountDownLatch(2); - checkpointsToWaitFor = new CountDownLatch(10); - - syncSavepointId.set(-1); - - savepointDirectory = temporaryFolder.newFolder().toPath(); - - Assume.assumeTrue( - "ClusterClient is not an instance of MiniClusterClient", - MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); - - clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); - - final ExecutionConfig config = new ExecutionConfig(); - config.setRestartStrategy(restartStrategy); - - final JobVertex vertex = new JobVertex("testVertex"); - vertex.setInvokableClass(invokable); - vertex.setParallelism(PARALLELISM); - - final JobCheckpointingSettings jobCheckpointingSettings = - new JobCheckpointingSettings( - new CheckpointCoordinatorConfiguration( - CHECKPOINT_INTERVAL, - 60_000, - 10, - 1, - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true, - false, - 0, - 0), - null); - - jobGraph = - JobGraphBuilder.newStreamingJobGraphBuilder() - .setExecutionConfig(config) - .addJobVertex(vertex) - .setJobCheckpointingSettings(jobCheckpointingSettings) - .build(); - - clusterClient.submitJob(jobGraph).get(); - assertTrue(invokeLatch.await(60, TimeUnit.SECONDS)); - waitForJob(); - } - - private void waitForJob() throws Exception { - Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5)); - JobID jobID = jobGraph.getJobID(); - CommonTestUtils.waitForAllTaskRunning( - () -> - MINI_CLUSTER_RESOURCE - .getMiniCluster() - .getExecutionGraph(jobID) - .get(60, TimeUnit.SECONDS), - deadline, - false); - } - - /** - * A {@link StreamTask} which throws an exception in the {@code notifyCheckpointComplete()} for - * subtask 0. 
- */ - public static class ExceptionOnCallbackStreamTask extends CheckpointCountingTask { - - private long synchronousSavepointId = Long.MIN_VALUE; - - public ExceptionOnCallbackStreamTask(final Environment environment) throws Exception { - super(environment); - } - - @Override - protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { - final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); - if (taskIndex == 0) { - numberOfRestarts.countDown(); - } - super.processInput(controller); - } - - @Override - public CompletableFuture<Boolean> triggerCheckpointAsync( - CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { - final long checkpointId = checkpointMetaData.getCheckpointId(); - final SnapshotType checkpointType = checkpointOptions.getCheckpointType(); - - if (checkpointType.isSavepoint() && ((SavepointType) checkpointType).isSynchronous()) { - synchronousSavepointId = checkpointId; - syncSavepointId.compareAndSet(-1, synchronousSavepointId); - } - - return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions); - } - - @Override - public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) { - final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); - if (checkpointId == synchronousSavepointId && taskIndex == 0) { - throw new RuntimeException("Expected Exception"); - } - - return super.notifyCheckpointCompleteAsync(checkpointId); - } - - @Override - public Future<Void> notifyCheckpointAbortAsync( - long checkpointId, long latestCompletedCheckpointId) { - return CompletableFuture.completedFuture(null); - } - } - - /** A {@link StreamTask} that simply waits to be terminated normally. 
*/ - public static class NoOpBlockingStreamTask extends NoOpStreamTask { - - private transient MailboxDefaultAction.Suspension suspension; - - public NoOpBlockingStreamTask(final Environment environment) throws Exception { - super(environment); - } - - @Override - protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { - invokeLatch.countDown(); - if (suspension == null) { - suspension = controller.suspendDefaultAction(); - } else { - controller.suspendDefaultAction(); - mailboxProcessor.suspend(); - } - } - } - - /** - * A {@link StreamTask} that simply calls {@link CountDownLatch#countDown()} when invoking - * {@link #triggerCheckpointAsync}. - */ - public static class CheckpointCountingTask extends NoOpStreamTask { - - private transient MailboxDefaultAction.Suspension suspension; - - public CheckpointCountingTask(final Environment environment) throws Exception { - super(environment); - } - - @Override - protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { - invokeLatch.countDown(); - if (suspension == null) { - suspension = controller.suspendDefaultAction(); - } else { - controller.suspendDefaultAction(); - mailboxProcessor.suspend(); - } - } - - @Override - protected void cancelTask() throws Exception { - super.cancelTask(); - if (suspension != null) { - suspension.resume(); - } - } - - @Override - public CompletableFuture<Boolean> triggerCheckpointAsync( - final CheckpointMetaData checkpointMetaData, - final CheckpointOptions checkpointOptions) { - final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); - if (taskIndex == 0) { - checkpointsToWaitFor.countDown(); - } - - return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions); - } - } -}
