[ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681497#comment-16681497
 ] 

ASF GitHub Bot commented on FLINK-10674:
----------------------------------------

wenhuitang opened a new pull request #7076: [Patch]:Fix 
[FLINK-10674]DistinctAccumulator.remove lead to NPE.
URL: https://github.com/apache/flink/pull/7076
 
 
   
   ## What is the purpose of the change
   This pull request fixes the NullPointerException in 
`DistinctAccumulator.remove` reported in FLINK-10674.
   
   
   ## Brief change log
   `DistinctAccumulator.remove` now handles the case where the distinct map 
has no entry for the given parameters.
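
The failure mode and the guard can be illustrated with a minimal, self-contained sketch. It is not the code of this patch: `DistinctCounter` and `NullUnboxing` are hypothetical names, a plain `java.util.HashMap` stands in for the accumulator's distinct map so the example runs without Flink, and returning `false` for a missing key is an assumption of the sketch rather than the behaviour chosen in the pull request.

```scala
import java.lang.{Long => JLong}
import java.util.{HashMap => JHashMap}

// Shows why a missing map entry ends in an NPE: Map.get returns null for an
// absent key, and converting a null java.lang.Long to a Scala Long goes
// through Predef.Long2long, which calls longValue() on the null reference.
object NullUnboxing {
  def throwsNpe(): Boolean = {
    val missing: JLong = null
    try {
      val n: Long = missing // implicit Long2long conversion happens here
      false
    } catch {
      case _: NullPointerException => true
    }
  }
}

// Hypothetical stand-in for the accumulator (not Flink's DistinctAccumulator).
class DistinctCounter[K] {
  private val counts = new JHashMap[K, JLong]()

  // Returns true when the key was not present before this call.
  def add(key: K): Boolean = {
    val current = counts.get(key)
    if (current == null) {
      counts.put(key, JLong.valueOf(1L))
      true
    } else {
      counts.put(key, JLong.valueOf(current.longValue() + 1L))
      false
    }
  }

  // Returns true only when this call removed the last occurrence of the key.
  // A missing key returns false; this choice is an assumption of the sketch.
  def remove(key: K): Boolean = {
    val current = counts.get(key) // null when there is no entry
    if (current == null) {
      false
    } else if (current.longValue() <= 1L) {
      counts.remove(key)
      true
    } else {
      counts.put(key, JLong.valueOf(current.longValue() - 1L))
      false
    }
  }

  def contains(key: K): Boolean = counts.containsKey(key)
}
```

The null check happens before any arithmetic on the count, so the count is never unboxed while it is null.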
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DistinctAccumulator.remove lead to NPE
> --------------------------------------
>
>                 Key: FLINK-10674
>                 URL: https://issues.apache.org/jira/browse/FLINK-10674
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.1
>         Environment: Flink 1.6.0
>            Reporter: ambition
>            Assignee: winifredtang
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink job ran for about a week. The job contains the following SQL:
> {code:java}
> select  `time`,  
>         lower(trim(os_type)) as os_type, 
>         count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
> The job then failed with an NPE:
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> The current DistinctAccumulator.remove looks like this:
> !image-2018-10-25-14-46-03-373.png!
>  
> The NPE appears to be caused by currentCnt being null, so we simply handle that case as follows:
> {code:java}
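> def remove(params: Row): Boolean = {
>   if (!distinctValueMap.contains(params)) {
>     // No entry for these parameters: nothing to decrement.
>     true
>   } else {
>     val currentCnt = distinctValueMap.get(params)
>     // Treat a null count like a last occurrence and drop the entry.
>     if (currentCnt == null || currentCnt == 1) {
>       distinctValueMap.remove(params)
>       true
>     } else {
>       var value = currentCnt - 1L
>       // Guard against a negative count by resetting it to 1.
>       if (value < 0) {
>         value = 1
>       }
>       distinctValueMap.put(params, value)
>       false
>     }
>   }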
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
