[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141208#comment-16141208 ]
ASF GitHub Bot commented on FLINK-7206: --------------------------------------- Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135184770 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** + * CountDistinct accumulator. + */ + public static class CountDistinctAccum { + public MapView<String, Integer> map; + public long count; + } + + /** + * CountDistinct aggregate. + */ + public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> { + + @Override + public CountDistinctAccum createAccumulator() { + CountDistinctAccum accum = new CountDistinctAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); + accum.count = 0L; + return accum; + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, String id) { + try { + if (!accumulator.map.contains(id)) { + accumulator.map.put(id, 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.put(String.valueOf(id), 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Long getValue(CountDistinctAccum accumulator) { + return accumulator.count; + } + } + + /** + * CountDistinct aggregate with merge. 
+ */ + public static class CountDistinctWithMerge extends CountDistinct { + + //Overloaded merge method + public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) { + Iterator<CountDistinctAccum> iter = it.iterator(); + while (iter.hasNext()) { + CountDistinctAccum mergeAcc = iter.next(); + acc.count += mergeAcc.count; + + try { + Iterator<String> mapItr = mergeAcc.map.keys().iterator(); + while (mapItr.hasNext()) { + String key = mapItr.next(); + if (!acc.map.contains(key)) { + acc.map.put(key, 1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + /** + * CountDistinct aggregate with merge and reset. + */ + public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge { + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** + * CountDistinct aggregate with retract. + */ + public static class CountDistinctWithRetractAndReset extends CountDistinct { + + //Overloaded retract method + public void retract(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.remove(String.valueOf(id)); --- End diff -- The condition here is inverted, and the count is never decremented. It should be: ``` if (accumulator.map.contains(String.valueOf(id))) { accumulator.count -= 1; accumulator.map.remove(String.valueOf(id)); } ``` When a record arrives, accumulate puts (record, 1) into the map and does count += 1; when that record is retracted, retract should do count -= 1 and remove the (record, 1) entry from the map. > Implementation of DataView to support state access for UDAGG > ------------------------------------------------------------ > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Kaibo Zhou > Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)