This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8dd2a3d29e17a7053a22a363b2caed6a845aff83
Author: liujiangang <[email protected]>
AuthorDate: Fri Jan 28 18:03:33 2022 +0800

    [hotfix][Runtime/Coordination] Minor fix
---
 .../runtime/dispatcher/JobDispatcherITCase.java    | 87 ++++++++++++----------
 .../runtime/dispatcher/MiniDispatcherTest.java     |  9 +--
 2 files changed, 50 insertions(+), 46 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
index f565960..265892e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFact
 import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
@@ -47,12 +48,12 @@ import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCas
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
@@ -60,7 +61,9 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -68,12 +71,11 @@ import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetrieve
 import static org.junit.Assert.assertNotNull;
 
 /** An integration test which recovers from checkpoint after regaining the leadership. */
-public class JobDispatcherITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class JobDispatcherITCase {
 
     private static final Duration TIMEOUT = Duration.ofMinutes(10);
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-
     private Supplier<DispatcherResourceManagerComponentFactory>
             createJobModeDispatcherResourceManagerComponentFactorySupplier(
                     Configuration configuration) {
@@ -92,7 +94,8 @@ public class JobDispatcherITCase extends TestLogger {
     }
 
     @Test
-    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+    public void 
testRecoverFromCheckpointAfterLosingAndRegainingLeadership(@TempDir Path 
tmpPath)
+            throws Exception {
         final Deadline deadline = Deadline.fromNow(TIMEOUT);
         final Configuration configuration = new Configuration();
         configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
@@ -103,9 +106,11 @@ public class JobDispatcherITCase extends TestLogger {
         final EmbeddedHaServicesWithLeadershipControl haServices =
                 new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
 
-        Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
-        long checkpointInterval = 500;
-        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+        final Configuration newConfiguration =
+                new Configuration(clusterConfiguration.getConfiguration());
+        final long checkpointInterval = 100;
+        final JobID jobID =
+                generateAndPersistJobGraph(newConfiguration, 
checkpointInterval, tmpPath);
 
         final TestingMiniCluster.Builder clusterBuilder =
                 TestingMiniCluster.newBuilder(clusterConfiguration)
@@ -113,18 +118,13 @@ public class JobDispatcherITCase extends TestLogger {
                         .setDispatcherResourceManagerComponentFactorySupplier(
                                 
createJobModeDispatcherResourceManagerComponentFactorySupplier(
                                         newConfiguration));
+        AtLeastOneCheckpointInvokable.reset();
 
         try (final MiniCluster cluster = clusterBuilder.build()) {
             // start mini cluster and submit the job
             cluster.start();
 
-            // wait until job is running
-            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
-            CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false);
-            CommonTestUtils.waitUntilCondition(
-                    () -> queryCompletedCheckpoints(cluster, jobID) > 0L,
-                    Deadline.fromNow(Duration.ofSeconds(30)),
-                    checkpointInterval / 2);
+            
AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await();
 
             final CompletableFuture<JobResult> firstJobResult = 
cluster.requestJobResult(jobID);
             haServices.revokeDispatcherLeadership();
@@ -142,19 +142,13 @@ public class JobDispatcherITCase extends TestLogger {
                             .get()
                             .getCheckpointStatsSnapshot()
                             .getLatestRestoredCheckpoint());
-
-            cluster.cancelJob(jobID);
-
-            // the cluster should shut down automatically once the application completes
-            CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), 
deadline);
         }
     }
 
-    private JobID generateJobGraph(Configuration configuration, long 
checkpointInterval)
-            throws Exception {
+    private JobID generateAndPersistJobGraph(
+            Configuration configuration, long checkpointInterval, Path 
tmpPath) throws Exception {
         final JobVertex jobVertex = new JobVertex("jobVertex");
-        jobVertex.setInvokableClass(
-                
AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable.class);
+        jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class);
         jobVertex.setParallelism(1);
 
         final CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration =
@@ -163,14 +157,13 @@ public class JobDispatcherITCase extends TestLogger {
                         .build();
         final JobCheckpointingSettings checkpointingSettings =
                 new 
JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
-        JobGraph jobGraph =
+        final JobGraph jobGraph =
                 JobGraphBuilder.newStreamingJobGraphBuilder()
                         .addJobVertex(jobVertex)
                         .setJobCheckpointingSettings(checkpointingSettings)
                         .build();
 
-        final Path jobGraphPath =
-                
TEMPORARY_FOLDER.newFile(JOB_GRAPH_FILE_PATH.defaultValue()).toPath();
+        final Path jobGraphPath = 
tmpPath.resolve(JOB_GRAPH_FILE_PATH.defaultValue());
         try (ObjectOutputStream objectOutputStream =
                 new ObjectOutputStream(Files.newOutputStream(jobGraphPath, 
CREATE))) {
             objectOutputStream.writeObject(jobGraph);
@@ -179,16 +172,6 @@ public class JobDispatcherITCase extends TestLogger {
         return jobGraph.getJobID();
     }
 
-    private long queryCompletedCheckpoints(MiniCluster miniCluster, JobID 
jobID)
-            throws InterruptedException, ExecutionException {
-        return miniCluster
-                .getArchivedExecutionGraph(jobID)
-                .get()
-                .getCheckpointStatsSnapshot()
-                .getCounts()
-                .getNumberOfCompletedCheckpoints();
-    }
-
     private static void awaitJobStatus(
             MiniCluster cluster, JobID jobId, JobStatus status, Deadline 
deadline)
             throws Exception {
@@ -207,4 +190,28 @@ public class JobDispatcherITCase extends TestLogger {
                 },
                 deadline);
     }
+
+    /**
+     * An invokable that supports checkpointing and counts down when there is at least one
+     * checkpoint.
+     */
+    public static class AtLeastOneCheckpointInvokable
+            extends AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable {
+
+        private static volatile CountDownLatch atLeastOneCheckpointCompleted;
+
+        private static void reset() {
+            atLeastOneCheckpointCompleted = new CountDownLatch(1);
+        }
+
+        public AtLeastOneCheckpointInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+            atLeastOneCheckpointCompleted.countDown();
+            return CompletableFuture.completedFuture(null);
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 811f303..d85f20e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.util.TestLogger;
 
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -58,13 +59,11 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
 
 /** Tests for the {@link MiniDispatcher}. */
 public class MiniDispatcherTest extends TestLogger {
@@ -225,10 +224,8 @@ public class MiniDispatcherTest extends TestLogger {
                                     .setState(JobStatus.SUSPENDED)
                                     .build()));
 
-            miniDispatcher.getShutDownFuture().get(3, TimeUnit.SECONDS);
-            fail("The shutDownFuture should not be done.");
-        } catch (TimeoutException ignored) {
-
+            testingJobManagerRunner.getTerminationFuture().get();
+            
Assertions.assertThat(miniDispatcher.getShutDownFuture()).isNotDone();
         } finally {
             RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
         }

Reply via email to