liuyongvs commented on code in PR #22030:
URL: https://github.com/apache/flink/pull/22030#discussion_r1118532982


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##########
@@ -19,46 +19,88 @@
 package org.apache.flink.table.runtime.functions.scalar;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.SpecializedFunction;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nullable;
 
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
 
 /** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_DISTINCT}. */
 @Internal
 public class ArrayDistinctFunction extends BuiltInScalarFunction {
     private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+    private transient MethodHandle equalityHandle;
 
     public ArrayDistinctFunction(SpecializedFunction.SpecializedContext 
context) {
         super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context);
         final DataType dataType =
                 ((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
                         .getElementDataType();
         elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+        equalityEvaluator =
+                context.createEvaluator(
+                        $("element1").isEqual($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", 
dataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", 
dataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityHandle = equalityEvaluator.open(context);
     }
 
     public @Nullable ArrayData eval(ArrayData haystack) {
         try {
             if (haystack == null) {
                 return null;
             }
-            Set set = new LinkedHashSet<>();
-            final int size = haystack.size();
-            for (int pos = 0; pos < size; pos++) {
-                final Object element = 
elementGetter.getElementOrNull(haystack, pos);
-                set.add(element);
+
+            List list = new ArrayList();
+            boolean alreadyStoredNull = false;
+
+            for (int i = 0; i < haystack.size(); i++) {
+                final Object element1 = 
elementGetter.getElementOrNull(haystack, i);
+                if (element1 != null) {
+                    boolean found = false;
+                    for (int j = 0; !found && j < list.size(); j++) {
+                        Object element2 = list.get(j);
+                        if (element2 != null) {
+                            found = (boolean) equalityHandle.invoke(element1, 
element2);
+                        }
+                    }
+                    if (!found) {
+                        list.add(element1);
+                    }
+                } else {
+                    // De-duplicate the null values.
+                    if (!alreadyStoredNull) {
+                        list.add(element1);
+                        alreadyStoredNull = true;
+                    }
+                }

Review Comment:
   @snuyanzin I think that may not work, because collect is an aggregate 
function. I did think about that; it is like distinct, which is also an 
aggregate function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to