[ 
https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633901#comment-16633901
 ] 

ASF GitHub Bot commented on FLINK-10312:
----------------------------------------

zentol closed pull request #6731: [FLINK-10312] Propagate exception from server 
to client in REST API
URL: https://github.com/apache/flink/pull/6731
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07faf89..9eb8cc72c8f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -376,7 +376,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
                                (JobSubmitResponseBody jobSubmitResponseBody) 
-> new JobSubmissionResult(jobGraph.getJobID()))
                        .exceptionally(
                                (Throwable throwable) -> {
-                                       throw new CompletionException(new 
JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", 
throwable));
+                                       throw new CompletionException(new 
JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", 
ExceptionUtils.stripCompletionException(throwable)));
                                });
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index d4a65dec8f6..5a19a3f628d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -198,7 +198,10 @@
                                                if (throwable instanceof 
CancellationException) {
                                                        
resultFuture.completeExceptionally(new RetryException("Operation future was 
cancelled.", throwable));
                                                } else {
-                                                       if (retries > 0 && 
retryPredicate.test(throwable)) {
+                                                       throwable = 
ExceptionUtils.stripExecutionException(throwable);
+                                                       if 
(!retryPredicate.test(throwable)) {
+                                                               
resultFuture.completeExceptionally(throwable);
+                                                       } else if (retries > 0) 
{
                                                                final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
                                                                        () -> 
retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, 
retryPredicate, scheduledExecutor),
                                                                        
retryDelay.toMilliseconds(),
@@ -207,12 +210,10 @@
                                                                
resultFuture.whenComplete(
                                                                        
(innerT, innerThrowable) -> scheduledFuture.cancel(false));
                                                        } else {
-                                                               final String 
errorMsg = retries == 0 ?
-                                                                       "Number 
of retries has been exhausted." :
-                                                                       
"Exception is not retryable.";
-                                                               
resultFuture.completeExceptionally(new RetryException(
-                                                                       "Could 
not complete the operation. " + errorMsg,
-                                                                       
throwable));
+                                                               RetryException 
retryException = new RetryException(
+                                                                       "Could 
not complete the operation. Number of retries has been exhausted.",
+                                                                       
throwable);
+                                                               
resultFuture.completeExceptionally(retryException);
                                                        }
                                                }
                                        } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index e4cec086017..9cfb58e98a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
@@ -39,6 +40,7 @@
 
 import javax.annotation.Nonnull;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -80,43 +82,26 @@ protected AbstractRestHandler(
                }
 
                return response.whenComplete((P resp, Throwable throwable) -> {
-                       if (throwable != null) {
-
-                               Throwable error = 
ExceptionUtils.stripCompletionException(throwable);
-
-                               if (error instanceof RestHandlerException) {
-                                       final RestHandlerException rhe = 
(RestHandlerException) error;
-
-                                       processRestHandlerException(ctx, 
httpRequest, rhe);
-                               } else {
-                                       log.error("Implementation error: 
Unhandled exception.", error);
-                                       HandlerUtils.sendErrorResponse(
-                                               ctx,
-                                               httpRequest,
-                                               new ErrorResponseBody("Internal 
server error."),
-                                               
HttpResponseStatus.INTERNAL_SERVER_ERROR,
-                                               responseHeaders);
-                               }
-                       } else {
-                               HandlerUtils.sendResponse(
-                                       ctx,
-                                       httpRequest,
-                                       resp,
-                                       messageHeaders.getResponseStatusCode(),
-                                       responseHeaders);
-                       }
+                       Tuple2<ResponseBody, HttpResponseStatus> r = throwable 
!= null ?
+                               errorResponse(throwable) : Tuple2.of(resp, 
messageHeaders.getResponseStatusCode());
+                       HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, 
responseHeaders);
                }).thenApply(ignored -> null);
        }
 
-       private void processRestHandlerException(ChannelHandlerContext ctx, 
HttpRequest httpRequest, RestHandlerException rhe) {
-               log.error("Exception occurred in REST handler.", rhe);
-
-               HandlerUtils.sendErrorResponse(
-                       ctx,
-                       httpRequest,
-                       new ErrorResponseBody(rhe.getMessage()),
-                       rhe.getHttpResponseStatus(),
-                       responseHeaders);
+       private Tuple2<ResponseBody, HttpResponseStatus> 
errorResponse(Throwable throwable) {
+               Throwable error = 
ExceptionUtils.stripCompletionException(throwable);
+               if (error instanceof RestHandlerException) {
+                       final RestHandlerException rhe = (RestHandlerException) 
error;
+                       log.error("Exception occurred in REST handler.", rhe);
+                       return Tuple2.of(new 
ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
+               } else {
+                       log.error("Implementation error: Unhandled exception.", 
error);
+                       String stackTrace = String.format("<Exception on server 
side:%n%s%nEnd of exception on server side>",
+                               ExceptionUtils.stringifyException(throwable));
+                       return Tuple2.of(
+                               new ErrorResponseBody(Arrays.asList("Internal 
server error.", stackTrace)),
+                               HttpResponseStatus.INTERNAL_SERVER_ERROR);
+               }
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index 491ba094f4c..e3101bef5ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -114,10 +114,7 @@ public JobSubmitHandler(
                CompletableFuture<Acknowledge> jobSubmissionFuture = 
finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, 
timeout));
 
                return jobSubmissionFuture.thenCombine(jobGraphFuture,
-                       (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()))
-                       .exceptionally(exception -> {
-                               throw new CompletionException(new 
RestHandlerException("Job submission failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
-                       });
+                       (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
        }
 
        private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody 
requestBody, Map<String, Path> nameToFile) throws MissingFileException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index c386952c056..99962a6848e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -82,7 +82,7 @@ public void testRetrySuccess() throws Exception {
                        TestingUtils.defaultExecutor());
 
                assertTrue(retryFuture.get());
-               assertTrue(retries == atomicInteger.get());
+               assertEquals(retries, atomicInteger.get());
        }
 
        /**
@@ -274,7 +274,7 @@ public void testRetryWithDelayAndPredicate() throws 
Exception {
                                        throwable instanceof RuntimeException 
&& throwable.getMessage().contains(retryableExceptionMessage),
                                new 
ScheduledExecutorServiceAdapter(retryExecutor)).get();
                } catch (final ExecutionException e) {
-                       assertThat(e.getMessage(), containsString("Could not 
complete the operation"));
+                       assertThat(e.getMessage(), containsString("should 
propagate"));
                } finally {
                        retryExecutor.shutdownNow();
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index be1cb797748..78ace967bec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -269,11 +269,7 @@ public void testFailedJobSubmission() throws Exception {
                                .get();
                } catch (Exception e) {
                        Throwable t = ExceptionUtils.stripExecutionException(e);
-                       if (t instanceof RestHandlerException){
-                               Assert.assertTrue(t.getMessage().equals("Job 
submission failed."));
-                       } else {
-                               throw e;
-                       }
+                       Assert.assertEquals(errorMessage, t.getMessage());
                }
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Wrong / missing exception when submitting job
> ---------------------------------------------
>
>                 Key: FLINK-10312
>                 URL: https://issues.apache.org/jira/browse/FLINK-10312
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.5.2, 1.6.0
>            Reporter: Stephan Ewen
>            Assignee: Andrey Zagrebin
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
>         Attachments: lmerge-TR.pdf
>
>
> h3. Problem
> When submitting a job that cannot be created / initialized on the JobManager, 
> there is no proper error message. The exception says *"Could not retrieve the 
> execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)"*
> h3. Steps to Reproduce
> Create a streaming job and set a state backend with a non-existent file system 
> scheme.
> h3. Full Stack Trace
> {code}
> Submitting a job where instantiation on the JM fails yields this, which seems 
> like a major regression from seeing the actual exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>       at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>       at 
> com.dataartisans.streamledger.examples.simpletrade.SimpleTradeExample.main(SimpleTradeExample.java:98)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
>       at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1120)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$25(RestClusterClient.java:379)
>       at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$32(FutureUtils.java:213)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Exception is not retryable.
>       at 
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>       at 
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>       at 
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>       at 
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>       ... 12 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Exception is not retryable.
>       ... 10 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Job submission 
> failed.]
>       at 
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>       at 
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>       at 
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>       at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>       ... 4 more
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job 
> submission failed.]
>       at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:310)
>       at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$364(RestClient.java:294)
>       at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>       ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to