gustavodemorais commented on code in PR #26113:
URL: https://github.com/apache/flink/pull/26113#discussion_r1956017956


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/UnnestRowsFunction.java:
##########
@@ -19,169 +19,89 @@
 package org.apache.flink.table.runtime.functions.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.UserDefinedFunction;
-import org.apache.flink.table.runtime.functions.BuiltInSpecializedFunction;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.MapType;
-import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
  * Flattens ARRAY, MAP, and MULTISET using a table function. It does this by 
another level of
  * specialization using a subclass of {@link UnnestTableFunctionBase}.
  */
 @Internal
-public class UnnestRowsFunction extends BuiltInSpecializedFunction {
+public class UnnestRowsFunction extends UnnestRowsFunctionBase {
 
     public UnnestRowsFunction() {
-        super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
+        super();
     }
 
     @Override
-    public UserDefinedFunction specialize(SpecializedContext context) {
-        final LogicalType argType =
-                
context.getCallContext().getArgumentDataTypes().get(0).getLogicalType();
-        switch (argType.getTypeRoot()) {
-            case ARRAY:
-                final ArrayType arrayType = (ArrayType) argType;
-                return new CollectionUnnestTableFunction(
-                        context,
-                        arrayType.getElementType(),
-                        
ArrayData.createElementGetter(arrayType.getElementType()));
-            case MULTISET:
-                final MultisetType multisetType = (MultisetType) argType;
-                return new CollectionUnnestTableFunction(
-                        context,
-                        multisetType.getElementType(),
-                        
ArrayData.createElementGetter(multisetType.getElementType()));
-            case MAP:
-                final MapType mapType = (MapType) argType;
-                return new MapUnnestTableFunction(
-                        context,
-                        RowType.of(false, mapType.getKeyType(), 
mapType.getValueType()),
-                        ArrayData.createElementGetter(mapType.getKeyType()),
-                        ArrayData.createElementGetter(mapType.getValueType()));
-            default:
-                throw new UnsupportedOperationException("Unsupported type for 
UNNEST: " + argType);
-        }
-    }
-
-    public static LogicalType getUnnestedType(LogicalType logicalType) {
-        switch (logicalType.getTypeRoot()) {
-            case ARRAY:
-                return ((ArrayType) logicalType).getElementType();
-            case MULTISET:
-                return ((MultisetType) logicalType).getElementType();
-            case MAP:
-                final MapType mapType = (MapType) logicalType;
-                return RowType.of(false, mapType.getKeyType(), 
mapType.getValueType());
-            default:
-                throw new UnsupportedOperationException("Unsupported UNNEST 
type: " + logicalType);
-        }
+    protected UserDefinedFunction createCollectionUnnestFunction(
+            SpecializedContext context,
+            LogicalType elementType,
+            ArrayData.ElementGetter elementGetter) {
+        return new CollectionUnnestFunction(context, elementType, 
elementGetter);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // Runtime Implementation
-    // 
--------------------------------------------------------------------------------------------
-
-    private abstract static class UnnestTableFunctionBase extends 
BuiltInTableFunction<Object> {
-
-        private final transient DataType outputDataType;
-
-        UnnestTableFunctionBase(SpecializedContext context, LogicalType 
outputType) {
-            super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS, context);
-            // The output type in the context is already wrapped, however, the 
result of the
-            // function is not. Therefore, we need a custom output type.
-            outputDataType = DataTypes.of(outputType).toInternal();
-        }
-
-        @Override
-        public DataType getOutputDataType() {
-            return outputDataType;
-        }
+    @Override
+    protected UserDefinedFunction createMapUnnestFunction(
+            SpecializedContext context,
+            RowType keyValTypes,
+            ArrayData.ElementGetter keyGetter,
+            ArrayData.ElementGetter valueGetter) {
+        return new MapUnnestFunction(context, keyValTypes, keyGetter, 
valueGetter);
     }
 
     /** Table function that unwraps the elements of a collection (array or 
multiset). */
-    public static final class CollectionUnnestTableFunction extends 
UnnestTableFunctionBase {
+    public static final class CollectionUnnestFunction extends 
UnnestTableFunctionBase {
 
         private static final long serialVersionUID = 1L;
 
         private final ArrayData.ElementGetter elementGetter;
 
-        public CollectionUnnestTableFunction(
+        public CollectionUnnestFunction(
                 SpecializedContext context,
-                LogicalType outputType,
+                LogicalType elementType,
                 ArrayData.ElementGetter elementGetter) {
-            super(context, outputType);
+            super(context, elementType);
             this.elementGetter = elementGetter;
         }
 
         public void eval(ArrayData arrayData) {
-            if (arrayData == null) {
-                return;
-            }
-            final int size = arrayData.size();
-            for (int pos = 0; pos < size; pos++) {
-                collect(elementGetter.getElementOrNull(arrayData, pos));
-            }
+            evalArrayData(arrayData, elementGetter, (element, position) -> 
collect(element));
         }
 
         public void eval(MapData mapData) {
-            if (mapData == null) {
-                return;
-            }
-            final int size = mapData.size();
-            final ArrayData keys = mapData.keyArray();
-            final ArrayData values = mapData.valueArray();
-            for (int pos = 0; pos < size; pos++) {
-                final int multiplier = values.getInt(pos);
-                final Object key = elementGetter.getElementOrNull(keys, pos);
-                for (int i = 0; i < multiplier; i++) {
-                    collect(key);
-                }
-            }
+            evalMultisetData(mapData, elementGetter, (element, position) -> 
collect(element));
         }
     }
 
     /** Table function that unwraps the elements of a map. */
-    public static final class MapUnnestTableFunction extends 
UnnestTableFunctionBase {
+    public static final class MapUnnestFunction extends 
UnnestTableFunctionBase {
 
         private static final long serialVersionUID = 1L;
 
         private final ArrayData.ElementGetter keyGetter;
-
         private final ArrayData.ElementGetter valueGetter;
 
-        public MapUnnestTableFunction(
+        public MapUnnestFunction(
                 SpecializedContext context,
-                LogicalType outputType,
+                LogicalType keyValTypes,

Review Comment:
   These are element types we receive as an input. They might or might not be 
the same as the output type. For with ordinality we have:
   
   keyValTypes: `row(key, val)`
   outputType: `row(key, val, ordinality)`
   
   Since it's still not yet the output type, and the outputType can actually 
differ, I renamed the params to `elementType` for CollectionUnnest and 
`keyValTypes` for MapUnnest for both classes. IMO was clearer and closer to 
reality since. Wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to