[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141208#comment-16141208
 ] 

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user kaibozhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135184770
  
    --- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
    @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
                        accumulator.count -= iWeight;
                }
        }
    +
    +   /**
    +    * CountDistinct accumulator.
    +    */
    +   public static class CountDistinctAccum {
    +           public MapView<String, Integer> map;
    +           public long count;
    +   }
    +
    +   /**
    +    * CountDistinct aggregate.
    +    */
    +   public static class CountDistinct extends AggregateFunction<Long, 
CountDistinctAccum> {
    +
    +           @Override
    +           public CountDistinctAccum createAccumulator() {
    +                   CountDistinctAccum accum = new CountDistinctAccum();
    +                   accum.map = new MapView<>(Types.STRING, Types.INT);
    +                   accum.count = 0L;
    +                   return accum;
    +           }
    +
    +           //Overloaded accumulate method
    +           public void accumulate(CountDistinctAccum accumulator, String 
id) {
    +                   try {
    +                           if (!accumulator.map.contains(id)) {
    +                                   accumulator.map.put(id, 1);
    +                                   accumulator.count += 1;
    +                           }
    +                   } catch (Exception e) {
    +                           e.printStackTrace();
    +                   }
    +           }
    +
    +           //Overloaded accumulate method
    +           public void accumulate(CountDistinctAccum accumulator, long id) 
{
    +                   try {
    +                           if 
(!accumulator.map.contains(String.valueOf(id))) {
    +                                   accumulator.map.put(String.valueOf(id), 
1);
    +                                   accumulator.count += 1;
    +                           }
    +                   } catch (Exception e) {
    +                           e.printStackTrace();
    +                   }
    +           }
    +
    +           @Override
    +           public Long getValue(CountDistinctAccum accumulator) {
    +                   return accumulator.count;
    +           }
    +   }
    +
    +   /**
    +    * CountDistinct aggregate with merge.
    +    */
    +   public static class CountDistinctWithMerge extends CountDistinct {
    +
    +           //Overloaded merge method
    +           public void merge(CountDistinctAccum acc, 
Iterable<CountDistinctAccum> it) {
    +                   Iterator<CountDistinctAccum> iter = it.iterator();
    +                   while (iter.hasNext()) {
    +                           CountDistinctAccum mergeAcc = iter.next();
    +                           acc.count += mergeAcc.count;
    +
    +                           try {
    +                                   Iterator<String> mapItr = 
mergeAcc.map.keys().iterator();
    +                                   while (mapItr.hasNext()) {
    +                                           String key = mapItr.next();
    +                                           if (!acc.map.contains(key)) {
    +                                                   acc.map.put(key, 1);
    +                                           }
    +                                   }
    +                           } catch (Exception e) {
    +                                   e.printStackTrace();
    +                           }
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * CountDistinct aggregate with merge and reset.
    +    */
    +   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
    +
    +           //Overloaded retract method
    +           public void resetAccumulator(CountDistinctAccum acc) {
    +                   acc.map.clear();
    +                   acc.count = 0;
    +           }
    +   }
    +
    +   /**
    +    * CountDistinct aggregate with retract.
    +    */
    +   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
    +
    +           //Overloaded retract method
    +           public void retract(CountDistinctAccum accumulator, long id) {
    +                   try {
    +                           if 
(!accumulator.map.contains(String.valueOf(id))) {
    +                                   
accumulator.map.remove(String.valueOf(id));
    --- End diff --
    
    The condition here is inverted, and the count is never decremented. It should be:
    ```
                                if 
(accumulator.map.contains(String.valueOf(id))) {
                                        accumulator.count -= 1;
                                        
accumulator.map.remove(String.valueOf(id));
                                }
    ```
    
    When a record arrives, accumulate puts (record, 1) into the map and does count += 1; 
    when that record is retracted, retract must do count -= 1 and remove the (record, 1) entry from the map. 


> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to