[ 
https://issues.apache.org/jira/browse/FLINK-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520404#comment-16520404
 ] 

ASF GitHub Bot commented on FLINK-9524:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6180#discussion_r197453235
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -132,74 +132,80 @@ class ProcTimeBoundedRangeOver(
     
         val currentTime = timestamp - 1
         var i = 0
    +    // get the list of elements of current proctime
    +    val currentElements = rowMapState.get(currentTime)
     
    -    // initialize the accumulators
    -    var accumulators = accumulatorState.value()
    +    // clean-up timers might expire, which pass the needToCleanupState check above.
    +    // null-check is necessary here, otherwise NPE might be thrown.
    +    if(null != currentElements) {
    --- End diff --
    
    How about changing this to 
    
    ```
    if (null == currentElements) {
      return
    }
    ```
    
    It would touch far fewer lines of code and is IMO easier to read (less 
nesting).
    Please note the space between `if` and the condition.


> NPE from ProcTimeBoundedRangeOver.scala
> ---------------------------------------
>
>                 Key: FLINK-9524
>                 URL: https://issues.apache.org/jira/browse/FLINK-9524
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: yan zhou
>            Assignee: yan zhou
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: npe_from_ProcTimeBoundedRangeOver.txt
>
>
> The class _ProcTimeBoundedRangeOver_ throws an NPE if _minRetentionTime_ 
> and _maxRetentionTime_ are set to values greater than 1. 
> Please see [^npe_from_ProcTimeBoundedRangeOver.txt] for the details of the 
> exception. Below is a short description of the cause:
>  * When the first event for a key arrives, the cleanup time is registered 
> with _timerService_ and recorded in _cleanupTimeState_. If a second event 
> with the same key arrives before the cleanup time, the value in 
> _cleanupTimeState_ is updated and a new timer is registered with 
> _timerService_. So now there are two registered cleanup timers: one for 
> the first event, the other for the second.
>  * However, when _onTimer_ fires for the first cleanup timer, the value in 
> _cleanupTimeState_ has already been updated to the second cleanup time. So 
> the _needToCleanupState_ check does not trigger, and the call runs through 
> the remaining code of _onTimer_ (which is intended to update the 
> accumulator and emit output), causing the NPE.
> _RowTimeBoundedRangeOver_ has very similar logic to 
> _ProcTimeBoundedRangeOver_, but it does not hit this NPE: to avoid the 
> exception, it simply adds a null check before running the logic that 
> updates the accumulator.
>  
>  
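
The stale-timer scenario described in the issue can be reproduced in a small model that does not depend on Flink. The sketch below is an illustration only: `StaleTimerSketch`, its single `retention` value, and the plain Scala collections standing in for `rowMapState` and `cleanupTimeState` are assumptions, not the actual operator code. It uses the early-return form of the null check suggested in the review comment.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the operator's per-key state.
// None of this is the Flink API; the names only mirror the ones in the issue.
object StaleTimerSketch {
  // Rows buffered per processing time (plays the role of rowMapState).
  private val rowMapState = mutable.Map.empty[Long, List[String]]
  // Latest cleanup time only (plays the role of cleanupTimeState).
  private var cleanupTime: Option[Long] = None

  // Assumed single retention interval, replacing min/maxRetentionTime.
  val retention = 10L

  def processElement(row: String, now: Long): Unit = {
    rowMapState(now) = row :: rowMapState.getOrElse(now, Nil)
    // Overwrites the recorded cleanup time. In the real operator the timer
    // registered for the previous cleanup time would still fire later.
    cleanupTime = Some(now + retention)
  }

  /** Returns the rows emitted by this timer; empty if there was nothing to do. */
  def onTimer(timestamp: Long): List[String] = {
    // Counterpart of the needToCleanupState check: only the latest
    // cleanup time matches.
    if (cleanupTime.contains(timestamp)) {
      rowMapState.clear()
      cleanupTime = None
      return Nil
    }

    val currentElements = rowMapState.get(timestamp - 1).orNull
    // A stale cleanup timer reaches this point with no rows for
    // timestamp - 1. Without the guard, the next line would throw an NPE.
    if (null == currentElements) {
      return Nil
    }
    currentElements.reverse
  }
}
```

With two events for the same key at times 100 and 105, the recorded cleanup time moves from 110 to 115. Calling `StaleTimerSketch.onTimer(110L)` then models the stale first cleanup timer: it does not match the recorded cleanup time, finds no rows for time 109, and returns an empty list through the guard instead of failing.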



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
