zentol commented on a change in pull request #11469:
URL: https://github.com/apache/flink/pull/11469#discussion_r473362720
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
##########
@@ -458,6 +466,123 @@ public void
testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() th
}
}
+ private class TestJobExecutionResultHandler extends
TestHandler<EmptyRequestBody, JobExecutionResultResponseBody,
JobMessageParameters> {
+
+ private final Iterator<Object> jobExecutionResults;
+
+ private Object lastJobExecutionResult;
+
+ private TestJobExecutionResultHandler(
+ final Object... jobExecutionResults) {
+ super(JobExecutionResultHeaders.getInstance());
+ checkArgument(Arrays.stream(jobExecutionResults)
+ .allMatch(object -> object instanceof
JobExecutionResultResponseBody
+ || object instanceof
RestHandlerException));
+ this.jobExecutionResults =
Arrays.asList(jobExecutionResults).iterator();
+ }
+
+ @Override
+ protected CompletableFuture<JobExecutionResultResponseBody>
handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody,
JobMessageParameters> request,
+ @Nonnull DispatcherGateway gateway) {
+ if (jobExecutionResults.hasNext()) {
+ lastJobExecutionResult =
jobExecutionResults.next();
+ }
+ checkState(lastJobExecutionResult != null);
+ if (lastJobExecutionResult instanceof
JobExecutionResultResponseBody) {
+ return
CompletableFuture.completedFuture((JobExecutionResultResponseBody)
lastJobExecutionResult);
+ } else if (lastJobExecutionResult instanceof
RestHandlerException) {
+ return
FutureUtils.completedExceptionally((RestHandlerException)
lastJobExecutionResult);
+ } else {
+ throw new AssertionError();
+ }
+ }
+ }
+
+ @Test
+ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
+ final TestJobExecutionResultHandler
testJobExecutionResultHandler =
+ new TestJobExecutionResultHandler(
+ new RestHandlerException("should trigger
retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
+ JobExecutionResultResponseBody.inProgress(),
+ JobExecutionResultResponseBody.created(new
JobResult.Builder()
+
.applicationStatus(ApplicationStatus.SUCCEEDED)
+ .jobId(jobId)
+ .netRuntime(Long.MAX_VALUE)
+
.accumulatorResults(Collections.singletonMap("testName", new
SerializedValue<>(OptionalFailure.of(1.0))))
+ .build()),
+ JobExecutionResultResponseBody.created(new
JobResult.Builder()
+
.applicationStatus(ApplicationStatus.FAILED)
+ .jobId(jobId)
+ .netRuntime(Long.MAX_VALUE)
+ .serializedThrowable(new
SerializedThrowable(new RuntimeException("expected")))
+ .build()));
+
+ // fail first HTTP polling attempt, which should not be a
problem because of the retries
+ final AtomicBoolean firstPollFailed = new AtomicBoolean();
+ failHttpRequest = (messageHeaders, messageParameters,
requestBody) ->
+ messageHeaders instanceof JobExecutionResultHeaders &&
!firstPollFailed.getAndSet(true);
+
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(
+ testJobExecutionResultHandler,
+ new TestJobSubmitHandler())) {
+
+ try (RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+ final JobExecutionResult jobExecutionResult =
restClusterClient.submitJob(jobGraph)
+
.thenCompose(restClusterClient::requestJobResult)
+ .get()
+
.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+ assertThat(jobExecutionResult.getJobID(),
equalTo(jobId));
+ assertThat(jobExecutionResult.getNetRuntime(),
equalTo(Long.MAX_VALUE));
+ assertThat(
+
jobExecutionResult.getAllAccumulatorResults(),
+
equalTo(Collections.singletonMap("testName", 1.0)));
+
+ try {
+ restClusterClient.submitJob(jobGraph)
+
.thenCompose(restClusterClient::requestJobResult)
+ .get()
+
.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+ fail("Expected exception not thrown.");
+ } catch (final Exception e) {
+ final Optional<RuntimeException> cause
= ExceptionUtils.findThrowable(e, RuntimeException.class);
+ assertThat(cause.isPresent(), is(true));
+ assertThat(cause.get().getMessage(),
equalTo("expected"));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testJobSubmissionFailureThrowsProgramInvocationException()
throws Exception {
Review comment:
```suggestion
public void testJobSubmissionFailureCauseForwardedToClient() throws Exception {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org