GitHub user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5885#discussion_r184093390
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
---
@@ -103,17 +104,17 @@ public void add(T value) throws IOException {
private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
- private final FoldingStateDescriptor<T, ACC> stateDescriptor;
+ private final HeapFoldingState<?, ?, T, ACC> stateRef;
private final FoldFunction<T, ACC> foldFunction;
- FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
- this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
- this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
+ FoldTransformation(FoldFunction<T, ACC> foldFunction, HeapFoldingState<?, ?, T, ACC> stateRef) {
+ this.stateRef = Preconditions.checkNotNull(stateRef);
--- End diff --
Maybe the simpler and more honest approach is to make this a non-static
inner class instead of passing a reference to `HeapFoldingState.this` into
the constructor. Essentially, it does the same thing.
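
To illustrate the equivalence, here is a minimal sketch of the two variants side by side. `FoldingHolder`, `StaticTransformation`, `InnerTransformation`, and the `addWith...` methods are hypothetical stand-ins, not the actual Flink classes, and `BiFunction` is used in place of `FoldFunction` to keep the example self-contained.

```java
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical stand-in for the state class; NOT the real HeapFoldingState.
final class FoldingHolder<T, ACC> {
    private final BiFunction<ACC, T, ACC> foldFunction;
    private final ACC defaultValue;
    private ACC current; // null until the first value is added

    FoldingHolder(BiFunction<ACC, T, ACC> foldFunction, ACC defaultValue) {
        this.foldFunction = Objects.requireNonNull(foldFunction);
        this.defaultValue = defaultValue;
    }

    ACC getDefaultValue() {
        return defaultValue;
    }

    // Variant A: static nested class, the outer instance is passed explicitly.
    static final class StaticTransformation<T, ACC> {
        private final FoldingHolder<T, ACC> stateRef;

        StaticTransformation(FoldingHolder<T, ACC> stateRef) {
            this.stateRef = Objects.requireNonNull(stateRef);
        }

        ACC apply(ACC previous, T value) {
            ACC base = (previous != null) ? previous : stateRef.getDefaultValue();
            return stateRef.foldFunction.apply(base, value);
        }
    }

    // Variant B: non-static inner class, the compiler keeps the reference
    // to FoldingHolder.this, so no constructor argument is needed.
    final class InnerTransformation {
        ACC apply(ACC previous, T value) {
            ACC base = (previous != null) ? previous : getDefaultValue();
            return foldFunction.apply(base, value);
        }
    }

    ACC addWithStatic(T value) {
        current = new StaticTransformation<T, ACC>(this).apply(current, value);
        return current;
    }

    ACC addWithInner(T value) {
        current = new InnerTransformation().apply(current, value);
        return current;
    }

    public static void main(String[] args) {
        FoldingHolder<Integer, Integer> state =
                new FoldingHolder<>((acc, v) -> acc + v, 10);
        System.out.println(state.addWithStatic(1)); // prints 11
        System.out.println(state.addWithInner(2));  // prints 13
    }
}
```

Both variants hold a reference to the enclosing instance; the inner class just makes that dependency implicit and drops the extra constructor parameter and null check.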
---