tillrohrmann commented on a change in pull request #17000: URL: https://github.com/apache/flink/pull/17000#discussion_r699438111
########## File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.application; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.testjar.BlockingJob; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +/** Integration tests related to {@link ApplicationDispatcherBootstrap}. */ +public class ApplicationDispatcherBootstrapITCase { Review comment: `extends TestLogger` is missing. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ########## @@ -451,12 +451,23 @@ private void setupDispatcherResourceManagerComponents( final CompletableFuture<ApplicationStatus> shutDownFuture = dispatcherResourceManagerComponent.getShutDownFuture(); FutureUtils.assertNoException( - shutDownFuture.thenRun(dispatcherResourceManagerComponent::closeAsync)); + shutDownFuture + .handle( + (applicationStatus, exception) -> { + if (exception != null) { + return dispatcherResourceManagerComponent + .stopApplication( + ApplicationStatus.UNKNOWN, + ExceptionUtils.stringifyException( + exception)); Review comment: I am not sure whether we shouldn't fail here instead of gracefully shutting down the `MiniCluster`. I thought that `shutDownFuture` will only be completed exceptionally if there is an unexpected exception. ########## File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java ########## @@ -386,59 +385,85 @@ public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception { bootstrap.stop(); - // we call the error handler - assertException(errorHandlerFuture, CancellationException.class); + // we didn't call the error handler + assertFalse(errorHandlerFuture.isDone()); - // we return a future that is completed exceptionally - assertException(shutdownFuture, CancellationException.class); + // shutdown future gets completed normally + shutdownFuture.get(); + + // verify that we didn't shut down the cluster + assertFalse(shutdownCalled.get()); // verify that the application task is being cancelled assertThat(applicationExecutionFuture.isCancelled(), is(true)); + assertThat(applicationExecutionFuture.isDone(), is(true)); } @Test - public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws Exception { + public void testClusterIsShutdownCalledWhenSubmissionThrowsAnException() throws Exception { Review comment: Is the test name correct wrt to what it tests? It looks as if we assert that `shutdown` is not called and that the `errorHandlerFuture` is completed. ########## File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java ########## @@ -147,35 +147,41 @@ public void stop() { */ private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync( final DispatcherGateway dispatcherGateway) { - return applicationCompletionFuture - .handle( - (ignored, t) -> { - if (t == null) { - LOG.info("Application completed SUCCESSFULLY"); - return dispatcherGateway.shutDownCluster( - ApplicationStatus.SUCCEEDED); - } - - final Optional<UnsuccessfulExecutionException> maybeException = - ExceptionUtils.findThrowable( - t, UnsuccessfulExecutionException.class); - if (maybeException.isPresent()) { - final ApplicationStatus applicationStatus = - maybeException.get().getStatus(); - if (applicationStatus == ApplicationStatus.CANCELED - || applicationStatus == ApplicationStatus.FAILED) { - LOG.info("Application {}: ", applicationStatus, t); - return dispatcherGateway.shutDownCluster(applicationStatus); - } - } - - LOG.warn("Application failed unexpectedly: ", t); - this.errorHandler.onFatalError( - new FlinkException("Application failed unexpectedly.", t)); - - return FutureUtils.<Acknowledge>completedExceptionally(t); - }) - .thenCompose(Function.identity()); + final CompletableFuture<Acknowledge> shutdownFuture = + applicationCompletionFuture + .handle( + (ignored, t) -> { + if (t == null) { + LOG.info("Application completed SUCCESSFULLY"); + return dispatcherGateway.shutDownCluster( + ApplicationStatus.SUCCEEDED); + } + final Optional<UnsuccessfulExecutionException> maybeException = + ExceptionUtils.findThrowable( + t, UnsuccessfulExecutionException.class); + if (maybeException.isPresent()) { + final ApplicationStatus applicationStatus = + maybeException.get().getStatus(); + if (applicationStatus == ApplicationStatus.CANCELED + || applicationStatus == ApplicationStatus.FAILED) { + LOG.info("Application {}: ", applicationStatus, t); + return dispatcherGateway.shutDownCluster( + applicationStatus); + } + } + + if (t instanceof CancellationException) { + LOG.warn( + "Application is cancelled because the executing dispatcher lost leadership."); Review comment: I think this is not clear at this point. Maybe it is better to say that the `ApplicationDispatcherBootstrap` is being stopped. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org