tillrohrmann commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r510912135
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -124,62 +125,64 @@ public void stop() {
return applicationExecutionTask;
}
- /**
- * Runs the user program entrypoint using {@link
#runApplicationAsync(DispatcherGateway,
- * ScheduledExecutor, boolean)} and shuts down the given dispatcher
when the application
- * completes (either successfully or in case of failure).
- */
@VisibleForTesting
- CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
- final DispatcherGateway dispatcher,
- final ScheduledExecutor scheduledExecutor) {
+ CompletableFuture<Void> getApplicationCompletionFuture() {
+ return applicationCompletionFuture;
+ }
- applicationCompletionFuture =
fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);
+ @VisibleForTesting
+ CompletableFuture<Acknowledge> getClusterShutdownFuture() {
+ return clusterShutdownFuture;
+ }
+ /**
+ * Runs the user program entrypoint and shuts down the given
dispatcherGateway when
+ * the application completes (either successfully or in case of
failure).
+ */
+ private CompletableFuture<Acknowledge>
runApplicationAndShutdownClusterAsync(final DispatcherGateway
dispatcherGateway) {
return applicationCompletionFuture
.handle((r, t) -> {
- final ApplicationStatus
applicationStatus;
- if (t != null) {
-
- final
Optional<JobCancellationException> cancellationException =
-
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
- if
(cancellationException.isPresent()) {
- // this means the Flink
Job was cancelled
- applicationStatus =
ApplicationStatus.CANCELED;
- } else if (t instanceof
CancellationException) {
- // this means that the
future was cancelled
- applicationStatus =
ApplicationStatus.UNKNOWN;
- } else {
- applicationStatus =
ApplicationStatus.FAILED;
- }
- LOG.warn("Application {}: ",
applicationStatus, t);
- } else {
- applicationStatus =
ApplicationStatus.SUCCEEDED;
+ if (t == null) {
LOG.info("Application completed
SUCCESSFULLY");
+ return
dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+ }
+
+ final
Optional<ApplicationFailureException> exception =
+
ExceptionUtils.findThrowable(t, ApplicationFailureException.class);
+
+ if (exception.isPresent()) {
+ final ApplicationStatus
applicationStatus = exception.get().getStatus();
+
+ if (applicationStatus ==
ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) {
+ LOG.info("Application
{}: ", applicationStatus, t);
+ return
dispatcherGateway.shutDownCluster(applicationStatus);
+ }
}
- return
dispatcher.shutDownCluster(applicationStatus);
+
+ LOG.warn("Exiting with Application
Status UNKNOWN: ", t);
Review comment:
Maybe
```suggestion
LOG.warn("Application failed
unexpectedly: ", t);
```
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -301,69 +306,132 @@ public void
testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
.setSubmitFunction(jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
.setRequestJobStatusFunction(jobId ->
CompletableFuture.completedFuture(JobStatus.RUNNING));
- ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(3);
+ // we're "listening" on this to be completed to verify that the
error handler is called.
+ // In production, this will shut down the cluster with an
exception.
+ final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
+ final ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(
+ 3, dispatcherBuilder.build(),
scheduledExecutor, errorHandlerFuture::completeExceptionally);
- final CompletableFuture<Acknowledge> shutdownFuture =
-
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(),
scheduledExecutor);
+ final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.getClusterShutdownFuture();
ScheduledFuture<?> applicationExecutionFuture =
bootstrap.getApplicationExecutionFuture();
bootstrap.stop();
- // wait until the bootstrap "thinks" it's done
- shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ // we call the error handler
+ assertException(errorHandlerFuture,
CancellationException.class);
+
+ // we return a future that is completed exceptionally
+ assertException(shutdownFuture, CancellationException.class);
// verify that the application task is being cancelled
assertThat(applicationExecutionFuture.isCancelled(), is(true));
}
@Test
- public void testClusterShutdownWhenStoppingBootstrap() throws Exception
{
+ public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws
Exception {
+ final TestingDispatcherGateway.Builder dispatcherBuilder = new
TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
+ .setRequestJobStatusFunction(jobId ->
CompletableFuture.completedFuture(JobStatus.RUNNING));
+
+ // we're "listening" on this to be completed to verify that the
error handler is called.
+ // In production, this will shut down the cluster with an
exception.
+ final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
+ final ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(
+ 2, dispatcherBuilder.build(),
scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+ final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.getClusterShutdownFuture();
+
+ bootstrap.stop();
+
+ // we call the error handler
+ assertException(errorHandlerFuture,
CancellationException.class);
+
+ // we return a future that is completed exceptionally
+ assertException(shutdownFuture, CancellationException.class);
+ }
+
+ @Test
+ public void testErrorHandlerIsCalledWhenSubmissionFails() throws
Exception {
+ final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
+ final TestingDispatcherGateway.Builder dispatcherBuilder = new
TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobGraph -> {
+ throw new
FlinkRuntimeException("Nope!");
+ })
+ .setClusterShutdownFunction(status -> {
+ if (clusterShutdown.f0) {
+ throw new
FlinkRuntimeException("This should be called only once");
+ }
+ clusterShutdown.f0 = true;
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ });
+
+ // we're "listening" on this to be completed to verify that the
error handler is called.
+ // In production, this will shut down the cluster with an
exception.
+ final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
+ final TestingDispatcherGateway dispatcherGateway =
dispatcherBuilder.build();
+ final ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(
+ 3, dispatcherGateway, scheduledExecutor,
errorHandlerFuture::completeExceptionally);
+
+ final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.getClusterShutdownFuture();
+
+ // we call the error handler
+ assertException(errorHandlerFuture,
ApplicationExecutionException.class);
+
+ // we return a future that is completed exceptionally
+ assertException(shutdownFuture,
ApplicationExecutionException.class);
+
+ assertFalse(clusterShutdown.f0);
+ dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+ assertTrue(clusterShutdown.f0);
Review comment:
Why is this necessary? Couldn't we write the test without it and then
fail directly if `shutDownCluster` is called?
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -373,57 +441,61 @@ public void testClusterShutdownWhenSubmissionFails()
throws Exception {
}
@Test
- public void testClusterShutdownWhenApplicationSucceeds() throws
Exception {
+ public void testClusterShutdownWhenApplicationGetsCancelled() throws
Exception {
// we're "listening" on this to be completed to verify that the
cluster
// is being shut down from the ApplicationDispatcherBootstrap
final CompletableFuture<ApplicationStatus>
externalShutdownFuture = new CompletableFuture<>();
final TestingDispatcherGateway.Builder dispatcherBuilder = new
TestingDispatcherGateway.Builder()
.setSubmitFunction(jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
- .setRequestJobStatusFunction(jobId ->
CompletableFuture.completedFuture(JobStatus.FINISHED))
- .setRequestJobResultFunction(jobId ->
CompletableFuture.completedFuture(createSuccessfulJobResult(jobId)))
+ .setRequestJobStatusFunction(jobId ->
CompletableFuture.completedFuture(JobStatus.CANCELED))
+ .setRequestJobResultFunction(jobId ->
CompletableFuture.completedFuture(createCancelledJobResult(jobId)))
.setClusterShutdownFunction((status) -> {
externalShutdownFuture.complete(status);
return
CompletableFuture.completedFuture(Acknowledge.get());
});
- ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(3);
+ ApplicationDispatcherBootstrap bootstrap =
+ createApplicationDispatcherBootstrap(3,
dispatcherBuilder.build(), scheduledExecutor);
- final CompletableFuture<Acknowledge> shutdownFuture =
-
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(),
scheduledExecutor);
+ final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.getClusterShutdownFuture();
// wait until the bootstrap "thinks" it's done
shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
// verify that the dispatcher is actually being shut down
- assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS), is(ApplicationStatus.SUCCEEDED));
+ assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS), is(ApplicationStatus.CANCELED));
}
@Test
- public void testClusterShutdownWhenApplicationFails() throws Exception {
+ public void testClusterDoesNOTShutdownWhenApplicationStatusUknown()
throws Exception {
// we're "listening" on this to be completed to verify that the
cluster
// is being shut down from the ApplicationDispatcherBootstrap
- final CompletableFuture<ApplicationStatus>
externalShutdownFuture = new CompletableFuture<>();
-
+ final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
final TestingDispatcherGateway.Builder dispatcherBuilder = new
TestingDispatcherGateway.Builder()
.setSubmitFunction(jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
.setRequestJobStatusFunction(jobId ->
CompletableFuture.completedFuture(JobStatus.FAILED))
- .setRequestJobResultFunction(jobId ->
CompletableFuture.completedFuture(createFailedJobResult(jobId)))
- .setClusterShutdownFunction((status) -> {
- externalShutdownFuture.complete(status);
+ .setRequestJobResultFunction(jobId ->
CompletableFuture.completedFuture(createUnknownJobResult(jobId)))
+ .setClusterShutdownFunction(status -> {
+ if (clusterShutdown.f0) {
+ throw new
FlinkRuntimeException("This should be called only once");
+ }
+ clusterShutdown.f0 = true;
return
CompletableFuture.completedFuture(Acknowledge.get());
});
- ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(3);
+ final TestingDispatcherGateway dispatcherGateway =
dispatcherBuilder.build();
+ final ApplicationDispatcherBootstrap bootstrap =
+ createApplicationDispatcherBootstrap(3,
dispatcherGateway, scheduledExecutor);
- final CompletableFuture<Acknowledge> shutdownFuture =
-
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(),
scheduledExecutor);
+ final CompletableFuture<Acknowledge> applicationFuture =
bootstrap.getClusterShutdownFuture();
- // wait until the bootstrap "thinks" it's done
- shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ final ApplicationFailureException exception =
assertException(applicationFuture, ApplicationFailureException.class);
+ assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
- // verify that the dispatcher is actually being shut down
- assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS), is(ApplicationStatus.FAILED));
+ assertFalse(clusterShutdown.f0);
+ dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+ assertTrue(clusterShutdown.f0);
Review comment:
Same here, why do we need this?
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -301,69 +306,132 @@ public void
testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
.setSubmitFunction(jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
.setRequestJobStatusFunction(jobId ->
CompletableFuture.completedFuture(JobStatus.RUNNING));
- ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(3);
+ // we're "listening" on this to be completed to verify that the
error handler is called.
+ // In production, this will shut down the cluster with an
exception.
+ final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
+ final ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(
+ 3, dispatcherBuilder.build(),
scheduledExecutor, errorHandlerFuture::completeExceptionally);
- final CompletableFuture<Acknowledge> shutdownFuture =
-
bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(),
scheduledExecutor);
+ final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.getClusterShutdownFuture();
ScheduledFuture<?> applicationExecutionFuture =
bootstrap.getApplicationExecutionFuture();
bootstrap.stop();
- // wait until the bootstrap "thinks" it's done
- shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ // we call the error handler
+ assertException(errorHandlerFuture,
CancellationException.class);
+
+ // we return a future that is completed exceptionally
+ assertException(shutdownFuture, CancellationException.class);
// verify that the application task is being cancelled
assertThat(applicationExecutionFuture.isCancelled(), is(true));
}
@Test
- public void testClusterShutdownWhenStoppingBootstrap() throws Exception
{
+ public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws
Exception {
+ final TestingDispatcherGateway.Builder dispatcherBuilder = new
TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobGraph ->
CompletableFuture.completedFuture(Acknowledge.get()))
+ .setRequestJobStatusFunction(jobId ->
CompletableFuture.completedFuture(JobStatus.RUNNING));
+
+ // we're "listening" on this to be completed to verify that the
error handler is called.
+ // In production, this will shut down the cluster with an
exception.
+ final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
+ final ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(
+ 2, dispatcherBuilder.build(),
scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+ final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.getClusterShutdownFuture();
+
+ bootstrap.stop();
+
+ // we call the error handler
+ assertException(errorHandlerFuture,
CancellationException.class);
+
+ // we return a future that is completed exceptionally
+ assertException(shutdownFuture, CancellationException.class);
+ }
+
+ @Test
+ public void testErrorHandlerIsCalledWhenSubmissionFails() throws
Exception {
+ final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
+ final TestingDispatcherGateway.Builder dispatcherBuilder = new
TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobGraph -> {
+ throw new
FlinkRuntimeException("Nope!");
+ })
+ .setClusterShutdownFunction(status -> {
+ if (clusterShutdown.f0) {
+ throw new
FlinkRuntimeException("This should be called only once");
+ }
+ clusterShutdown.f0 = true;
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ });
+
+ // we're "listening" on this to be completed to verify that the
error handler is called.
+ // In production, this will shut down the cluster with an
exception.
+ final CompletableFuture<Void> errorHandlerFuture = new
CompletableFuture<>();
+ final TestingDispatcherGateway dispatcherGateway =
dispatcherBuilder.build();
+ final ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(
+ 3, dispatcherGateway, scheduledExecutor,
errorHandlerFuture::completeExceptionally);
+
+ final CompletableFuture<Acknowledge> shutdownFuture =
bootstrap.getClusterShutdownFuture();
+
+ // we call the error handler
+ assertException(errorHandlerFuture,
ApplicationExecutionException.class);
+
+ // we return a future that is completed exceptionally
+ assertException(shutdownFuture,
ApplicationExecutionException.class);
+
+ assertFalse(clusterShutdown.f0);
+ dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+ assertTrue(clusterShutdown.f0);
Review comment:
Or we could simply add a counter to `setClusterShutdownFunction` and assert
that its value is `0` at the end of the test?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]