Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r165275719
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
---
@@ -109,14 +109,20 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
     return operatorStateCheckpointOutputStream;
   }
-  public RunnableFuture<KeyedStateHandle> getKeyedStateStreamFuture() throws IOException {
-    KeyGroupsStateHandle keyGroupsStateHandle = closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
-    return new DoneFuture<KeyedStateHandle>(keyGroupsStateHandle);
+  public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
+    return getGenericStateStreamFuture(keyedStateCheckpointOutputStream);
   }
-  public RunnableFuture<OperatorStateHandle> getOperatorStateStreamFuture() throws IOException {
-    OperatorStateHandle operatorStateHandle = closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
-    return new DoneFuture<>(operatorStateHandle);
+  public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
+    return getGenericStateStreamFuture(operatorStateCheckpointOutputStream);
+  }
+
+  private <T extends StateObject> RunnableFuture<SnapshotResult<T>> getGenericStateStreamFuture(
+      NonClosingCheckpointOutputStream<? extends T> stream) throws IOException {
+    T operatorStateHandle = (T) closeAndUnregisterStreamToObtainStateHandle(stream);
--- End diff --
This cast seems a bit fishy to me. I think it should not be necessary if
the generics are applied correctly. One way to solve it would be to bound
the type parameter as `T extends StreamStateHandle` and to declare the
return type as `RunnableFuture<? extends SnapshotResult<? extends
KeyedStateHandle>> getKeyedStateStreamFuture()`.
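
To illustrate the idea, here is a minimal, self-contained sketch of how bounding the type parameter by the handle type that the stream itself produces lets the compiler accept the assignment without a cast. Everything in it (`Stream`, `KeyedHandle`, `closeAndObtain`, `genericFuture`, `resultOf`) is a simplified, hypothetical stand-in and not the actual Flink code, and a `FutureTask` that is run immediately stands in for `DoneFuture`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class GenericBoundSketch {
    // Hypothetical stand-ins for the Flink state types.
    interface StateObject {}
    interface StreamStateHandle extends StateObject {}
    static final class KeyedHandle implements StreamStateHandle {}

    static final class SnapshotResult<T extends StateObject> {
        final T value;
        SnapshotResult(T value) { this.value = value; }
    }

    // Stand-in for a checkpoint output stream that yields a handle of type T.
    static final class Stream<T extends StreamStateHandle> {
        private final T handle;
        Stream(T handle) { this.handle = handle; }
        T closeAndGetHandle() { return handle; }
    }

    // Stand-in for the existing helper: returns the stream's own handle type.
    static <T extends StreamStateHandle> T closeAndObtain(Stream<T> stream) {
        return stream == null ? null : stream.closeAndGetHandle();
    }

    // Because T uses the same bound as the helper, no (T) cast is needed here.
    static <T extends StreamStateHandle> RunnableFuture<SnapshotResult<T>> genericFuture(
            Stream<T> stream) {
        T handle = closeAndObtain(stream);
        FutureTask<SnapshotResult<T>> future =
            new FutureTask<SnapshotResult<T>>(() -> new SnapshotResult<T>(handle));
        future.run(); // complete immediately, similar in spirit to a "done" future
        return future;
    }

    // Convenience accessor that converts checked exceptions for the demo.
    static <V> V resultOf(RunnableFuture<V> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        KeyedHandle handle = new KeyedHandle();
        RunnableFuture<SnapshotResult<KeyedHandle>> future =
            genericFuture(new Stream<>(handle));
        System.out.println(resultOf(future).value == handle); // prints true
    }
}
```

Whether the real method can use exactly this bound depends on the signatures of `NonClosingCheckpointOutputStream` and `closeAndUnregisterStreamToObtainStateHandle`, which are not shown in the diff above.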
---