dmvk commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r668652644
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
jobIdsFuture.complete(applicationJobIds);
}
} catch (Throwable t) {
- jobIdsFuture.completeExceptionally(
- new ApplicationExecutionException("Could not execute application.", t));
+ // If we're running in a single job execution mode, it's safe to consider re-submission
+ // of an already finished job as a success.
+ final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+ ExceptionUtils.findThrowable(t, DuplicateJobSubmissionException.class);
+ if (enforceSingleJobExecution
+ && maybeDuplicate.isPresent()
+ && maybeDuplicate.get().isGloballyTerminated()) {
+ final JobID jobId = maybeDuplicate.get().getJobID();
+ tolerateMissingResult.add(jobId);
+ jobIdsFuture.complete(Collections.singletonList(jobId));
+ } else {
+ jobIdsFuture.completeExceptionally(
+ new ApplicationExecutionException("Could not execute application.", t));
+ }
}
}
private CompletableFuture<Void> getApplicationResult(
final DispatcherGateway dispatcherGateway,
final Collection<JobID> applicationJobIds,
+ final Set<JobID> tolerateMissingResult,
final ScheduledExecutor executor) {
final List<CompletableFuture<?>> jobResultFutures =
applicationJobIds.stream()
.map(
jobId ->
unwrapJobResultException(
- getJobResult(dispatcherGateway, jobId, executor)))
+ getJobResult(
+ dispatcherGateway,
+ jobId,
+ executor,
+ tolerateMissingResult.contains(jobId))))
.collect(Collectors.toList());
return FutureUtils.waitForAll(jobResultFutures);
}
private CompletableFuture<JobResult> getJobResult(
final DispatcherGateway dispatcherGateway,
final JobID jobId,
- final ScheduledExecutor scheduledExecutor) {
-
+ final ScheduledExecutor scheduledExecutor,
+ final boolean tolerateMissingResult) {
final Time timeout =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Time retryPeriod =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
- return JobStatusPollingUtils.getJobResult(
- dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
+ final CompletableFuture<JobResult> jobResultFuture =
+ JobStatusPollingUtils.getJobResult(
+ dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
+ if (tolerateMissingResult) {
+ // Return "unknown" job result if dispatcher no longer knows the actual result.
+ return FutureUtils.handleException(
+ jobResultFuture,
+ FlinkJobNotFoundException.class,
+ exception ->
+ new JobResult.Builder()
+ .jobId(jobId)
+ .applicationStatus(ApplicationStatus.UNKNOWN)
Review comment:
Hmm, this is tricky. Right now I think we're just failing the call to
`execute()` with `DuplicateJobSubmissionException`, so there should be no further
calls from client code. We're basically just ignoring that exception in
`ApplicationDispatcherBootstrap` so the cluster can clean up properly.
Letting the user interact with the result would probably require some
changes to `EmbeddedJobClient` 🤔 I need to take a closer look at whether there
is any way we can move the duplicate handling there.
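The two mechanisms in the hunk above are: treating a duplicate submission of a globally terminated job as success, and substituting an UNKNOWN result when the dispatcher no longer knows the job. Both can be reasoned about in isolation with a minimal, JDK-only sketch. All names here (`DuplicateSubmission`, `JobNotFound`, `Result`, `Status`, `completeJobIds`, `withUnknownFallback`) are hypothetical stand-ins for Flink's `DuplicateJobSubmissionException`, `FlinkJobNotFoundException`, `JobResult`, `ApplicationStatus` and the bootstrap methods. This is an illustration under those assumptions, not Flink's implementation.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch only: hypothetical names, not Flink's API.
public class DuplicateSubmissionSketch {

    enum Status { SUCCEEDED, FAILED, UNKNOWN }

    // Hypothetical stand-in for DuplicateJobSubmissionException.
    static class DuplicateSubmission extends RuntimeException {
        final String jobId;
        final boolean globallyTerminated;

        DuplicateSubmission(String jobId, boolean globallyTerminated) {
            super("Job " + jobId + " has already been submitted.");
            this.jobId = jobId;
            this.globallyTerminated = globallyTerminated;
        }
    }

    // Hypothetical stand-in for FlinkJobNotFoundException.
    static class JobNotFound extends RuntimeException {
        JobNotFound(String jobId) {
            super("Job " + jobId + " not found.");
        }
    }

    // Hypothetical stand-in for JobResult.
    static final class Result {
        final String jobId;
        final Status status;

        Result(String jobId, Status status) {
            this.jobId = jobId;
            this.status = status;
        }
    }

    // Walks the cause chain and returns the first throwable of the requested type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return Optional.of(type.cast(cur));
            }
        }
        return Optional.empty();
    }

    // Mirrors the catch block: a duplicate of a globally terminated job counts as success.
    static void completeJobIds(
            CompletableFuture<List<String>> jobIdsFuture,
            Throwable failure,
            boolean enforceSingleJobExecution,
            Set<String> tolerateMissingResult) {
        Optional<DuplicateSubmission> dup = findThrowable(failure, DuplicateSubmission.class);
        if (enforceSingleJobExecution && dup.isPresent() && dup.get().globallyTerminated) {
            // The dispatcher may already have discarded this job's result.
            tolerateMissingResult.add(dup.get().jobId);
            jobIdsFuture.complete(Collections.singletonList(dup.get().jobId));
        } else {
            jobIdsFuture.completeExceptionally(
                    new IllegalStateException("Could not execute application.", failure));
        }
    }

    // Mirrors the tolerateMissingResult branch: "not found" becomes an UNKNOWN result.
    static CompletableFuture<Result> withUnknownFallback(
            CompletableFuture<Result> resultFuture, String jobId) {
        CompletableFuture<Result> out = new CompletableFuture<>();
        resultFuture.whenComplete(
                (result, error) -> {
                    if (error == null) {
                        out.complete(result);
                        return;
                    }
                    // Failures from dependent stages may arrive wrapped in CompletionException.
                    Throwable cause =
                            error instanceof CompletionException && error.getCause() != null
                                    ? error.getCause()
                                    : error;
                    if (cause instanceof JobNotFound) {
                        out.complete(new Result(jobId, Status.UNKNOWN));
                    } else {
                        out.completeExceptionally(error);
                    }
                });
        return out;
    }
}
```

In this sketch the duplicate is absorbed only inside the bootstrap-side futures; nothing here changes what a caller of `execute()` observes, which is consistent with the point above that exposing the result to the user would need changes in `EmbeddedJobClient`.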