GitHub user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5944#discussion_r185164034
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
@@ -109,7 +119,11 @@ public MiniDispatcher(
     if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
         // terminate the MiniDispatcher once we served the first JobResult successfully
-        jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+        jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
+            ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
+                ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
+            jobTerminationFuture.complete(status);
--- End diff --
I think the functional way would be:
```
jobTerminationFuture.complete(result.getSerializedThrowable()
    .map(serializedThrowable -> ApplicationStatus.FAILED)
    .orElse(ApplicationStatus.SUCCEEDED));
```
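To illustrate that the two forms are equivalent, here is a minimal standalone sketch. It is not Flink code: `StatusMappingSketch`, the local `ApplicationStatus` enum, and the `Optional<String>` standing in for the result of `getSerializedThrowable()` are all assumptions made so the example compiles on its own.

```java
import java.util.Optional;

public class StatusMappingSketch {

    // Hypothetical stand-in for Flink's ApplicationStatus; only the two values from the diff.
    enum ApplicationStatus { SUCCEEDED, FAILED }

    // Ternary form, mirroring the diff under review.
    // The Optional<String> parameter is an assumed stand-in for getSerializedThrowable().
    static ApplicationStatus viaTernary(Optional<String> failure) {
        return failure.isPresent() ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
    }

    // Functional form, mirroring the suggestion: map a present value to FAILED,
    // fall back to SUCCEEDED when the Optional is empty.
    static ApplicationStatus viaMap(Optional<String> failure) {
        return failure
            .map(ignored -> ApplicationStatus.FAILED)
            .orElse(ApplicationStatus.SUCCEEDED);
    }

    public static void main(String[] args) {
        Optional<String> noFailure = Optional.empty();
        Optional<String> someFailure = Optional.of("job failed");

        // Both forms agree for an empty and for a present Optional.
        System.out.println(viaTernary(noFailure) + " " + viaMap(noFailure));
        System.out.println(viaTernary(someFailure) + " " + viaMap(someFailure));
    }
}
```

Either form dereferences `result`, and `CompletableFuture.whenComplete` passes a null result when the future completed exceptionally, so the `throwable` case would presumably need to be handled separately.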
---