tillrohrmann commented on a change in pull request #17000:
URL: https://github.com/apache/flink/pull/17000#discussion_r699438111



##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.testjar.BlockingJob;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/** Integration tests related to {@link ApplicationDispatcherBootstrap}. */
+public class ApplicationDispatcherBootstrapITCase {

Review comment:
       `extends TestLogger` is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -451,12 +451,23 @@ private void setupDispatcherResourceManagerComponents(
             final CompletableFuture<ApplicationStatus> shutDownFuture =
                     dispatcherResourceManagerComponent.getShutDownFuture();
             FutureUtils.assertNoException(
-                    
shutDownFuture.thenRun(dispatcherResourceManagerComponent::closeAsync));
+                    shutDownFuture
+                            .handle(
+                                    (applicationStatus, exception) -> {
+                                        if (exception != null) {
+                                            return 
dispatcherResourceManagerComponent
+                                                    .stopApplication(
+                                                            
ApplicationStatus.UNKNOWN,
+                                                            
ExceptionUtils.stringifyException(
+                                                                    
exception));

Review comment:
       I am wondering whether we should fail here rather than gracefully 
shut down the `MiniCluster`. I thought that `shutDownFuture` would only be 
completed exceptionally if there is an unexpected exception.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -386,59 +385,85 @@ public void 
testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
 
         bootstrap.stop();
 
-        // we call the error handler
-        assertException(errorHandlerFuture, CancellationException.class);
+        // we didn't call the error handler
+        assertFalse(errorHandlerFuture.isDone());
 
-        // we return a future that is completed exceptionally
-        assertException(shutdownFuture, CancellationException.class);
+        // shutdown future gets completed normally
+        shutdownFuture.get();
+
+        // verify that we didn't shut down the cluster
+        assertFalse(shutdownCalled.get());
 
         // verify that the application task is being cancelled
         assertThat(applicationExecutionFuture.isCancelled(), is(true));
+        assertThat(applicationExecutionFuture.isDone(), is(true));
     }
 
     @Test
-    public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws 
Exception {
+    public void testClusterIsShutdownCalledWhenSubmissionThrowsAnException() 
throws Exception {

Review comment:
       Is the test name correct with respect to what it tests? It looks as if we assert 
that `shutdown` is not called and that the `errorHandlerFuture` is completed.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -147,35 +147,41 @@ public void stop() {
      */
     private CompletableFuture<Acknowledge> 
runApplicationAndShutdownClusterAsync(
             final DispatcherGateway dispatcherGateway) {
-        return applicationCompletionFuture
-                .handle(
-                        (ignored, t) -> {
-                            if (t == null) {
-                                LOG.info("Application completed SUCCESSFULLY");
-                                return dispatcherGateway.shutDownCluster(
-                                        ApplicationStatus.SUCCEEDED);
-                            }
-
-                            final Optional<UnsuccessfulExecutionException> 
maybeException =
-                                    ExceptionUtils.findThrowable(
-                                            t, 
UnsuccessfulExecutionException.class);
-                            if (maybeException.isPresent()) {
-                                final ApplicationStatus applicationStatus =
-                                        maybeException.get().getStatus();
-                                if (applicationStatus == 
ApplicationStatus.CANCELED
-                                        || applicationStatus == 
ApplicationStatus.FAILED) {
-                                    LOG.info("Application {}: ", 
applicationStatus, t);
-                                    return 
dispatcherGateway.shutDownCluster(applicationStatus);
-                                }
-                            }
-
-                            LOG.warn("Application failed unexpectedly: ", t);
-                            this.errorHandler.onFatalError(
-                                    new FlinkException("Application failed 
unexpectedly.", t));
-
-                            return 
FutureUtils.<Acknowledge>completedExceptionally(t);
-                        })
-                .thenCompose(Function.identity());
+        final CompletableFuture<Acknowledge> shutdownFuture =
+                applicationCompletionFuture
+                        .handle(
+                                (ignored, t) -> {
+                                    if (t == null) {
+                                        LOG.info("Application completed 
SUCCESSFULLY");
+                                        return 
dispatcherGateway.shutDownCluster(
+                                                ApplicationStatus.SUCCEEDED);
+                                    }
+                                    final 
Optional<UnsuccessfulExecutionException> maybeException =
+                                            ExceptionUtils.findThrowable(
+                                                    t, 
UnsuccessfulExecutionException.class);
+                                    if (maybeException.isPresent()) {
+                                        final ApplicationStatus 
applicationStatus =
+                                                
maybeException.get().getStatus();
+                                        if (applicationStatus == 
ApplicationStatus.CANCELED
+                                                || applicationStatus == 
ApplicationStatus.FAILED) {
+                                            LOG.info("Application {}: ", 
applicationStatus, t);
+                                            return 
dispatcherGateway.shutDownCluster(
+                                                    applicationStatus);
+                                        }
+                                    }
+
+                                    if (t instanceof CancellationException) {
+                                        LOG.warn(
+                                                "Application is cancelled 
because the executing dispatcher lost leadership.");

Review comment:
       At this point we cannot know that the cancellation was caused by the dispatcher 
losing leadership. Maybe it is better to say that the `ApplicationDispatcherBootstrap` is being stopped.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to