gustavodemorais commented on code in PR #26113:
URL: https://github.com/apache/flink/pull/26113#discussion_r1956017956
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/UnnestRowsFunction.java:
##########
@@ -19,169 +19,89 @@
package org.apache.flink.table.runtime.functions.table;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.UserDefinedFunction;
-import org.apache.flink.table.runtime.functions.BuiltInSpecializedFunction;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.MapType;
-import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
/**
* Flattens ARRAY, MAP, and MULTISET using a table function. It does this by
another level of
* specialization using a subclass of {@link UnnestTableFunctionBase}.
*/
@Internal
-public class UnnestRowsFunction extends BuiltInSpecializedFunction {
+public class UnnestRowsFunction extends UnnestRowsFunctionBase {
public UnnestRowsFunction() {
- super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
+ super();
}
@Override
- public UserDefinedFunction specialize(SpecializedContext context) {
- final LogicalType argType =
-
context.getCallContext().getArgumentDataTypes().get(0).getLogicalType();
- switch (argType.getTypeRoot()) {
- case ARRAY:
- final ArrayType arrayType = (ArrayType) argType;
- return new CollectionUnnestTableFunction(
- context,
- arrayType.getElementType(),
-
ArrayData.createElementGetter(arrayType.getElementType()));
- case MULTISET:
- final MultisetType multisetType = (MultisetType) argType;
- return new CollectionUnnestTableFunction(
- context,
- multisetType.getElementType(),
-
ArrayData.createElementGetter(multisetType.getElementType()));
- case MAP:
- final MapType mapType = (MapType) argType;
- return new MapUnnestTableFunction(
- context,
- RowType.of(false, mapType.getKeyType(),
mapType.getValueType()),
- ArrayData.createElementGetter(mapType.getKeyType()),
- ArrayData.createElementGetter(mapType.getValueType()));
- default:
- throw new UnsupportedOperationException("Unsupported type for
UNNEST: " + argType);
- }
- }
-
- public static LogicalType getUnnestedType(LogicalType logicalType) {
- switch (logicalType.getTypeRoot()) {
- case ARRAY:
- return ((ArrayType) logicalType).getElementType();
- case MULTISET:
- return ((MultisetType) logicalType).getElementType();
- case MAP:
- final MapType mapType = (MapType) logicalType;
- return RowType.of(false, mapType.getKeyType(),
mapType.getValueType());
- default:
- throw new UnsupportedOperationException("Unsupported UNNEST
type: " + logicalType);
- }
+ protected UserDefinedFunction createCollectionUnnestFunction(
+ SpecializedContext context,
+ LogicalType elementType,
+ ArrayData.ElementGetter elementGetter) {
+ return new CollectionUnnestFunction(context, elementType,
elementGetter);
}
- //
--------------------------------------------------------------------------------------------
- // Runtime Implementation
- //
--------------------------------------------------------------------------------------------
-
- private abstract static class UnnestTableFunctionBase extends
BuiltInTableFunction<Object> {
-
- private final transient DataType outputDataType;
-
- UnnestTableFunctionBase(SpecializedContext context, LogicalType
outputType) {
- super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS, context);
- // The output type in the context is already wrapped, however, the
result of the
- // function is not. Therefore, we need a custom output type.
- outputDataType = DataTypes.of(outputType).toInternal();
- }
-
- @Override
- public DataType getOutputDataType() {
- return outputDataType;
- }
+ @Override
+ protected UserDefinedFunction createMapUnnestFunction(
+ SpecializedContext context,
+ RowType keyValTypes,
+ ArrayData.ElementGetter keyGetter,
+ ArrayData.ElementGetter valueGetter) {
+ return new MapUnnestFunction(context, keyValTypes, keyGetter,
valueGetter);
}
/** Table function that unwraps the elements of a collection (array or
multiset). */
- public static final class CollectionUnnestTableFunction extends
UnnestTableFunctionBase {
+ public static final class CollectionUnnestFunction extends
UnnestTableFunctionBase {
private static final long serialVersionUID = 1L;
private final ArrayData.ElementGetter elementGetter;
- public CollectionUnnestTableFunction(
+ public CollectionUnnestFunction(
SpecializedContext context,
- LogicalType outputType,
+ LogicalType elementType,
ArrayData.ElementGetter elementGetter) {
- super(context, outputType);
+ super(context, elementType);
this.elementGetter = elementGetter;
}
public void eval(ArrayData arrayData) {
- if (arrayData == null) {
- return;
- }
- final int size = arrayData.size();
- for (int pos = 0; pos < size; pos++) {
- collect(elementGetter.getElementOrNull(arrayData, pos));
- }
+ evalArrayData(arrayData, elementGetter, (element, position) ->
collect(element));
}
public void eval(MapData mapData) {
- if (mapData == null) {
- return;
- }
- final int size = mapData.size();
- final ArrayData keys = mapData.keyArray();
- final ArrayData values = mapData.valueArray();
- for (int pos = 0; pos < size; pos++) {
- final int multiplier = values.getInt(pos);
- final Object key = elementGetter.getElementOrNull(keys, pos);
- for (int i = 0; i < multiplier; i++) {
- collect(key);
- }
- }
+ evalMultisetData(mapData, elementGetter, (element, position) ->
collect(element));
}
}
/** Table function that unwraps the elements of a map. */
- public static final class MapUnnestTableFunction extends
UnnestTableFunctionBase {
+ public static final class MapUnnestFunction extends
UnnestTableFunctionBase {
private static final long serialVersionUID = 1L;
private final ArrayData.ElementGetter keyGetter;
-
private final ArrayData.ElementGetter valueGetter;
- public MapUnnestTableFunction(
+ public MapUnnestFunction(
SpecializedContext context,
- LogicalType outputType,
+ LogicalType keyValTypes,
Review Comment:
These are the element types we receive as input. They may or may not be
the same as the output type. For example, with WITH ORDINALITY we have:
keyValTypes: `row(key, val)`
outputType: `row(key, val, ordinality)`
Since this is not yet the output type, and the output type can actually
differ, I renamed the params in both classes to `elementType` for
CollectionUnnest and `keyValTypes` for MapUnnest. IMO this is clearer and
closer to what the values actually are. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]