This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8dd2a3d29e17a7053a22a363b2caed6a845aff83 Author: liujiangang <[email protected]> AuthorDate: Fri Jan 28 18:03:33 2022 +0800 [hotfix][Runtime/Coordination] Minor fix --- .../runtime/dispatcher/JobDispatcherITCase.java | 87 ++++++++++++---------- .../runtime/dispatcher/MiniDispatcherTest.java | 9 +-- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java index f565960..265892e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFact import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; @@ -47,12 +48,12 @@ import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCas import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.ClassRule; -import org.junit.Test; import org.junit.jupiter.api.Assertions; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.io.ObjectOutputStream; @@ -60,7 +61,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.function.Supplier; import static java.nio.file.StandardOpenOption.CREATE; @@ -68,12 +71,11 @@ import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetrieve import static org.junit.Assert.assertNotNull; /** An integration test which recovers from checkpoint after regaining the leadership. */ -public class JobDispatcherITCase extends TestLogger { +@ExtendWith(TestLoggerExtension.class) +public class JobDispatcherITCase { private static final Duration TIMEOUT = Duration.ofMinutes(10); - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - private Supplier<DispatcherResourceManagerComponentFactory> createJobModeDispatcherResourceManagerComponentFactorySupplier( Configuration configuration) { @@ -92,7 +94,8 @@ public class JobDispatcherITCase extends TestLogger { } @Test - public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership(@TempDir Path tmpPath) + throws Exception { final Deadline deadline = Deadline.fromNow(TIMEOUT); final Configuration configuration = new Configuration(); configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); @@ -103,9 +106,11 @@ public class JobDispatcherITCase extends TestLogger { final EmbeddedHaServicesWithLeadershipControl haServices = new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); - Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); - long checkpointInterval = 500; - JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); + final Configuration newConfiguration = + new Configuration(clusterConfiguration.getConfiguration()); + final long checkpointInterval = 100; + final JobID jobID = + generateAndPersistJobGraph(newConfiguration, checkpointInterval, tmpPath); final TestingMiniCluster.Builder clusterBuilder = TestingMiniCluster.newBuilder(clusterConfiguration) @@ -113,18 +118,13 @@ public class JobDispatcherITCase extends TestLogger { .setDispatcherResourceManagerComponentFactorySupplier( createJobModeDispatcherResourceManagerComponentFactorySupplier( newConfiguration)); + AtLeastOneCheckpointInvokable.reset(); try (final MiniCluster cluster = clusterBuilder.build()) { // start mini cluster and submit the job cluster.start(); - // wait until job is running - awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); - CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false); - CommonTestUtils.waitUntilCondition( - () -> queryCompletedCheckpoints(cluster, jobID) > 0L, - Deadline.fromNow(Duration.ofSeconds(30)), - checkpointInterval / 2); + AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await(); final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID); haServices.revokeDispatcherLeadership(); @@ -142,19 +142,13 @@ public class JobDispatcherITCase extends TestLogger { .get() .getCheckpointStatsSnapshot() .getLatestRestoredCheckpoint()); - - cluster.cancelJob(jobID); - - // the cluster should shut down automatically once the application completes - CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline); } } - private JobID generateJobGraph(Configuration configuration, long checkpointInterval) - throws Exception { + private JobID generateAndPersistJobGraph( + Configuration configuration, long checkpointInterval, Path tmpPath) throws Exception { final JobVertex jobVertex = new JobVertex("jobVertex"); - jobVertex.setInvokableClass( - AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable.class); + jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class); jobVertex.setParallelism(1); final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = @@ -163,14 +157,13 @@ public class JobDispatcherITCase extends TestLogger { .build(); final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null); - JobGraph jobGraph = + final JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder() .addJobVertex(jobVertex) .setJobCheckpointingSettings(checkpointingSettings) .build(); - final Path jobGraphPath = - TEMPORARY_FOLDER.newFile(JOB_GRAPH_FILE_PATH.defaultValue()).toPath(); + final Path jobGraphPath = tmpPath.resolve(JOB_GRAPH_FILE_PATH.defaultValue()); try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(jobGraphPath, CREATE))) { objectOutputStream.writeObject(jobGraph); @@ -179,16 +172,6 @@ public class JobDispatcherITCase extends TestLogger { return jobGraph.getJobID(); } - private long queryCompletedCheckpoints(MiniCluster miniCluster, JobID jobID) - throws InterruptedException, ExecutionException { - return miniCluster - .getArchivedExecutionGraph(jobID) - .get() - .getCheckpointStatsSnapshot() - .getCounts() - .getNumberOfCompletedCheckpoints(); - } - private static void awaitJobStatus( MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline) throws Exception { @@ -207,4 +190,28 @@ public class JobDispatcherITCase extends TestLogger { }, deadline); } + + /** + * An invokable that supports checkpointing and counts down when there is at least one + * checkpoint. + */ + public static class AtLeastOneCheckpointInvokable + extends AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable { + + private static volatile CountDownLatch atLeastOneCheckpointCompleted; + + private static void reset() { + atLeastOneCheckpointCompleted = new CountDownLatch(1); + } + + public AtLeastOneCheckpointInvokable(Environment environment) { + super(environment); + } + + @Override + public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) { + atLeastOneCheckpointCompleted.countDown(); + return CompletableFuture.completedFuture(null); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 811f303..d85f20e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.util.TestLogger; +import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -58,13 +59,11 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; /** Tests for the {@link MiniDispatcher}. */ public class MiniDispatcherTest extends TestLogger { @@ -225,10 +224,8 @@ public class MiniDispatcherTest extends TestLogger { .setState(JobStatus.SUSPENDED) .build())); - miniDispatcher.getShutDownFuture().get(3, TimeUnit.SECONDS); - fail("The shutDownFuture should not be done."); - } catch (TimeoutException ignored) { - + testingJobManagerRunner.getTerminationFuture().get(); + Assertions.assertThat(miniDispatcher.getShutDownFuture()).isNotDone(); } finally { RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout); }
