This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 57ada0c  [FLINK-21116][tests] Harden 
DefaultDispatcherRunnerITCase#leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader.
57ada0c is described below

commit 57ada0cf4e9a4d00facd8c528c53a39c760d026c
Author: David Moravek <[email protected]>
AuthorDate: Tue Aug 3 22:33:30 2021 +0200

    [FLINK-21116][tests] Harden 
DefaultDispatcherRunnerITCase#leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader.
    
    This closes #16695.
---
 .../dispatcher/TestingJobManagerRunnerFactory.java  | 21 ++++++++-------------
 .../runner/DefaultDispatcherRunnerITCase.java       | 16 ++++++++++++++++
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 7b195f0..aea1651 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -27,11 +27,13 @@ import 
org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Testing implementation of {@link JobManagerRunnerFactory} which returns a 
{@link
@@ -42,14 +44,14 @@ public class TestingJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
     private final BlockingQueue<TestingJobManagerRunner> 
createdJobManagerRunner =
             new ArrayBlockingQueue<>(16);
 
-    private int numBlockingJobManagerRunners;
+    private final AtomicInteger numBlockingJobManagerRunners;
 
     public TestingJobManagerRunnerFactory() {
         this(0);
     }
 
     public TestingJobManagerRunnerFactory(int numBlockingJobManagerRunners) {
-        this.numBlockingJobManagerRunners = numBlockingJobManagerRunners;
+        this.numBlockingJobManagerRunners = new 
AtomicInteger(numBlockingJobManagerRunners);
     }
 
     @Override
@@ -66,22 +68,15 @@ public class TestingJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
             throws Exception {
         final TestingJobManagerRunner testingJobManagerRunner =
                 createTestingJobManagerRunner(jobGraph);
-        createdJobManagerRunner.offer(testingJobManagerRunner);
-
+        Preconditions.checkState(
+                createdJobManagerRunner.offer(testingJobManagerRunner),
+                "Unable to persist created the new runner.");
         return testingJobManagerRunner;
     }
 
     @Nonnull
     private TestingJobManagerRunner createTestingJobManagerRunner(JobGraph 
jobGraph) {
-        final boolean blockingTermination;
-
-        if (numBlockingJobManagerRunners > 0) {
-            numBlockingJobManagerRunners--;
-            blockingTermination = true;
-        } else {
-            blockingTermination = false;
-        }
-
+        final boolean blockingTermination = 
numBlockingJobManagerRunners.getAndDecrement() > 0;
         return new TestingJobManagerRunner.Builder()
                 .setJobId(jobGraph.getJobID())
                 .setBlockingTermination(blockingTermination)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index a3e2878..67c3f90 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -53,6 +53,8 @@ import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -69,6 +71,7 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /** Integration tests for the {@link DefaultDispatcherRunner}. */
@@ -207,6 +210,19 @@ public class DefaultDispatcherRunnerITCase extends 
TestLogger {
             assertThat(
                     leaderFuture.get(TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS),
                     is(equalTo(leaderSessionId)));
+
+            // Wait for job to recover...
+            final DispatcherGateway leaderGateway =
+                    rpcServiceResource
+                            .getTestingRpcService()
+                            .connect(
+                                    
dispatcherLeaderElectionService.getAddress(),
+                                    DispatcherId.fromUuid(leaderSessionId),
+                                    DispatcherGateway.class)
+                            .get();
+            assertEquals(
+                    jobGraph.getJobID(),
+                    
Iterables.getOnlyElement(leaderGateway.listJobs(TIMEOUT).get()));
         }
     }
 

Reply via email to