[
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973424#comment-15973424
]
ASF GitHub Bot commented on FLINK-6242:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r112002408
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
---
@@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
* it does no final aggregate evaluation. It also includes the logic of
* [[DataSetSlideTimeWindowAggFlatMapFunction]].
*
- * @param aggregates aggregate functions
- * @param groupingKeysLength number of grouping keys
- * @param timeFieldPos position of aligned time field
+ * @param genAggregations Code-generated [[GeneratedAggregations]]
+ * @param keysAndAggregatesArity The total arity of keys and aggregates
* @param windowSize window size of the sliding window
* @param windowSlide window slide of the sliding window
* @param returnType return type of this function
*/
class DataSetSlideTimeWindowAggReduceGroupFunction(
- private val aggregates: Array[AggregateFunction[_ <: Any]],
- private val groupingKeysLength: Int,
- private val timeFieldPos: Int,
+ private val genAggregations: GeneratedAggregationsFunction,
+ private val keysAndAggregatesArity: Int,
private val windowSize: Long,
private val windowSlide: Long,
@transient private val returnType: TypeInformation[Row])
extends RichGroupReduceFunction[Row, Row]
with CombineFunction[Row, Row]
- with ResultTypeQueryable[Row] {
+ with ResultTypeQueryable[Row]
+ with Compiler[GeneratedAggregations] {
- Preconditions.checkNotNull(aggregates)
+ private val timeFieldPos = returnType.getArity - 1
+ private val intermediateWindowStartPos = keysAndAggregatesArity
protected var intermediateRow: Row = _
- // add one field to store window start
- protected val intermediateRowArity: Int = groupingKeysLength +
aggregates.length + 1
- protected val accumulatorList: Array[JArrayList[Accumulator]] =
Array.fill(aggregates.length) {
- new JArrayList[Accumulator](2)
- }
- private val intermediateWindowStartPos: Int = intermediateRowArity - 1
+ private var accumulators: Row = _
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+ private var function: GeneratedAggregations = _
override def open(config: Configuration) {
- intermediateRow = new Row(intermediateRowArity)
-
- // init lists with two empty accumulators
- var i = 0
- while (i < aggregates.length) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).add(accumulator)
- accumulatorList(i).add(accumulator)
- i += 1
- }
+ LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+ s"Code:\n$genAggregations.code")
+ val clazz = compile(
+ getClass.getClassLoader,
+ genAggregations.name,
+ genAggregations.code)
+ LOG.debug("Instantiating AggregateHelper.")
+ function = clazz.newInstance()
+
+ accumulators = function.createAccumulators()
+ intermediateRow = function.createOutputRow()
}
override def reduce(records: Iterable[Row], out: Collector[Row]): Unit =
{
// reset first accumulator
- var i = 0
- while (i < aggregates.length) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).set(0, accumulator)
- i += 1
- }
+ function.resetAccumulator(accumulators)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
// accumulate
- i = 0
- while (i < aggregates.length) {
- // insert received accumulator into acc list
- val newAcc = record.getField(groupingKeysLength +
i).asInstanceOf[Accumulator]
- accumulatorList(i).set(1, newAcc)
- // merge acc list
- val retAcc = aggregates(i).merge(accumulatorList(i))
- // insert result into acc list
- accumulatorList(i).set(0, retAcc)
- i += 1
- }
+ function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
// trigger tumbling evaluation
if (!iterator.hasNext) {
--- End diff --
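Move this check behind the loop: inside the loop, `!iterator.hasNext` is only true once the last record has been merged, so the tumbling evaluation can run once after the `while` loop instead of being tested on every iteration.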
> codeGen DataSet Grouping window Aggregates
> ----------------------------------------
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)