This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 57ada0c [FLINK-21116][tests] Harden
DefaultDispatcherRunnerITCase#leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader.
57ada0c is described below
commit 57ada0cf4e9a4d00facd8c528c53a39c760d026c
Author: David Moravek <[email protected]>
AuthorDate: Tue Aug 3 22:33:30 2021 +0200
[FLINK-21116][tests] Harden
DefaultDispatcherRunnerITCase#leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader.
This closes #16695.
---
.../dispatcher/TestingJobManagerRunnerFactory.java | 21 ++++++++-------------
.../runner/DefaultDispatcherRunnerITCase.java | 16 ++++++++++++++++
2 files changed, 24 insertions(+), 13 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 7b195f0..aea1651 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -27,11 +27,13 @@ import
org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Testing implementation of {@link JobManagerRunnerFactory} which returns a
{@link
@@ -42,14 +44,14 @@ public class TestingJobManagerRunnerFactory implements
JobManagerRunnerFactory {
private final BlockingQueue<TestingJobManagerRunner>
createdJobManagerRunner =
new ArrayBlockingQueue<>(16);
- private int numBlockingJobManagerRunners;
+ private final AtomicInteger numBlockingJobManagerRunners;
public TestingJobManagerRunnerFactory() {
this(0);
}
public TestingJobManagerRunnerFactory(int numBlockingJobManagerRunners) {
- this.numBlockingJobManagerRunners = numBlockingJobManagerRunners;
+ this.numBlockingJobManagerRunners = new
AtomicInteger(numBlockingJobManagerRunners);
}
@Override
@@ -66,22 +68,15 @@ public class TestingJobManagerRunnerFactory implements
JobManagerRunnerFactory {
throws Exception {
final TestingJobManagerRunner testingJobManagerRunner =
createTestingJobManagerRunner(jobGraph);
- createdJobManagerRunner.offer(testingJobManagerRunner);
-
+ Preconditions.checkState(
+ createdJobManagerRunner.offer(testingJobManagerRunner),
+ "Unable to persist created the new runner.");
return testingJobManagerRunner;
}
@Nonnull
private TestingJobManagerRunner createTestingJobManagerRunner(JobGraph
jobGraph) {
- final boolean blockingTermination;
-
- if (numBlockingJobManagerRunners > 0) {
- numBlockingJobManagerRunners--;
- blockingTermination = true;
- } else {
- blockingTermination = false;
- }
-
+ final boolean blockingTermination =
numBlockingJobManagerRunners.getAndDecrement() > 0;
return new TestingJobManagerRunner.Builder()
.setJobId(jobGraph.getJobID())
.setBlockingTermination(blockingTermination)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
index a3e2878..67c3f90 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
@@ -53,6 +53,8 @@ import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -69,6 +71,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Integration tests for the {@link DefaultDispatcherRunner}. */
@@ -207,6 +210,19 @@ public class DefaultDispatcherRunnerITCase extends
TestLogger {
assertThat(
leaderFuture.get(TIMEOUT.toMilliseconds(),
TimeUnit.MILLISECONDS),
is(equalTo(leaderSessionId)));
+
+ // Wait for job to recover...
+ final DispatcherGateway leaderGateway =
+ rpcServiceResource
+ .getTestingRpcService()
+ .connect(
+
dispatcherLeaderElectionService.getAddress(),
+ DispatcherId.fromUuid(leaderSessionId),
+ DispatcherGateway.class)
+ .get();
+ assertEquals(
+ jobGraph.getJobID(),
+
Iterables.getOnlyElement(leaderGateway.listJobs(TIMEOUT).get()));
}
}