[ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692994#comment-16692994
 ] 

ASF GitHub Bot commented on FLINK-10674:
----------------------------------------

twalthr opened a new pull request #7147: [FLINK-10674] [table] Fix handling of 
retractions after clean up
URL: https://github.com/apache/flink/pull/7147
 
 
   ## What is the purpose of the change
   
   Because state clean up happens in processing time, it might be the case that 
retractions are arriving after the state has been cleaned up. Before these 
changes, a new accumulator was created and invalid retraction messages were 
emitted. This change drops retraction messages for which no accumulator exists. 
It also refactors the tests and adds more test cases and explanations.
   
   ## Brief change log
   
   - Fix of `org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction`
   - Improvements to the harness tests
   
   
   ## Verifying this change
   
   Additional harness tests and one ITCase for nested distinct aggregates.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DistinctAccumulator.remove lead to NPE
> --------------------------------------
>
>                 Key: FLINK-10674
>                 URL: https://issues.apache.org/jira/browse/FLINK-10674
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.1
>         Environment: Flink 1.6.0
>            Reporter: ambition
>            Assignee: winifredtang
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink Job run about a week,job contain sql :
> {code:java}
> select  `time`,  
>         lower(trim(os_type)) as os_type, 
>         count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
>   then occur NPE: 
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> View DistinctAccumulator.remove
> !image-2018-10-25-14-46-03-373.png!
>  
> this NPE should currentCnt = null lead to, so we simple handle like :
> {code:java}
> def remove(params: Row): Boolean = {
>   if(!distinctValueMap.contains(params)){
>     true
>   }else{
>     val currentCnt = distinctValueMap.get(params)
>     // 
>     if (currentCnt == null || currentCnt == 1) {
>       distinctValueMap.remove(params)
>       true
>     } else {
>       var value = currentCnt - 1L
>       if(value < 0){
>         value = 1
>       }
>       distinctValueMap.put(params, value)
>       false
>     }
>   }
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to