Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6180#discussion_r197453235

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
@@ -132,74 +132,80 @@ class ProcTimeBoundedRangeOver(
     val currentTime = timestamp - 1
     var i = 0
+    // get the list of elements of current proctime
+    val currentElements = rowMapState.get(currentTime)

-    // initialize the accumulators
-    var accumulators = accumulatorState.value()
+    // clean-up timers might expire, which pass the needToCleanupState check above.
+    // null-check is necessary here, otherwise NPE might be thrown.
+    if(null != currentElements) {
--- End diff --

How about changing this to

```
if (null == currentElements) {
  return
}
```

It would touch far fewer lines of code and is, IMO, easier to read (less nesting).
Please note the space between `if` and the condition.
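For illustration, here is a minimal, self-contained sketch of the early-return shape outside of Flink. Everything except `currentTime`, `currentElements`, and the guard itself is an assumption made for the example: `EarlyReturnSketch`, `onTimer`, and `processed` are hypothetical names, and a plain `java.util.HashMap` stands in for `rowMapState` (like the state lookup guarded in the diff, it yields `null` for a missing key).

```scala
import java.util.{HashMap => JHashMap, List => JList}

object EarlyReturnSketch {
  // Hypothetical stand-in for rowMapState: proctime -> rows seen at that time.
  val rowMapState = new JHashMap[Long, JList[String]]()

  // Counts rows handled so far, only so the effect is observable.
  var processed = 0

  def onTimer(timestamp: Long): Unit = {
    val currentTime = timestamp - 1

    // get the list of elements of current proctime
    val currentElements = rowMapState.get(currentTime)

    // Guard clause: a clean-up timer may fire with no elements registered,
    // so bail out here instead of wrapping the rest of the method in an if.
    if (null == currentElements) {
      return
    }

    // The remainder of the method stays at its original nesting level.
    var i = 0
    while (i < currentElements.size()) {
      processed += 1
      i += 1
    }
  }
}
```

With the guard at the top, the rest of the method body keeps its original indentation, so the diff stays limited to the added lines.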
---