rmetzger commented on a change in pull request #15093:
URL: https://github.com/apache/flink/pull/15093#discussion_r588229663
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetrieverTest.java
##########
@@ -67,7 +68,11 @@ public static void init() throws IOException {
final JobVertex source = new JobVertex("source");
final JobVertex target = new JobVertex("target");
- final JobGraph jobGraph = new JobGraph(new JobID(), "test", source, target);
+ final JobGraph jobGraph =
+ JobGraphBuilder.newStreamingJobGraphBuilder()
+ .addJobVertices(Arrays.asList(source, target))
+ .addClasspaths(Collections.singletonList(jarFileInJobGraph.toUri().toURL()))
+ .build();
jobGraph.setClasspaths(Collections.singletonList(jarFileInJobGraph.toUri().toURL()));
Review comment:
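The builder above already sets the classpaths via `addClasspaths(...)`, so this `setClasspaths(...)` call is redundant and can be removed:
```suggestion
```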
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -392,46 +418,47 @@ public void testNonBlockingJobSubmission() throws
Exception {
assertEquals(jobID,
multiDetails.getJobs().iterator().next().getJobId());
// submission has succeeded, let the initialization finish.
- blockingJobGraph.f1.unblock();
+ latch.trigger();
// ensure job is running
CommonTestUtils.waitUntilCondition(
- () ->
- dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get()
- == JobStatus.RUNNING,
+ () -> dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get()
== JobStatus.RUNNING,
Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)),
5L);
}
@Test(timeout = 5_000L)
public void testInvalidCallDuringInitialization() throws Exception {
+ final OneShotLatch latch = new OneShotLatch();
dispatcher =
createAndStartDispatcher(
heartbeatServices,
haServices,
- new ExpectedJobIdJobManagerRunnerFactory(
- TEST_JOB_ID, createdJobManagerRunnerLatch));
- jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ new BlockingJobManagerRunnerFactory(latch::await));
+
DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph =
getBlockingJobGraphAndVertex();
- JobID jid = blockingJobGraph.f0.getJobID();
+ final JobGraph emptyJobGraph =
+ JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build();
- dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+ dispatcherGateway.submitJob(emptyJobGraph, TIMEOUT).get();
assertThat(
- dispatcherGateway.requestJobStatus(jid, TIMEOUT).get(), is(JobStatus.INITIALIZING));
+ dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(),
+ is(JobStatus.INITIALIZING));
// this call is supposed to fail
try {
- dispatcherGateway.triggerSavepoint(jid, "file:///tmp/savepoint",
false, TIMEOUT).get();
+ dispatcherGateway
+ .triggerSavepoint(jobId, "file:///tmp/savepoint", false,
TIMEOUT)
+ .get();
fail("Previous statement should have failed");
} catch (ExecutionException t) {
assertTrue(t.getCause() instanceof
UnavailableDispatcherOperationException);
}
// submission has succeeded, let the initialization finish.
- blockingJobGraph.f1.unblock();
+ latch.trigger();
Review comment:
This is much nicer than the Tuple2 with the vertex! Thanks for the
cleanup.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -260,100 +245,6 @@ public static void teardownClass() {
}
}
- @Test
- public void testDeclineCheckpointInvocationWithUserException() throws
Exception {
Review comment:
found https://github.com/apache/flink/pull/14662/files#r585721398 ;)
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -991,13 +1027,11 @@ public TestingJobManagerRunner createJobManagerRunner(
final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
blockingJobVertex.setInvokableClass(NoOpInvokable.class);
return Tuple2.of(
- new JobGraph(TEST_JOB_ID, "blockingTestJob",
blockingJobVertex), blockingJobVertex);
- }
-
- private JobGraph createFailingJobGraph(Exception failureCause) {
- final FailingJobVertex jobVertex = new FailingJobVertex("Failing JobVertex", failureCause);
- jobVertex.setInvokableClass(NoOpInvokable.class);
- return new JobGraph(jobGraph.getJobID(), "Failing JobGraph",
jobVertex);
+ JobGraphBuilder.newStreamingJobGraphBuilder()
+ .setJobId(jobId)
+ .addJobVertex(blockingJobVertex)
+ .build(),
+ blockingJobVertex);
}
private static class FailingJobVertex extends JobVertex {
Review comment:
Thanks a lot for the various cleanups! I took some notes.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org