This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 82e603a7d3e24d89c09f6ccdc42e4dcc07232040 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Jul 6 16:53:32 2020 +0200 [FLINK-18519][REST] Send exception to client when app fails to execute This closes #12845. --- flink-runtime-web/pom.xml | 21 ++++++++ .../runtime/webmonitor/handlers/JarRunHandler.java | 13 +++-- .../handlers/JarHandlerParameterTest.java | 2 +- .../handlers/JarRunHandlerParameterTest.java | 56 ++++++++++++++++++++++ .../handlers/utils/EagerSinkProgram.java | 31 ++++++++++++ 5 files changed, 118 insertions(+), 5 deletions(-) diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index bdeead9..a625e6d 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -37,6 +37,7 @@ under the License. <properties> <test.parameterProgram.name>parameter-program</test.parameterProgram.name> <test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name> + <test.ParameterProgramWithEagerSink.name>parameter-program-with-eager-sink</test.ParameterProgramWithEagerSink.name> </properties> <dependencies> @@ -196,6 +197,25 @@ under the License. </execution> <execution> <!-- Used for JarHandler tests --> + <id>test-parameter-program-jar-with-eager-sink</id> + <phase>process-test-classes</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <includes> + <include>org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java</include> + </includes> + <archive> + <manifest> + <mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.EagerSinkProgram</mainClass> + </manifest> + </archive> + <finalName>${test.ParameterProgramWithEagerSink.name}</finalName> + </configuration> + </execution> + <execution> + <!-- Used for JarHandler tests --> <id>test-parameter-program-jar-without-manifest</id> <phase>process-test-classes</phase> <goals> @@ -279,6 +299,7 @@ under the License. <targetDir>${project.build.directory}</targetDir> <parameterJarName>${test.parameterProgram.name}</parameterJarName> <parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName> + <parameterJarWithEagerSinkName>${test.ParameterProgramWithEagerSink.name}</parameterJarWithEagerSinkName> </systemPropertyVariables> </configuration> </plugin> diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 84a74bd..223dbae 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.client.deployment.application.ApplicationRunner; import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.runtime.dispatcher.DispatcherGateway; @@ -34,6 +33,8 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + import javax.annotation.Nonnull; import java.nio.file.Path; @@ -97,9 +98,13 @@ public class JarRunHandler extends return CompletableFuture .supplyAsync(() -> applicationRunner.run(gateway, program, effectiveConfiguration), executor) - .thenApply(jobIds -> { - if (jobIds.isEmpty()) { - throw new CompletionException(new ProgramInvocationException("No jobs submitted.")); + .handle((jobIds, throwable) -> { + if (throwable != null) { + throw new CompletionException( + new RestHandlerException("Could not execute application.", HttpResponseStatus.BAD_REQUEST, throwable)); + } else if (jobIds.isEmpty()) { + throw new CompletionException( + new RestHandlerException("No jobs included in application.", HttpResponseStatus.BAD_REQUEST)); } return new JarRunResponseBody(jobIds.get(0)); }); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java index 354be7a..4a04d51 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java @@ -259,7 +259,7 @@ public abstract class JarHandlerParameterTest ? Arrays.asList(PROG_ARGS) : null; } - private static <REQB extends JarRequestBody, M extends JarMessageParameters> + protected static <REQB extends JarRequestBody, M extends JarMessageParameters> HandlerRequest<REQB, M> createRequest( REQB requestBody, M parameters, M unresolvedMessageParameters, Path jar) throws HandlerRequestException { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java index 13ba43c..f577090 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java @@ -23,30 +23,46 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.client.deployment.application.DetachedApplicationRunner; import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Test; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * Tests for the parameter handling of the {@link JarRunHandler}. @@ -57,6 +73,8 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe private static JarRunHandler handler; + private static Path jarWithEagerSink; + @BeforeClass public static void setup() throws Exception { init(); @@ -65,6 +83,12 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe final Map<String, String> responseHeaders = Collections.emptyMap(); final Executor executor = TestingUtils.defaultExecutor(); + final Path jarLocation = Paths.get(System.getProperty("targetDir")); + final String parameterProgramWithEagerSink = "parameter-program-with-eager-sink.jar"; + jarWithEagerSink = Files.copy( + jarLocation.resolve(parameterProgramWithEagerSink), + jarDir.resolve("program-with-eager-sink.jar")); + handler = new JarRunHandler( gatewayRetriever, timeout, @@ -156,6 +180,38 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe return new JarRunRequestBody(null, null, null, null, jobId, null, null); } + @Test + public void testRestHandlerExceptionThrownWithEagerSinks() throws Exception { + final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest( + getDefaultJarRequestBody(), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithEagerSink + ); + + try { + handler.handleRequest(request, restfulGateway).get(); + } catch (final ExecutionException e) { + final Throwable throwable = ExceptionUtils.stripCompletionException(e.getCause()); + assertThat(throwable, instanceOf(RestHandlerException.class)); + + final RestHandlerException restHandlerException = (RestHandlerException) throwable; + assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.BAD_REQUEST)); + + final Optional<ProgramInvocationException> invocationException = + ExceptionUtils.findThrowable(restHandlerException, ProgramInvocationException.class); + + if (!invocationException.isPresent()) { + fail(); + } + + final String exceptionMsg = invocationException.get().getMessage(); + assertThat(exceptionMsg, containsString("Job was submitted in detached mode.")); + return; + } + fail("The test should have failed."); + } + @Override void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request) throws Exception { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java new file mode 100644 index 0000000..145a6a0 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.utils; + +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * Javadoc. + */ +public class EagerSinkProgram { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello", "world").print(); + } +}