Repository: beam Updated Branches: refs/heads/master deee5b3c2 -> 7568f0298
fix FlinkAccumulatorCombiningStateWithContext read null accum bug Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de38410d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de38410d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de38410d Branch: refs/heads/master Commit: de38410d3e2cf9c6edff9438d539929777ad7915 Parents: deee5b3 Author: æ³¢ç¹ <[email protected]> Authored: Thu Apr 20 20:27:31 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri May 26 20:40:59 2017 +0800 ---------------------------------------------------------------------- .../wrappers/streaming/state/FlinkStateInternals.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/de38410d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 9cb742e..b73abe9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -821,7 +821,11 @@ public class FlinkStateInternals<K> implements StateInternals { flinkStateDescriptor); AccumT accum = state.value(); - return combineFn.extractOutput(accum, context); + if (accum != null) { + return combineFn.extractOutput(accum, context); + } else { + return combineFn.extractOutput(combineFn.createAccumulator(context), context); + } } catch (Exception e) { throw new RuntimeException("Error reading state.", e); }
