Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5727#discussion_r176489375
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
---
@@ -105,90 +85,51 @@ public JobSubmissionFailsITCase(boolean detached) {
//
--------------------------------------------------------------------------------------------
- private JobExecutionResult submitJob(JobGraph jobGraph) throws
Exception {
- if (detached) {
- cluster.submitJobDetached(jobGraph);
- return null;
- }
- else {
- return cluster.submitJobAndWait(jobGraph, false,
TestingUtils.TESTING_DURATION());
- }
- }
-
@Test
- public void testExceptionInInitializeOnMaster() {
- try {
- final JobVertex failingJobVertex = new
FailingJobVertex("Failing job vertex");
- failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
- final JobGraph failingJobGraph = new JobGraph("Failing
testing job", failingJobVertex);
+ public void testExceptionInInitializeOnMaster() throws Exception {
+ final JobVertex failingJobVertex = new
FailingJobVertex("Failing job vertex");
+ failingJobVertex.setInvokableClass(NoOpInvokable.class);
- try {
- submitJob(failingJobGraph);
- fail("Expected JobExecutionException.");
- }
- catch (JobExecutionException e) {
- assertEquals("Test exception.",
e.getCause().getMessage());
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " +
t.getClass() + ".");
- }
+ final JobGraph failingJobGraph = new JobGraph("Failing testing
job", failingJobVertex);
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
+ ClusterClient<?> client =
MINI_CLUSTER_RESOURCE.getClusterClient();
+ client.setDetached(detached);
- @Test
- public void testSubmitEmptyJobGraph() {
try {
- final JobGraph jobGraph = new JobGraph("Testing job");
-
- try {
- submitJob(jobGraph);
- fail("Expected JobSubmissionException.");
- }
- catch (JobSubmissionException e) {
- assertTrue(e.getMessage() != null &&
e.getMessage().contains("empty"));
+ client.submitJob(failingJobGraph,
JobSubmissionFailsITCase.class.getClassLoader());
+ fail("Job submission should have thrown an exception.");
+ } catch (Exception e) {
+ Optional<Throwable> expectedCause =
ExceptionUtils.findThrowable(e,
+ candidate -> candidate.getMessage() != null &&
candidate.getMessage().equals("Test exception."));
+ if (!expectedCause.isPresent()) {
+ throw e;
}
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " +
t.getClass() + ".");
- }
-
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
+
+ client.setDetached(false);
+ client.submitJob(getWorkingJobGraph(),
JobSubmissionFailsITCase.class.getClassLoader());
--- End diff --
Alright, it's because of the `RunningJobsRegistry`, which records that a
previous job with the same `JobID` has already been executed.
---