dmvk commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r668689540
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
jobIdsFuture.complete(applicationJobIds);
}
} catch (Throwable t) {
- jobIdsFuture.completeExceptionally(
- new ApplicationExecutionException("Could not execute
application.", t));
+ // If we're running in a single job execution mode, it's safe to
consider re-submission
+ // of an already finished a success.
+ final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+ ExceptionUtils.findThrowable(t,
DuplicateJobSubmissionException.class);
+ if (enforceSingleJobExecution
+ && maybeDuplicate.isPresent()
+ && maybeDuplicate.get().isGloballyTerminated()) {
+ final JobID jobId = maybeDuplicate.get().getJobID();
+ tolerateMissingResult.add(jobId);
+ jobIdsFuture.complete(Collections.singletonList(jobId));
+ } else {
+ jobIdsFuture.completeExceptionally(
+ new ApplicationExecutionException("Could not execute
application.", t));
+ }
}
}
private CompletableFuture<Void> getApplicationResult(
final DispatcherGateway dispatcherGateway,
final Collection<JobID> applicationJobIds,
+ final Set<JobID> tolerateMissingResult,
final ScheduledExecutor executor) {
final List<CompletableFuture<?>> jobResultFutures =
applicationJobIds.stream()
.map(
jobId ->
unwrapJobResultException(
-
getJobResult(dispatcherGateway, jobId, executor)))
+ getJobResult(
+ dispatcherGateway,
+ jobId,
+ executor,
+
tolerateMissingResult.contains(jobId))))
.collect(Collectors.toList());
return FutureUtils.waitForAll(jobResultFutures);
}
private CompletableFuture<JobResult> getJobResult(
final DispatcherGateway dispatcherGateway,
final JobID jobId,
- final ScheduledExecutor scheduledExecutor) {
-
+ final ScheduledExecutor scheduledExecutor,
+ final boolean tolerateMissingResult) {
final Time timeout =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Time retryPeriod =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
- return JobStatusPollingUtils.getJobResult(
- dispatcherGateway, jobId, scheduledExecutor, timeout,
retryPeriod);
+ final CompletableFuture<JobResult> jobResultFuture =
+ JobStatusPollingUtils.getJobResult(
+ dispatcherGateway, jobId, scheduledExecutor, timeout,
retryPeriod);
+ if (tolerateMissingResult) {
+ // Return "unknown" job result if dispatcher no longer knows the
actual result.
+ return FutureUtils.handleException(
+ jobResultFuture,
+ FlinkJobNotFoundException.class,
+ exception ->
+ new JobResult.Builder()
+ .jobId(jobId)
+
.applicationStatus(ApplicationStatus.UNKNOWN)
Review comment:
Summary of an offline discussion with Till: we can live with this limitation
for now, until we solve
[FLINK-11813](https://issues.apache.org/jira/browse/FLINK-11813). We'll
document the limitation in the release notes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]