Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5885#discussion_r184657419
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
---
@@ -103,17 +104,17 @@ public void add(T value) throws IOException {
private static final class FoldTransformation<T, ACC> implements
StateTransformationFunction<ACC, T> {
- private final FoldingStateDescriptor<T, ACC> stateDescriptor;
+ private final HeapFoldingState<?, ?, T, ACC> stateRef;
private final FoldFunction<T, ACC> foldFunction;
- FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
- this.stateDescriptor =
Preconditions.checkNotNull(stateDesc);
- this.foldFunction =
Preconditions.checkNotNull(stateDesc.getFoldFunction());
+ FoldTransformation(FoldFunction<T, ACC> foldFunction,
HeapFoldingState<?, ?, T, ACC> stateRef) {
+ this.stateRef = Preconditions.checkNotNull(stateRef);
--- End diff --
Done ð
---