twalthr commented on code in PR #26547: URL: https://github.com/apache/flink/pull/26547#discussion_r2200555209
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LiteralAggFunction.java: ########## @@ -40,8 +41,11 @@ public abstract class LiteralAggFunction extends DeclarativeAggregateFunction { private final UnresolvedReferenceExpression literalAgg = unresolvedRef("literalAgg"); private static final Expression[] EMPTY_EXPRS = new Expression[0]; private static final DataType[] EMPTY_DATATYPES = new DataType[0]; + protected final RexLiteral rexLiteral; Review Comment: can be private? ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala: ########## @@ -285,22 +285,24 @@ class AggFunctionFactory( } } - private def createLiteralAggFunction(relDataType: RelDataType): UserDefinedFunction = { + private def createLiteralAggFunction( + relDataType: RelDataType, + rexNode: RexNode): UserDefinedFunction = { relDataType.getSqlTypeName match { case SqlTypeName.BOOLEAN => - BooleanLiteralAggFunction.INSTANCE + new BooleanLiteralAggFunction(rexNode.asInstanceOf[RexLiteral]) Review Comment: Let's just have one LiteralAggFunction class that takes the RexLiteral. The result type can be derived from the literal. Then we also don't need this type checking here. Simplifies the code significantly. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala: ########## @@ -84,7 +90,15 @@ object AggCodeGenHelper { .map(index => Array(inputType.getTypeAt(index))) val aggTypes = aggInfos - .map(_.externalAccTypes.map(fromDataTypeToLogicalType)) + .map({ + a => + a.function match { + case f: LiteralAggFunction => + Array(fromDataTypeToLogicalType(f.getResultType)) Review Comment: Don't fully understand this logic. 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LiteralAggFunction.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.aggfunctions; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.functions.DeclarativeAggregateFunction; +import org.apache.flink.table.types.DataType; + +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.sql.fun.SqlLiteralAggFunction; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; + +/** + * Built-in literal aggregate function. This function is used for internal optimizations. It accepts + * zero regular aggregate arguments and returns a constant value. 
For more details see <a + * href="https://issues.apache.org/jira/browse/CALCITE-4334">CALCITE-4334</a> and {@link + * SqlLiteralAggFunction}. + */ +public abstract class LiteralAggFunction extends DeclarativeAggregateFunction { + + private final UnresolvedReferenceExpression literalAgg = unresolvedRef("literalAgg"); + private final RexLiteral rexLiteral; + + public LiteralAggFunction(RexLiteral rexLiteral) { + this.rexLiteral = rexLiteral; + } + + @Override + public int operandCount() { + return 0; + } + + @Override + public UnresolvedReferenceExpression[] aggBufferAttributes() { + return new UnresolvedReferenceExpression[] {literalAgg}; + } + + @Override + public DataType[] getAggBufferTypes() { + return new DataType[] {getResultType()}; + } + + @Override + public Expression[] initialValuesExpressions() { + return new Expression[] {nullOf(getResultType())}; + } + + @Override + public Expression[] accumulateExpressions() { + return new Expression[] {literal(rexLiteral.getValue(), getResultType())}; + } + + @Override + public Expression[] retractExpressions() { + return new Expression[] {literal(rexLiteral.getValue(), getResultType())}; + } + + @Override + public Expression[] mergeExpressions() { + return new Expression[] {literal(rexLiteral.getValue(), getResultType())}; + } + + @Override + public Expression getValueExpression() { + return literal(rexLiteral.getValue(), getResultType()); + } + + /** Built-in Boolean Literal aggregate function. */ + public static class BooleanLiteralAggFunction extends LiteralAggFunction { + + public BooleanLiteralAggFunction(RexLiteral rexLiteral) { + super(rexLiteral); + } + + @Override + public DataType getResultType() { + return DataTypes.BOOLEAN(); + } + } + + /** Built-in Byte Literal aggregate function. 
*/ + public static class ByteLiteralAggFunction extends LiteralAggFunction { + + public ByteLiteralAggFunction(RexLiteral rexLiteral) { + super(rexLiteral); + } + + @Override + public DataType getResultType() { + return DataTypes.TINYINT(); + } + } + + /** Built-in Short Literal aggregate function. */ + public static class ShortLiteralAggFunction extends LiteralAggFunction { + + public ShortLiteralAggFunction(RexLiteral rexLiteral) { + super(rexLiteral); + } + + @Override + public DataType getResultType() { + return DataTypes.SMALLINT(); + } + } + + /** Built-in Long Literal aggregate function. */ + public static class LongLiteralAggFunction extends LiteralAggFunction { Review Comment: If it is just this code location that causes all the literal agg trouble, can't we check out the old version, merge this PR first, and then work on the literal agg as a follow-up PR? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java: ########## @@ -85,6 +85,7 @@ public AggregateCall deserialize(JsonParser jsonParser, DeserializationContext c distinct, approximate, ignoreNulls, + new ArrayList<>(), Review Comment: So this list is not important for Flink? Are we sure we are not losing any data? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org