[ https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-10674:
---------------------------------
    Description: 
Our production Flink job had been running for about a week. The job contains the following SQL:
{code:java}
select  `time`,  
        lower(trim(os_type)) as os_type, 
        count(distinct feed_id) as feed_total_view  
from  my_table 
group by `time`, lower(trim(os_type)){code}
 

Then a NullPointerException (NPE) occurred:

 
{code:java}
java.lang.NullPointerException
	at scala.Predef$.Long2long(Predef.scala:363)
	at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
	at NonWindowedAggregationHelper$894.retract(Unknown Source)
	at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
	at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
	at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
{code}
 

 

The relevant code in DistinctAccumulator.remove:
 !image-2018-10-25-14-46-03-373.png!

This NPE is most likely caused by currentCnt being null, so we propose a simple fix like this:
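{code:java}
def remove(params: Row): Boolean = {
  if (!distinctValueMap.contains(params)) {
    // no count for this value (e.g. state already cleaned up): nothing to remove
    true
  } else {
    // boxed java.lang.Long count of this distinct value; may be null
    val currentCnt = distinctValueMap.get(params)
    if (currentCnt == null || currentCnt <= 1L) {
      // last (or unknown) occurrence: drop the value entirely
      distinctValueMap.remove(params)
      true
    } else {
      // value is still present more than once: decrement its count
      distinctValueMap.put(params, currentCnt - 1L)
      false
    }
  }
}{code}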
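The top frame of the stack trace, `scala.Predef$.Long2long`, is Scala's implicit conversion from a boxed `java.lang.Long` to a primitive `Long`; it calls `longValue` and therefore throws when the boxed count is null. The standalone sketch below reproduces that failure and shows a null-safe alternative. The object and method names are hypothetical and not part of Flink, and `decrementUnsafe` only assumes that the pre-fix decrement had the shape `currentCnt - 1L`.

```scala
import scala.util.Try

// Hypothetical illustration; not Flink code.
object Long2longNpe {
  // Assumed shape of the pre-fix decrement: the boxed count is implicitly
  // unboxed through Predef.Long2long, which throws an NPE for null.
  def decrementUnsafe(currentCnt: java.lang.Long): Long = currentCnt - 1L

  // Null-safe variant: a missing count yields None instead of an exception.
  def decrementSafe(currentCnt: java.lang.Long): Option[Long] =
    Option(currentCnt).map(c => c - 1L)

  def main(args: Array[String]): Unit = {
    val missing: java.lang.Long = null
    // Name of the exception thrown by the unsafe decrement
    println(Try(decrementUnsafe(missing)).failed.toOption.map(_.getClass.getName)) // Some(java.lang.NullPointerException)
    println(decrementSafe(missing)) // None
    println(decrementSafe(3L))      // Some(2)
  }
}
```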
 

Update:

Because state cleanup happens in processing time, retractions may arrive after the state for their key has already been cleaned up. Before these changes, a new accumulator was created in that case and invalid retraction messages were emitted. This change drops retraction messages for which no accumulator exists.
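The behaviour described in the update can be illustrated with a small self-contained model. This is a hedged sketch, not Flink's GroupAggProcessFunction: keyed state is an in-memory map, `cleanUp` stands in for processing-time state cleanup, and all names (`RetractAfterCleanupSketch`, `Change`, `process`, `cleanUp`) are hypothetical. A retraction whose key has no accumulator is dropped instead of initializing a fresh accumulator.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a non-windowed COUNT(DISTINCT) aggregation.
// It is NOT Flink's GroupAggProcessFunction; keyed state is a plain in-memory map.
object RetractAfterCleanupSketch {

  // One input change: isAccumulate = true for an insert, false for a retraction.
  final case class Change(key: String, value: String, isAccumulate: Boolean)

  // Per-key accumulator: distinct value -> number of times it is currently present.
  private val state = mutable.Map.empty[String, mutable.Map[String, Long]]

  // Stand-in for processing-time state cleanup of an idle key.
  def cleanUp(key: String): Unit = state.remove(key)

  // Returns the updated COUNT(DISTINCT) for the key, or None if the change was dropped.
  def process(c: Change): Option[Int] =
    state.get(c.key) match {
      case None if !c.isAccumulate =>
        // No accumulator exists (e.g. it was already cleaned up): drop the
        // retraction instead of creating a new accumulator and emitting output.
        None
      case existing =>
        // Reuse the key's accumulator, or create one for a first insert.
        val acc = existing.getOrElse {
          val fresh = mutable.Map.empty[String, Long]
          state.put(c.key, fresh)
          fresh
        }
        val cnt = acc.getOrElse(c.value, 0L)
        if (c.isAccumulate) acc.put(c.value, cnt + 1)
        else if (cnt <= 1) acc.remove(c.value) // last occurrence: forget the value
        else acc.put(c.value, cnt - 1)         // still present: decrement its count
        Some(acc.size)
    }

  def main(args: Array[String]): Unit = {
    println(process(Change("ios", "feed-1", isAccumulate = true)))  // Some(1)
    println(process(Change("ios", "feed-2", isAccumulate = true)))  // Some(2)
    cleanUp("ios")
    println(process(Change("ios", "feed-1", isAccumulate = false))) // None
  }
}
```

Dropping the retraction is the conservative choice in this model: once the state is gone, the operator no longer knows which result it emitted earlier, so any retraction it produced would not match a previous output.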


> Fix handling of retractions after clean up
> ------------------------------------------
>
>                 Key: FLINK-10674
>                 URL: https://issues.apache.org/jira/browse/FLINK-10674
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.1
>         Environment: Flink 1.6.0
>            Reporter: ambition
>            Assignee: Timo Walther
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: image-2018-10-25-14-46-03-373.png



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
