tillrohrmann commented on a change in pull request #15093:
URL: https://github.com/apache/flink/pull/15093#discussion_r588350992
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -392,46 +418,47 @@ public void testNonBlockingJobSubmission() throws
Exception {
assertEquals(jobID,
multiDetails.getJobs().iterator().next().getJobId());
// submission has succeeded, let the initialization finish.
- blockingJobGraph.f1.unblock();
+ latch.trigger();
// ensure job is running
CommonTestUtils.waitUntilCondition(
- () ->
-
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get()
- == JobStatus.RUNNING,
+ () -> dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get()
== JobStatus.RUNNING,
Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)),
5L);
}
@Test(timeout = 5_000L)
public void testInvalidCallDuringInitialization() throws Exception {
+ final OneShotLatch latch = new OneShotLatch();
dispatcher =
createAndStartDispatcher(
heartbeatServices,
haServices,
- new ExpectedJobIdJobManagerRunnerFactory(
- TEST_JOB_ID, createdJobManagerRunnerLatch));
- jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ new BlockingJobManagerRunnerFactory(latch::await));
+
DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph =
getBlockingJobGraphAndVertex();
- JobID jid = blockingJobGraph.f0.getJobID();
+ final JobGraph emptyJobGraph =
+
JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build();
- dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+ dispatcherGateway.submitJob(emptyJobGraph, TIMEOUT).get();
assertThat(
- dispatcherGateway.requestJobStatus(jid, TIMEOUT).get(),
is(JobStatus.INITIALIZING));
+ dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
+ is(JobStatus.INITIALIZING));
// this call is supposed to fail
try {
- dispatcherGateway.triggerSavepoint(jid, "file:///tmp/savepoint",
false, TIMEOUT).get();
+ dispatcherGateway
+ .triggerSavepoint(jobId, "file:///tmp/savepoint", false,
TIMEOUT)
+ .get();
fail("Previous statement should have failed");
} catch (ExecutionException t) {
assertTrue(t.getCause() instanceof
UnavailableDispatcherOperationException);
}
// submission has succeeded, let the initialization finish.
- blockingJobGraph.f1.unblock();
+ latch.trigger();
Review comment:
Yeah, `Tuple2` is actually quite ugly. We should avoid using it whenever
possible.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org