[
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692994#comment-16692994
]
ASF GitHub Bot commented on FLINK-10674:
----------------------------------------
twalthr opened a new pull request #7147: [FLINK-10674] [table] Fix handling of
retractions after clean up
URL: https://github.com/apache/flink/pull/7147
## What is the purpose of the change
Because state cleanup happens in processing time, retractions can arrive
after the state for their key has already been cleaned up. Before this change,
a new accumulator was created in that case and invalid retraction messages were
emitted. This change drops retraction messages for which no accumulator exists.
It also refactors the tests and adds more test cases and explanations.
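
The behavior described above can be illustrated with a minimal plain-Java model. This is only a sketch under stated assumptions, not the code in `GroupAggProcessFunction`: the class `RetractionAfterCleanupSketch`, its `process` and `cleanUp` methods, and the string message format are all hypothetical, and the aggregate is simplified to a per-key count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a keyed count aggregate; it does not use any Flink API.
final class RetractionAfterCleanupSketch {
    // Per-key accumulator. A missing key models state that was never created or was cleaned up.
    private final Map<String, Long> accumulators = new HashMap<>();
    // Emitted changelog messages: "+(key,count)" adds a result, "-(key,count)" retracts one.
    final List<String> emitted = new ArrayList<>();

    void process(String key, boolean isAccumulate) {
        Long acc = accumulators.get(key);
        if (acc == null) {
            if (!isAccumulate) {
                // No accumulator exists for this key, so the retraction is dropped
                // instead of being applied to a freshly created accumulator.
                return;
            }
            acc = 0L;
        } else {
            // Retract the result that was previously emitted for this key.
            emitted.add("-(" + key + "," + acc + ")");
        }
        long updated = isAccumulate ? acc + 1 : acc - 1;
        if (updated == 0) {
            accumulators.remove(key);
        } else {
            accumulators.put(key, updated);
            emitted.add("+(" + key + "," + updated + ")");
        }
    }

    // Models processing-time state cleanup for a single key.
    void cleanUp(String key) {
        accumulators.remove(key);
    }

    public static void main(String[] args) {
        RetractionAfterCleanupSketch agg = new RetractionAfterCleanupSketch();
        agg.process("a", true);   // emits +(a,1)
        agg.cleanUp("a");         // state for "a" is gone
        agg.process("a", false);  // late retraction: dropped, nothing emitted
        System.out.println(agg.emitted); // prints [+(a,1)]
    }
}
```

In this model, a retraction that arrives after cleanup produces no output at all, whereas creating a new accumulator first would lead to a message that retracts a result which was never emitted.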
## Brief change log
- Fix of `org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction`
- Improvements to the harness tests
## Verifying this change
Additional harness tests and one ITCase for nested distinct aggregates.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
> DistinctAccumulator.remove lead to NPE
> --------------------------------------
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.6.1
> Environment: Flink 1.6.0
> Reporter: ambition
> Assignee: winifredtang
> Priority: Minor
> Labels: pull-request-available
> Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our production Flink job had been running for about a week. The job contains the following SQL:
> {code:java}
> select `time`,
> lower(trim(os_type)) as os_type,
> count(distinct feed_id) as feed_total_view
> from my_table
> group by `time`, lower(trim(os_type)){code}
>
> Then an NPE occurred:
>
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>
>
> See DistinctAccumulator.remove:
> !image-2018-10-25-14-46-03-373.png!
>
> This NPE appears to be caused by currentCnt being null, so we simply handle it like this:
> {code:java}
> def remove(params: Row): Boolean = {
>   if (!distinctValueMap.contains(params)) {
>     // No entry for this value: there is nothing to decrement.
>     true
>   } else {
>     val currentCnt = distinctValueMap.get(params)
>     // Guard against a null count before it is unboxed.
>     if (currentCnt == null || currentCnt == 1) {
>       distinctValueMap.remove(params)
>       true
>     } else {
>       var value = currentCnt - 1L
>       if (value < 0) {
>         value = 1
>       }
>       distinctValueMap.put(params, value)
>       false
>     }
>   }
> }{code}
>
>
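
The failure mode in the stack trace (`Long2long` throwing on a null count) can be reproduced with a small Java sketch. It assumes a plain `Map<String, Long>` as a stand-in for the distinct-value map; `NullUnboxingSketch`, `unsafeRemove`, and `remove` are hypothetical names, not Flink APIs. Returning `false` for an absent key is this sketch's own choice and differs from the snippet quoted above, which returns `true`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a distinct-value count map; keys are simplified to String.
final class NullUnboxingSketch {

    // Mirrors the failing pattern: the boxed count is unboxed without a null check.
    static boolean unsafeRemove(Map<String, Long> counts, String key) {
        long current = counts.get(key); // throws NullPointerException if the key is absent
        if (current == 1L) {
            counts.remove(key);
            return true;
        }
        counts.put(key, current - 1L);
        return false;
    }

    // Null-safe variant: an absent key is treated as "nothing to remove".
    static boolean remove(Map<String, Long> counts, String key) {
        Long current = counts.get(key); // stays boxed, so null is observable
        if (current == null) {
            return false; // assumption of this sketch: report that no value was removed
        }
        if (current == 1L) {
            counts.remove(key);
            return true; // last occurrence of the value was removed
        }
        counts.put(key, current - 1L);
        return false;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        System.out.println(remove(counts, "x")); // prints false
        try {
            unsafeRemove(counts, "x");
        } catch (NullPointerException e) {
            System.out.println("NPE"); // prints NPE
        }
    }
}
```

Which value to return for a missing entry depends on how the caller interprets it, so that part should be read as an open design question rather than a recommendation.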