tillrohrmann commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r667801707



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateJobSubmissionException.java
##########
@@ -25,7 +25,19 @@
  */
 public class DuplicateJobSubmissionException extends JobSubmissionException {
 
-    public DuplicateJobSubmissionException(JobID jobID) {
+    private final boolean terminated;
+
+    public DuplicateJobSubmissionException(JobID jobID, boolean terminated) {

Review comment:
       Personally, I do prefer enums over booleans because they allow to give 
an expressive name to values. That way it is a bit easier what 
`DuplicateJobSubmissionException(jobId, JOB_TERMINATED)` means.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -650,6 +655,66 @@ public void 
testClusterDoesNOTShutdownWhenApplicationStatusUknown() throws Excep
         assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
     }
 
+    @Test
+    public void testDuplicateJobSubmissionWithTerminatedJobId() throws 
Throwable {
+        final JobID testJobID = new JobID(0, 2);
+        final Configuration configurationUnderTest = getConfiguration();
+        configurationUnderTest.set(
+                PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
testJobID.toHexString());
+        configurationUnderTest.set(
+                HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingDispatcherGateway.Builder dispatcherBuilder =
+                new TestingDispatcherGateway.Builder()
+                        .setSubmitFunction(
+                                jobGraph -> {
+                                    final CompletableFuture<Acknowledge> 
submit =
+                                            new CompletableFuture<>();
+                                    submit.completeExceptionally(
+                                            new 
DuplicateJobSubmissionException(testJobID, true));
+                                    return submit;
+                                })
+                        .setRequestJobStatusFunction(
+                                jobId -> 
CompletableFuture.completedFuture(JobStatus.FINISHED))
+                        .setRequestJobResultFunction(
+                                jobId ->
+                                        CompletableFuture.completedFuture(
+                                                
createSuccessfulJobResult(jobId)));

Review comment:
       I am not sure whether this will work in all cases. I actually think that 
Flink loses the job result in case of a `JobManager` process failover. Hence, I 
would expect that the application mode will fail with a 
`FlinkJobNotFoundException` once it tries to query the result for the already 
finished job.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to