GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r111992686
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
---
@@ -21,44 +21,48 @@ import java.lang.Iterable
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.types.Row
import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
/**
* [[RichGroupReduceFunction]] to compute aggregates that do not support pre-aggregation for batch
* (DataSet) queries.
*
- * @param aggregates The aggregate functions.
- * @param aggInFields The positions of the aggregation input fields.
+ * @param genAggregations Code-generated [[GeneratedAggregations]]
* @param gkeyOutMapping The mapping of group keys between input and output positions.
- * @param aggOutMapping The mapping of aggregates to output positions.
* @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
- * @param finalRowArity The arity of the final resulting row.
*/
class DataSetAggFunction(
- private val aggregates: Array[AggregateFunction[_ <: Any]],
- private val aggInFields: Array[Array[Int]],
- private val aggOutMapping: Array[(Int, Int)],
+ private val genAggregations: GeneratedAggregationsFunction,
private val gkeyOutMapping: Array[(Int, Int)],
--- End diff --
It would be good to parameterize the method that generates the code so that
both the grouping key copies and the grouping set flags are handled by
`GeneratedAggregations.setForwardFields()`. This should be possible because the
grouping set part only sets constant boolean flags at fixed positions in the
output Row.
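
To make the suggestion concrete, here is a minimal sketch of what such a parameterized generator could look like. Everything except the method name `setForwardFields` and the `Row` accessors `getField`/`setField` is an assumption made for illustration: `KeyCopy`, `GroupingFlag`, `genSetForwardFields`, and the `(input, output)` signature are hypothetical and not taken from this PR. The point is only that the mappings are known at code-generation time, so every copy and every flag can be emitted as a straight-line statement with constant indices.

```scala
// Hypothetical sketch, not the PR's implementation.
object ForwardFieldsSketch {

  /** Copy one grouping key from an input position to an output position (assumed shape). */
  final case class KeyCopy(inputPos: Int, outputPos: Int)

  /** Write a constant grouping-set indicator flag at an output position (assumed shape). */
  final case class GroupingFlag(outputPos: Int, value: Boolean)

  /**
   * Emits Java source for a setForwardFields method. Each key copy and each
   * grouping-set flag becomes one statement with constant indices, and the
   * flags are emitted as boolean literals.
   */
  def genSetForwardFields(keys: Seq[KeyCopy], flags: Seq[GroupingFlag]): String = {
    val copies = keys.map { k =>
      s"    output.setField(${k.outputPos}, input.getField(${k.inputPos}));"
    }
    val constants = flags.map { f =>
      s"    output.setField(${f.outputPos}, ${f.value});"
    }
    val body = (copies ++ constants).mkString("\n")
    s"""  public void setForwardFields(
       |      org.apache.flink.types.Row input,
       |      org.apache.flink.types.Row output) {
       |$body
       |  }""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    // One grouping key copied from input position 1 to output position 0,
    // plus one constant grouping-set flag at output position 2.
    println(genSetForwardFields(Seq(KeyCopy(1, 0)), Seq(GroupingFlag(2, value = true))))
  }
}
```

With something along these lines, `DataSetAggFunction` would no longer need `gkeyOutMapping` and `groupingSetsMapping` as constructor parameters, since both would be baked into the generated code.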