tillrohrmann commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r667801707
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateJobSubmissionException.java
##########
@@ -25,7 +25,19 @@
*/
public class DuplicateJobSubmissionException extends JobSubmissionException {
- public DuplicateJobSubmissionException(JobID jobID) {
+ private final boolean terminated;
+
+ public DuplicateJobSubmissionException(JobID jobID, boolean terminated) {
Review comment:
Personally, I do prefer enums over booleans because they allow to give
an expressive name to values. That way it is a bit easier what
`DuplicateJobSubmissionException(jobId, JOB_TERMINATED)` means.
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -650,6 +655,66 @@ public void
testClusterDoesNOTShutdownWhenApplicationStatusUknown() throws Excep
assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
}
+ @Test
+ public void testDuplicateJobSubmissionWithTerminatedJobId() throws
Throwable {
+ final JobID testJobID = new JobID(0, 2);
+ final Configuration configurationUnderTest = getConfiguration();
+ configurationUnderTest.set(
+ PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
+ configurationUnderTest.set(
+ HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
+ final TestingDispatcherGateway.Builder dispatcherBuilder =
+ new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(
+ jobGraph -> {
+ final CompletableFuture<Acknowledge>
submit =
+ new CompletableFuture<>();
+ submit.completeExceptionally(
+ new
DuplicateJobSubmissionException(testJobID, true));
+ return submit;
+ })
+ .setRequestJobStatusFunction(
+ jobId ->
CompletableFuture.completedFuture(JobStatus.FINISHED))
+ .setRequestJobResultFunction(
+ jobId ->
+ CompletableFuture.completedFuture(
+
createSuccessfulJobResult(jobId)));
Review comment:
I am not sure whether this will work in all cases. I actually think that
Flink loses the job result in case of a `JobManager` process failover. Hence, I
would expect that the application mode will fail with a
`FlinkJobNotFoundException` once it tries to query the result for the already
finished job.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]