tillrohrmann commented on a change in pull request #16464:
URL: https://github.com/apache/flink/pull/16464#discussion_r668621316
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -650,6 +657,105 @@ public void
testClusterDoesNOTShutdownWhenApplicationStatusUknown() throws Excep
assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
}
+ @Test
+ public void testDuplicateJobSubmissionWithTerminatedJobId() throws
Throwable {
+ final JobID testJobID = new JobID(0, 2);
+ final Configuration configurationUnderTest = getConfiguration();
+ configurationUnderTest.set(
+ PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
+ configurationUnderTest.set(
+ HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
+ final TestingDispatcherGateway.Builder dispatcherBuilder =
+ new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(
+ jobGraph -> {
+ final CompletableFuture<Acknowledge>
submit =
+ new CompletableFuture<>();
+ submit.completeExceptionally(
+
DuplicateJobSubmissionException.ofGloballyTerminated(
+ testJobID));
+ return submit;
+ })
+ .setRequestJobStatusFunction(
+ jobId ->
CompletableFuture.completedFuture(JobStatus.FINISHED))
+ .setRequestJobResultFunction(
+ jobId ->
+ CompletableFuture.completedFuture(
+
createSuccessfulJobResult(jobId)));
+ final CompletableFuture<Void> applicationFuture =
+ runApplication(dispatcherBuilder, configurationUnderTest, 1);
+ applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ /**
+ * In this scenario, job result is no longer present in the {@link
+ * org.apache.flink.runtime.dispatcher.Dispatcher dispatcher} (job has
terminated and job
+ * manager failed over), but we know that job has already terminated from
{@link
+ * org.apache.flink.runtime.highavailability.RunningJobsRegistry running
jobs registry}.
+ */
+ @Test
+ public void
testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() throws
Throwable {
+ final JobID testJobID = new JobID(0, 2);
+ final Configuration configurationUnderTest = getConfiguration();
+ configurationUnderTest.set(
+ PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
+ configurationUnderTest.set(
+ HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
+ final TestingDispatcherGateway.Builder dispatcherBuilder =
+ new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(
+ jobGraph -> {
+ final CompletableFuture<Acknowledge>
submit =
+ new CompletableFuture<>();
+ submit.completeExceptionally(
+
DuplicateJobSubmissionException.ofGloballyTerminated(
+ testJobID));
+ return submit;
+ })
+ .setRequestJobStatusFunction(
+ jobId ->
+ FutureUtils.completedExceptionally(
+ new
FlinkJobNotFoundException(jobId)))
+ .setRequestJobResultFunction(
+ jobId ->
+ FutureUtils.completedExceptionally(
+ new
FlinkJobNotFoundException(jobId)));
+ final CompletableFuture<Void> applicationFuture =
+ runApplication(dispatcherBuilder, configurationUnderTest, 1);
+ applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
+ final JobID testJobID = new JobID(0, 2);
+ final Configuration configurationUnderTest = getConfiguration();
+ configurationUnderTest.set(
+ PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
testJobID.toHexString());
+ configurationUnderTest.set(
+ HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
+ final TestingDispatcherGateway.Builder dispatcherBuilder =
+ new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(
+ jobGraph -> {
+ final CompletableFuture<Acknowledge>
submit =
+ new CompletableFuture<>();
+ submit.completeExceptionally(
+
DuplicateJobSubmissionException.of(testJobID));
+ return submit;
Review comment:
```suggestion
return
FutureUtils.completedExceptionally(
DuplicateJobSubmissionException.of(testJobID));
```
##########
File path:
flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
##########
@@ -1261,6 +1261,43 @@ public static void
assertNoException(CompletableFuture<?> completableFuture) {
handleUncaughtException(completableFuture,
FatalExitExceptionHandler.INSTANCE);
}
+ /**
+ * Checks that the given {@link CompletableFuture} is not completed
exceptionally with the
+ * specified class. If the future is completed exceptionally with the
specific class, then try
+ * to recover using a given exception handler. If the exception does not
match the specified
+ * class, just pass it through to later stages.
+ *
+ * @param completableFuture to assert for a given exception
+ * @param exceptionClass exception class to assert for
+ * @param exceptionHandler to call if the future is completed
exceptionally with the specific
+ * exception
+ * @return completable future, that can recover from a specified exception
+ */
+ public static <T, E extends Throwable> CompletableFuture<T>
handleException(
+ CompletableFuture<T> completableFuture,
+ Class<E> exceptionClass,
+ Function<E, T> exceptionHandler) {
Review comment:
For `T` it might not make a huge difference, but for `E` it can. That's why
I would suggest changing the signature to
```
CompletableFuture<? extends T> completableFuture,
Class<E> exceptionClass,
Function<? super E, ? extends T> exceptionHandler
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
jobIdsFuture.complete(applicationJobIds);
}
} catch (Throwable t) {
- jobIdsFuture.completeExceptionally(
- new ApplicationExecutionException("Could not execute
application.", t));
+ // If we're running in a single job execution mode, it's safe to
consider re-submission
+ // of an already finished a success.
+ final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+ ExceptionUtils.findThrowable(t,
DuplicateJobSubmissionException.class);
+ if (enforceSingleJobExecution
+ && maybeDuplicate.isPresent()
+ && maybeDuplicate.get().isGloballyTerminated()) {
+ final JobID jobId = maybeDuplicate.get().getJobID();
+ tolerateMissingResult.add(jobId);
+ jobIdsFuture.complete(Collections.singletonList(jobId));
+ } else {
+ jobIdsFuture.completeExceptionally(
+ new ApplicationExecutionException("Could not execute
application.", t));
+ }
}
}
private CompletableFuture<Void> getApplicationResult(
final DispatcherGateway dispatcherGateway,
final Collection<JobID> applicationJobIds,
+ final Set<JobID> tolerateMissingResult,
final ScheduledExecutor executor) {
final List<CompletableFuture<?>> jobResultFutures =
applicationJobIds.stream()
.map(
jobId ->
unwrapJobResultException(
-
getJobResult(dispatcherGateway, jobId, executor)))
+ getJobResult(
+ dispatcherGateway,
+ jobId,
+ executor,
+
tolerateMissingResult.contains(jobId))))
.collect(Collectors.toList());
return FutureUtils.waitForAll(jobResultFutures);
}
private CompletableFuture<JobResult> getJobResult(
final DispatcherGateway dispatcherGateway,
final JobID jobId,
- final ScheduledExecutor scheduledExecutor) {
-
+ final ScheduledExecutor scheduledExecutor,
+ final boolean tolerateMissingResult) {
final Time timeout =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Time retryPeriod =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
- return JobStatusPollingUtils.getJobResult(
- dispatcherGateway, jobId, scheduledExecutor, timeout,
retryPeriod);
+ final CompletableFuture<JobResult> jobResultFuture =
+ JobStatusPollingUtils.getJobResult(
+ dispatcherGateway, jobId, scheduledExecutor, timeout,
retryPeriod);
+ if (tolerateMissingResult) {
+ // Return "unknown" job result if dispatcher no longer knows the
actual result.
+ return FutureUtils.handleException(
+ jobResultFuture,
+ FlinkJobNotFoundException.class,
+ exception ->
+ new JobResult.Builder()
+ .jobId(jobId)
+
.applicationStatus(ApplicationStatus.UNKNOWN)
Review comment:
Also, we might want to prevent the user from calling
`JobExecutionResult.getAccumulatorResult`. I would suggest that we fail if such
a call happens.
##########
File path:
flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
##########
@@ -1261,6 +1261,43 @@ public static void
assertNoException(CompletableFuture<?> completableFuture) {
handleUncaughtException(completableFuture,
FatalExitExceptionHandler.INSTANCE);
}
+ /**
+ * Checks that the given {@link CompletableFuture} is not completed
exceptionally with the
+ * specified class. If the future is completed exceptionally with the
specific class, then try
+ * to recover using a given exception handler. If the exception does not
match the specified
+ * class, just pass it through to later stages.
+ *
+ * @param completableFuture to assert for a given exception
+ * @param exceptionClass exception class to assert for
+ * @param exceptionHandler to call if the future is completed
exceptionally with the specific
+ * exception
+ * @return completable future, that can recover from a specified exception
+ */
+ public static <T, E extends Throwable> CompletableFuture<T>
handleException(
+ CompletableFuture<T> completableFuture,
+ Class<E> exceptionClass,
+ Function<E, T> exceptionHandler) {
+ final CompletableFuture<T> handledFuture = new CompletableFuture<>();
+ checkNotNull(completableFuture)
+ .whenComplete(
+ (result, throwable) -> {
+ if (throwable == null) {
+ handledFuture.complete(result);
+ } else if
(exceptionClass.isAssignableFrom(throwable.getClass())) {
+ @SuppressWarnings("unchecked")
+ final E exception = (E) throwable;
Review comment:
```suggestion
final E cast =
exceptionClass.cast(throwable);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/client/DuplicateJobSubmissionException.java
##########
@@ -25,7 +25,27 @@
*/
public class DuplicateJobSubmissionException extends JobSubmissionException {
- public DuplicateJobSubmissionException(JobID jobID) {
+ public static DuplicateJobSubmissionException of(JobID jobId) {
+ return new DuplicateJobSubmissionException(jobId, false);
+ }
+
+ public static DuplicateJobSubmissionException ofGloballyTerminated(JobID
jobId) {
+ return new DuplicateJobSubmissionException(jobId, true);
+ }
Review comment:
I like this solution :-)
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -824,6 +825,63 @@ private void testFutureContinuation(
Assert.assertTrue(continuationFuture.isDone());
}
+ @Test
+ public void testHandleExceptionWithCompletedFuture() {
+ final CompletableFuture<String> future =
CompletableFuture.completedFuture("foobar");
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(future, Exception.class, exception
-> "handled");
+ assertEquals("foobar", handled.join());
+ }
+
+ @Test
+ public void testHandleExceptionWithNormalCompletion() {
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(future, Exception.class, exception
-> "handled");
+ future.complete("foobar");
+ assertEquals("foobar", handled.join());
+ }
+
+ @Test
+ public void testHandleExceptionWithMatchingExceptionallyCompletedFuture() {
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(
+ future, UnsupportedOperationException.class, exception
-> "handled");
+ future.completeExceptionally(new
UnsupportedOperationException("foobar"));
+ assertEquals("handled", handled.join());
+ }
+
+ @Test
+ public void
testHandleExceptionWithNotMatchingExceptionallyCompletedFuture() {
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(
+ future, UnsupportedOperationException.class, exception
-> "handled");
+ future.completeExceptionally(new IllegalArgumentException("foobar"));
+ final CompletionException completionException =
+ assertThrows(CompletionException.class, handled::join);
+ assertTrue(completionException.getCause() instanceof
IllegalArgumentException);
+ assertEquals("foobar", completionException.getCause().getMessage());
Review comment:
This might be simplified to `assertEquals(completionException.getCause(),
failureCause)`, where `failureCause` is the exception instance the future was
completed with.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -824,6 +825,63 @@ private void testFutureContinuation(
Assert.assertTrue(continuationFuture.isDone());
}
+ @Test
+ public void testHandleExceptionWithCompletedFuture() {
+ final CompletableFuture<String> future =
CompletableFuture.completedFuture("foobar");
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(future, Exception.class, exception
-> "handled");
+ assertEquals("foobar", handled.join());
+ }
+
+ @Test
+ public void testHandleExceptionWithNormalCompletion() {
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(future, Exception.class, exception
-> "handled");
+ future.complete("foobar");
+ assertEquals("foobar", handled.join());
+ }
+
+ @Test
+ public void testHandleExceptionWithMatchingExceptionallyCompletedFuture() {
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(
+ future, UnsupportedOperationException.class, exception
-> "handled");
+ future.completeExceptionally(new
UnsupportedOperationException("foobar"));
+ assertEquals("handled", handled.join());
+ }
+
+ @Test
+ public void
testHandleExceptionWithNotMatchingExceptionallyCompletedFuture() {
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ final CompletableFuture<String> handled =
+ FutureUtils.handleException(
+ future, UnsupportedOperationException.class, exception
-> "handled");
+ future.completeExceptionally(new IllegalArgumentException("foobar"));
+ final CompletionException completionException =
+ assertThrows(CompletionException.class, handled::join);
+ assertTrue(completionException.getCause() instanceof
IllegalArgumentException);
+ assertEquals("foobar", completionException.getCause().getMessage());
Review comment:
Alternatively, we could think about adding a `Matcher` to
`FlinkMatchers` that checks the type and message of an exception.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -254,37 +265,67 @@ private void runApplicationEntryPoint(
jobIdsFuture.complete(applicationJobIds);
}
} catch (Throwable t) {
- jobIdsFuture.completeExceptionally(
- new ApplicationExecutionException("Could not execute
application.", t));
+ // If we're running in a single job execution mode, it's safe to
consider re-submission
+ // of an already finished a success.
+ final Optional<DuplicateJobSubmissionException> maybeDuplicate =
+ ExceptionUtils.findThrowable(t,
DuplicateJobSubmissionException.class);
+ if (enforceSingleJobExecution
+ && maybeDuplicate.isPresent()
+ && maybeDuplicate.get().isGloballyTerminated()) {
+ final JobID jobId = maybeDuplicate.get().getJobID();
+ tolerateMissingResult.add(jobId);
+ jobIdsFuture.complete(Collections.singletonList(jobId));
+ } else {
+ jobIdsFuture.completeExceptionally(
+ new ApplicationExecutionException("Could not execute
application.", t));
+ }
}
}
private CompletableFuture<Void> getApplicationResult(
final DispatcherGateway dispatcherGateway,
final Collection<JobID> applicationJobIds,
+ final Set<JobID> tolerateMissingResult,
final ScheduledExecutor executor) {
final List<CompletableFuture<?>> jobResultFutures =
applicationJobIds.stream()
.map(
jobId ->
unwrapJobResultException(
-
getJobResult(dispatcherGateway, jobId, executor)))
+ getJobResult(
+ dispatcherGateway,
+ jobId,
+ executor,
+
tolerateMissingResult.contains(jobId))))
.collect(Collectors.toList());
return FutureUtils.waitForAll(jobResultFutures);
}
private CompletableFuture<JobResult> getJobResult(
final DispatcherGateway dispatcherGateway,
final JobID jobId,
- final ScheduledExecutor scheduledExecutor) {
-
+ final ScheduledExecutor scheduledExecutor,
+ final boolean tolerateMissingResult) {
final Time timeout =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Time retryPeriod =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
-
- return JobStatusPollingUtils.getJobResult(
- dispatcherGateway, jobId, scheduledExecutor, timeout,
retryPeriod);
+ final CompletableFuture<JobResult> jobResultFuture =
+ JobStatusPollingUtils.getJobResult(
+ dispatcherGateway, jobId, scheduledExecutor, timeout,
retryPeriod);
+ if (tolerateMissingResult) {
+ // Return "unknown" job result if dispatcher no longer knows the
actual result.
+ return FutureUtils.handleException(
+ jobResultFuture,
+ FlinkJobNotFoundException.class,
+ exception ->
+ new JobResult.Builder()
+ .jobId(jobId)
+
.applicationStatus(ApplicationStatus.UNKNOWN)
Review comment:
I think an `ApplicationStatus.UNKNOWN` will result in a
`JobExecutionException` when calling `JobResult.toJobExecutionResult`, which
in turn will fail the client when calling
`StreamExecutionEnvironment.execute()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]