twalthr commented on a change in pull request #18152:
URL: https://github.com/apache/flink/pull/18152#discussion_r772170828
##########
File path:
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
##########
@@ -169,35 +174,61 @@ public AbstractArrowPythonAggregateFunctionOperator
getTestOperator(
RowType outputType,
int[] groupingSet,
int[] udafInputOffsets) {
+ RowType userDefinedFunctionInputType =
+ (RowType) Projection.of(udafInputOffsets).project(inputType);
+ RowType userDefinedFunctionOutputType =
+ new RowType(
+ outputType
+ .getFields()
+ .subList(groupingSet.length,
outputType.getFieldCount()));
Review comment:
Maybe add another useful factory method to `Projection`, such as
`Projection.range()`?
##########
File path:
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
##########
@@ -216,57 +221,85 @@ public RowType getOutputType() {
public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
Configuration config,
PythonFunctionInfo[] pandasAggregateFunctions,
- RowType inputType,
- RowType outputType,
+ RowType inputRowType,
+ RowType outputRowType,
int[] groupingSet,
int[] udafInputOffsets) {
+ RowType userDefinedFunctionInputType =
+ (RowType)
Projection.of(udafInputOffsets).project(inputRowType);
+ RowType userDefinedFunctionOutputType =
+ new RowType(
+ outputRowType
+ .getFields()
+ .subList(
+ inputRowType.getFieldCount(),
+ outputRowType.getFieldCount()));
+
return new
PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(
config,
pandasAggregateFunctions,
- inputType,
- outputType,
+ inputRowType,
+ userDefinedFunctionInputType,
+ userDefinedFunctionOutputType,
new long[] {0L, Long.MIN_VALUE},
new long[] {0L, 2L},
new boolean[] {true, false},
new int[] {0},
- groupingSet,
- groupingSet,
- udafInputOffsets,
3,
- true);
+ true,
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(new TableConfig()),
+ "UdafInputProjection",
+ inputRowType,
+ userDefinedFunctionInputType,
+ udafInputOffsets),
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(new TableConfig()),
+ "GroupKey",
+ inputRowType,
+ (RowType)
Projection.of(groupingSet).project(inputRowType),
+ groupingSet),
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(new TableConfig()),
+ "GroupSet",
+ inputRowType,
+ (RowType)
Projection.of(groupingSet).project(inputRowType),
+ groupingSet));
}
private static class
PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator
extends BatchArrowPythonOverWindowAggregateFunctionOperator {
- PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(
+ public PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(
Configuration config,
PythonFunctionInfo[] pandasAggFunctions,
RowType inputType,
- RowType outputType,
+ RowType userDefinedFunctionInputType,
+ RowType userDefinedFunctionOutputType,
long[] lowerBoundary,
long[] upperBoundary,
- boolean[] isRangeWindow,
+ boolean[] isRangeWindows,
int[] aggWindowIndex,
- int[] groupKey,
- int[] groupingSet,
- int[] udafInputOffsets,
int inputTimeFieldIndex,
- boolean asc) {
+ boolean asc,
+ GeneratedProjection inputGeneratedProjection,
+ GeneratedProjection groupKeyGeneratedProjection,
+ GeneratedProjection groupSetGeneratedProjection) {
super(
config,
pandasAggFunctions,
inputType,
- outputType,
+ userDefinedFunctionInputType,
+ userDefinedFunctionOutputType,
lowerBoundary,
upperBoundary,
- isRangeWindow,
+ isRangeWindows,
Review comment:
typo
##########
File path:
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
##########
@@ -317,54 +322,82 @@ public RowType getOutputType() {
public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
Configuration config,
PythonFunctionInfo[] pandasAggregateFunctions,
- RowType inputType,
- RowType outputType,
+ RowType inputRowType,
+ RowType outputRowType,
int[] groupingSet,
int[] udafInputOffsets) {
+
+ RowType userDefinedFunctionInputType =
+ (RowType)
Projection.of(udafInputOffsets).project(inputRowType);
+ RowType userDefinedFunctionOutputType =
+ new RowType(
+ outputRowType
+ .getFields()
+ .subList(groupingSet.length,
outputRowType.getFieldCount() - 2));
+
// SlidingWindow(10000L, 5000L)
return new
PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(
config,
pandasAggregateFunctions,
- inputType,
- outputType,
+ inputRowType,
+ userDefinedFunctionInputType,
+ userDefinedFunctionOutputType,
3,
100000,
10000L,
5000L,
new int[] {0, 1},
- groupingSet,
- groupingSet,
- udafInputOffsets);
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(new TableConfig()),
+ "UdafInputProjection",
+ inputRowType,
+ userDefinedFunctionInputType,
+ udafInputOffsets),
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(new TableConfig()),
+ "GroupKey",
+ inputRowType,
+ (RowType)
Projection.of(groupingSet).project(inputRowType),
+ groupingSet),
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(new TableConfig()),
+ "GroupSet",
+ inputRowType,
+ (RowType)
Projection.of(groupingSet).project(inputRowType),
+ groupingSet));
}
private static class
PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator
extends BatchArrowPythonGroupWindowAggregateFunctionOperator {
- PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(
+
+ public PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(
Review comment:
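Why `public`? The enclosing class is a `private static` nested class, so the constructor does not need to be `public`.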
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -53,17 +51,11 @@
/** The input logical type. */
protected final RowType inputType;
- /** The output logical type. */
- protected final RowType outputType;
-
- /** The offsets of user-defined function inputs. */
- protected final int[] userDefinedFunctionInputOffsets;
-
/** The user-defined function input logical type. */
- protected transient RowType userDefinedFunctionInputType;
+ protected final RowType userDefinedFunctionInputType;
/** The user-defined function output logical type. */
- protected transient RowType userDefinedFunctionOutputType;
+ protected final RowType userDefinedFunctionOutputType;
Review comment:
Very nit: can we shorten these names? They are difficult to read in
various places:
```
userDefinedFunctionOutputType -> udfOutputType
userDefinedFunctionInputType -> udfInputType
udafInputGeneratedProjection -> udafInputProjection
groupKeyGeneratedProjection -> groupKeyProjection
userDefinedFunctionInputOffsets -> udfInputOffsets
```
##########
File path:
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
##########
@@ -109,38 +114,53 @@ public AbstractArrowPythonAggregateFunctionOperator
getTestOperator(
RowType outputType,
int[] groupingSet,
int[] udafInputOffsets) {
+ RowType userDefinedFunctionInputType =
+ (RowType) Projection.of(udafInputOffsets).project(inputType);
+ RowType userDefinedFunctionOutputType =
+ new RowType(
+ outputType
+ .getFields()
+ .subList(inputType.getFieldCount(),
outputType.getFieldCount()));
+ GeneratedProjection generatedProjection =
Review comment:
Nit: could you inline this as a parameter for consistency? Currently the PR
uses a local variable in some places but passes the projection inline as a parameter in most others.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]