cshuo commented on a change in pull request #17634:
URL: https://github.com/apache/flink/pull/17634#discussion_r744367549
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
##########
@@ -834,6 +834,11 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
case BuiltInFunctionDefinitions.JSON_STRING =>
new JsonStringCallGen(call).generate(ctx, operands, resultType)
+ case BuiltInFunctionDefinitions.PLUS_AGG =>
+ val left = operands.head
+ val right = operands(1)
Review comment:
add some check for operands? e.g., requireDecimal(..)
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
##########
@@ -759,6 +760,24 @@
explicit(DataTypes.STRING()))))
.build();
+ /**
+ * Special "+" operator used internally by {@code SumAggFunction} to
implement SUM aggregation
+ * on a Decimal type. Uses the {@link
LogicalTypeMerging#findSumAggType(LogicalType)} to avoid
+ * the normal {@link #PLUS} override the special calculation for precision
and scale needed by
+ * SUM.
+ */
+ public static final BuiltInFunctionDefinition PLUS_AGG =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("PLUS_AGG")
Review comment:
`PLUS_AGG` looks like is an aggregate function..Maybe
`AGG_PLUS`、`PLUS_FOR_AGG`..
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SumAggFunction.java
##########
@@ -149,15 +161,24 @@ public DataType getResultType() {
/** Built-in Decimal Sum aggregate function. */
public static class DecimalSumAggFunction extends SumAggFunction {
private DecimalType decimalType;
+ private DataType returnType;
public DecimalSumAggFunction(DecimalType decimalType) {
this.decimalType = decimalType;
}
@Override
public DataType getResultType() {
- DecimalType sumType = (DecimalType)
LogicalTypeMerging.findSumAggType(decimalType);
- return DataTypes.DECIMAL(sumType.getPrecision(),
sumType.getScale());
+ if (returnType == null) {
+ DecimalType sumType = (DecimalType)
LogicalTypeMerging.findSumAggType(decimalType);
+ returnType = DataTypes.DECIMAL(sumType.getPrecision(),
sumType.getScale());
+ }
+ return returnType;
+ }
+
Review comment:
`@Override`
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SumAggFunction.java
##########
@@ -66,7 +68,10 @@ public int operandCount() {
ifThenElse(
Review comment:
`adjustSumType` is not needed any more.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]