This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82e603a7d3e24d89c09f6ccdc42e4dcc07232040
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Jul 6 16:53:32 2020 +0200

    [FLINK-18519][REST] Send exception to client when app fails to execute
    
    This closes #12845.
---
 flink-runtime-web/pom.xml                          | 21 ++++++++
 .../runtime/webmonitor/handlers/JarRunHandler.java | 13 +++--
 .../handlers/JarHandlerParameterTest.java          |  2 +-
 .../handlers/JarRunHandlerParameterTest.java       | 56 ++++++++++++++++++++++
 .../handlers/utils/EagerSinkProgram.java           | 31 ++++++++++++
 5 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index bdeead9..a625e6d 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -37,6 +37,7 @@ under the License.
        <properties>
                
<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
                
<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
+               
<test.ParameterProgramWithEagerSink.name>parameter-program-with-eager-sink</test.ParameterProgramWithEagerSink.name>
        </properties>
 
        <dependencies>
@@ -196,6 +197,25 @@ under the License.
                                        </execution>
                                        <execution>
                                                <!-- Used for JarHandler tests -->
+                                               
<id>test-parameter-program-jar-with-eager-sink</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <includes>
+                                                               
<include>org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java</include>
+                                                       </includes>
+                                                       <archive>
+                                                               <manifest>
+                                                                       
<mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.EagerSinkProgram</mainClass>
+                                                               </manifest>
+                                                       </archive>
+                                                       
<finalName>${test.ParameterProgramWithEagerSink.name}</finalName>
+                                               </configuration>
+                                       </execution>
+                                       <execution>
+                                               <!-- Used for JarHandler tests -->
                                                
<id>test-parameter-program-jar-without-manifest</id>
                                                
<phase>process-test-classes</phase>
                                                <goals>
@@ -279,6 +299,7 @@ under the License.
                                                
<targetDir>${project.build.directory}</targetDir>
                                                
<parameterJarName>${test.parameterProgram.name}</parameterJarName>
                                                
<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
+                                               
<parameterJarWithEagerSinkName>${test.ParameterProgramWithEagerSink.name}</parameterJarWithEagerSinkName>
                                        </systemPropertyVariables>
                                </configuration>
                        </plugin>
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 84a74bd..223dbae 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.deployment.application.ApplicationRunner;
 import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -34,6 +33,8 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
 import javax.annotation.Nonnull;
 
 import java.nio.file.Path;
@@ -97,9 +98,13 @@ public class JarRunHandler extends
 
                return CompletableFuture
                                .supplyAsync(() -> 
applicationRunner.run(gateway, program, effectiveConfiguration), executor)
-                               .thenApply(jobIds -> {
-                                       if (jobIds.isEmpty()) {
-                                               throw new 
CompletionException(new ProgramInvocationException("No jobs submitted."));
+                               .handle((jobIds, throwable) -> {
+                                       if (throwable != null) {
+                                               throw new CompletionException(
+                                                               new 
RestHandlerException("Could not execute application.", 
HttpResponseStatus.BAD_REQUEST, throwable));
+                                       } else if (jobIds.isEmpty()) {
+                                               throw new CompletionException(
+                                                               new 
RestHandlerException("No jobs included in application.", 
HttpResponseStatus.BAD_REQUEST));
                                        }
                                        return new 
JarRunResponseBody(jobIds.get(0));
                                });
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
index 354be7a..4a04d51 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
@@ -259,7 +259,7 @@ public abstract class JarHandlerParameterTest
                        ? Arrays.asList(PROG_ARGS) : null;
        }
 
-       private static <REQB extends JarRequestBody, M extends 
JarMessageParameters>
+       protected static <REQB extends JarRequestBody, M extends 
JarMessageParameters>
        HandlerRequest<REQB, M> createRequest(
                REQB requestBody, M parameters, M unresolvedMessageParameters, 
Path jar)
                throws HandlerRequestException {
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 13ba43c..f577090 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -23,30 +23,46 @@ import org.apache.flink.api.common.time.Time;
 import 
org.apache.flink.client.deployment.application.DetachedApplicationRunner;
 import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the parameter handling of the {@link JarRunHandler}.
@@ -57,6 +73,8 @@ public class JarRunHandlerParameterTest extends 
JarHandlerParameterTest<JarRunRe
 
        private static JarRunHandler handler;
 
+       private static Path jarWithEagerSink;
+
        @BeforeClass
        public static void setup() throws Exception {
                init();
@@ -65,6 +83,12 @@ public class JarRunHandlerParameterTest extends 
JarHandlerParameterTest<JarRunRe
                final Map<String, String> responseHeaders = 
Collections.emptyMap();
                final Executor executor = TestingUtils.defaultExecutor();
 
+               final Path jarLocation = 
Paths.get(System.getProperty("targetDir"));
+               final String parameterProgramWithEagerSink = 
"parameter-program-with-eager-sink.jar";
+               jarWithEagerSink = Files.copy(
+                               
jarLocation.resolve(parameterProgramWithEagerSink),
+                               jarDir.resolve("program-with-eager-sink.jar"));
+
                handler = new JarRunHandler(
                        gatewayRetriever,
                        timeout,
@@ -156,6 +180,38 @@ public class JarRunHandlerParameterTest extends 
JarHandlerParameterTest<JarRunRe
                return new JarRunRequestBody(null, null, null, null, jobId, 
null, null);
        }
 
+       @Test
+       public void testRestHandlerExceptionThrownWithEagerSinks() throws 
Exception {
+               final HandlerRequest<JarRunRequestBody, 
JarRunMessageParameters> request = createRequest(
+                               getDefaultJarRequestBody(),
+                               getUnresolvedJarMessageParameters(),
+                               getUnresolvedJarMessageParameters(),
+                               jarWithEagerSink
+               );
+
+               try {
+                       handler.handleRequest(request, restfulGateway).get(); // expected to throw ExecutionException
+               } catch (final ExecutionException e) {
+                       final Throwable throwable =      
ExceptionUtils.stripCompletionException(e.getCause());
+                       assertThat(throwable, 
instanceOf(RestHandlerException.class));
+
+                       final RestHandlerException restHandlerException = 
(RestHandlerException) throwable;
+                       
assertThat(restHandlerException.getHttpResponseStatus(), 
equalTo(HttpResponseStatus.BAD_REQUEST));
+
+                       final Optional<ProgramInvocationException> 
invocationException =
+                                       
ExceptionUtils.findThrowable(restHandlerException, 
ProgramInvocationException.class);
+
+                       if (!invocationException.isPresent()) {
+                               fail(); // the cause chain must contain a ProgramInvocationException
+                       }
+
+                       final String exceptionMsg = 
invocationException.get().getMessage();
+                       assertThat(exceptionMsg, containsString("Job was 
submitted in detached mode."));
+                       return; // all expectations held; skip the fail() after the catch block
+               }
+               fail("The test should have failed."); // reached only if get() returned without throwing
+       }
+
        @Override
        void handleRequest(HandlerRequest<JarRunRequestBody, 
JarRunMessageParameters> request)
                throws Exception {
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
new file mode 100644
index 0000000..145a6a0
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Test program for the JarRunHandler tests: {@code main} calls {@code print()} on a two-element data set, which the tests treat as an eager sink.
+ */
+public class EagerSinkProgram {
+       public static void main(String[] args) throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements("hello", "world").print();
+       }
+}

Reply via email to