GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5555#discussion_r184419816
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
---
@@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLon
override def equals(that: Any): Boolean =
that match {
case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
- this.mapView == that.mapView
+ this.distinctValueMap == that.distinctValueMap
case _ => false
}
def add(element: E): Boolean = {
- if (element != null) {
- val currentVal = mapView.get(element)
- if (currentVal != null) {
- mapView.put(element, currentVal + 1L)
- false
- } else {
- mapView.put(element, 1L)
- true
- }
- } else {
+ val wrappedElement = Row.of(element)
--- End diff --
I think we should remove the `E` type parameter and pass the `Row` directly
as an argument to `add`. That would also make extending this to multiple
arguments very easy.
Actually, I think I'll do that before merging.
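Roughly what I have in mind, as a minimal sketch rather than the final code. To keep it runnable without Flink, `ArgRow` is a hypothetical stand-in for `Row`, a plain `java.util.HashMap` stands in for the `MapView`, and the class name `DistinctAccumulatorSketch` is made up for illustration:

```scala
import java.lang.{Long => JLong}
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for Flink's Row so the sketch compiles on its own.
// Equality and hashing are element-wise over the wrapped fields.
final case class ArgRow(fields: Any*)

// Assumption: a plain HashMap replaces the MapView used in the real accumulator.
// Note there is no E type parameter anymore; the caller wraps the arguments.
class DistinctAccumulatorSketch[ACC](
    var realAcc: ACC,
    val distinctValueMap: JHashMap[ArgRow, JLong] = new JHashMap[ArgRow, JLong]()) {

  // Counts one occurrence of the given arguments.
  // Returns true only if this combination was not seen before.
  def add(params: ArgRow): Boolean = {
    val currentCnt = distinctValueMap.get(params)
    if (currentCnt != null) {
      distinctValueMap.put(params, currentCnt + 1L)
      false
    } else {
      distinctValueMap.put(params, 1L)
      true
    }
  }
}
```

With that signature, a single-argument call is `acc.add(ArgRow(x))` and a two-argument call is `acc.add(ArgRow(x, y))`, so the accumulator itself would not need to change for multiple arguments.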
---