GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5256#discussion_r160200816
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---
@@ -63,10 +65,16 @@ public static void bestEffortDiscardAllStateObjects(
     public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
         if (null != stateFuture) {
             if (!stateFuture.cancel(true)) {
-                StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
-                if (null != stateObject) {
-                    stateObject.discardState();
+                try {
+                    // We attempt to get a result, in case the future completed before cancellation.
+                    StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+
+                    if (null != stateObject) {
+                        stateObject.discardState();
+                    }
+                } catch (CancellationException | ExecutionException ignore) {
+                    // No result that requires discarding was produced.
--- End diff --
Maybe we should log at DEBUG level that the state future could not be completed.
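
Roughly what I have in mind, as a self-contained sketch rather than a patch against `StateUtil`. Assumptions: the nested `StateObject` interface and the local `runIfNotDoneAndGet` helper are simplified stand-ins for the real Flink types, the class name and the log message are made up, and `java.util.logging` at `FINE` is used only so the snippet compiles without dependencies. In the actual class this would presumably be a debug-level call on the existing logger.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DiscardStateFutureSketch {

    // Assumption: java.util.logging stands in for the project's logger; FINE plays the role of DEBUG.
    private static final Logger LOG = Logger.getLogger(DiscardStateFutureSketch.class.getName());

    /** Hypothetical, simplified stand-in for Flink's StateObject. */
    interface StateObject {
        void discardState() throws Exception;
    }

    /** Simplified stand-in for FutureUtil.runIfNotDoneAndGet: run the future if needed, then get its result. */
    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future)
            throws ExecutionException, InterruptedException {
        if (future == null) {
            return null;
        }
        if (!future.isDone()) {
            future.run();
        }
        return future.get();
    }

    public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture)
            throws Exception {
        if (stateFuture != null) {
            // cancel(true) returns false if the future already completed or was already cancelled.
            if (!stateFuture.cancel(true)) {
                try {
                    // We attempt to get a result, in case the future completed before cancellation.
                    StateObject stateObject = runIfNotDoneAndGet(stateFuture);
                    if (stateObject != null) {
                        stateObject.discardState();
                    }
                } catch (CancellationException | ExecutionException e) {
                    // No result that requires discarding was produced; record why instead of
                    // silently swallowing the exception.
                    LOG.log(Level.FINE, "State future could not be completed, nothing to discard.", e);
                }
            }
        }
    }
}
```

Passing the exception to the logger keeps the cause visible when debug logging is enabled, while the method still treats a cancelled or failed future as having no state to clean up.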
---