[
https://issues.apache.org/jira/browse/FLINK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072699#comment-16072699
]
ASF GitHub Bot commented on FLINK-6649:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4157#discussion_r125299675
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunctionWithUpdateInterval.scala
---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.{Compiler,
GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Aggregate Function used for the groupby (without window) aggregate
+ * with update interval config.
+ *
+ * @param genAggregations Generated aggregate helper function
+ * @param aggregationStateType The row type info of aggregation
+ * @param outputRowType The row type info of output.
+ */
+class GroupAggProcessFunctionWithUpdateInterval(
+ private val genAggregations: GeneratedAggregationsFunction,
+ private val aggregationStateType: RowTypeInfo,
+ private val outputRowType: RowTypeInfo,
+ private val generateRetraction: Boolean,
+ private val queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+with Compiler[GeneratedAggregations] {
+
+ protected val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+ protected var function: GeneratedAggregations = _
+
+ protected var newRow: CRow = _
+ protected var prevRow: CRow = _
+
+ private var typeSerializer: TypeSerializer[Row] = _
+
+ // stores the accumulators
+ protected var state: ValueState[Row] = _
+
+ // counts the number of added and retracted input records
+ protected var cntState: ValueState[JLong] = _
+
+ // stores the input for group keys
+ private var inputState: ValueState[Row] = _
+
+ // stores the last emit row
+ private var emitState: ValueState[Row] = _
+
+ // stores the emit time
+ private var emitTimerState: ValueState[JLong] = _
+
+
+ override def open(config: Configuration) {
+ LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name}\n\n" +
+ s"Code:\n${genAggregations.code}")
+ val clazz = compile(
+ getRuntimeContext.getUserCodeClassLoader,
+ genAggregations.name,
+ genAggregations.code)
+ LOG.debug("Instantiating AggregateHelper.")
+ function = clazz.newInstance()
+
+ newRow = new CRow(function.createOutputRow, true)
+ prevRow = new CRow(function.createOutputRow, false)
+ typeSerializer = outputRowType.createSerializer(new ExecutionConfig())
+
+ state = getRuntimeContext.getState(
+ new ValueStateDescriptor[Row]("GroupAggregateState",
aggregationStateType))
+ cntState = getRuntimeContext.getState(
+ new ValueStateDescriptor[JLong]("GroupAggregateInputCounter",
Types.LONG))
+ inputState = getRuntimeContext.getState(
+ new ValueStateDescriptor[Row]("GroupAggregatePreEmitState",
classOf[Row]))
+ emitState = getRuntimeContext.getState(
+ new ValueStateDescriptor[Row]("GroupAggregateEmitState",
outputRowType))
+ emitTimerState = getRuntimeContext.getState(
+ new ValueStateDescriptor[JLong]("emitTimeState", Types.LONG))
+
+ initCleanupTimeState("GroupAggregateWithUpdateIntervalCleanupTime")
+ }
+
+ override def processElement(
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ val currentTime = ctx.timerService().currentProcessingTime()
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, currentTime)
+
+ val input = inputC.row
+
+ // get accumulators and input counter
+ var accumulators = state.value
+ var inputCnt = cntState.value
+
+ if (null == accumulators) {
+ accumulators = function.createAccumulators()
+ inputState.update(input)
+ inputCnt = 0L
+ }
+
+ // update aggregate result and set to the newRow
+ if (inputC.change) {
+ inputCnt += 1
+ // accumulate input
+ function.accumulate(accumulators, input)
+ } else {
+ inputCnt -= 1
+ // retract input
+ function.retract(accumulators, input)
+ }
+
+ state.update(accumulators)
+ cntState.update(inputCnt)
+
+ var triggerTimer = emitTimerState.value
+
+ if (null == triggerTimer) {
+ triggerTimer = 0L
+ }
+
+ if (currentTime >= triggerTimer) {
+
+ val newTimer = currentTime +
queryConfig.getUnboundedAggregateUpdateInterval
+
+ emitTimerState.update(newTimer)
+
+ ctx.timerService().registerProcessingTimeTimer(newTimer)
--- End diff --
Should we define the update rate on processing time or event time?
> Improve Non-window group aggregate with configurable `earlyFire`.
> -----------------------------------------------------------------
>
> Key: FLINK-6649
> URL: https://issues.apache.org/jira/browse/FLINK-6649
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently, the non-windowed group aggregate fires early at count(1), that is,
> every row emits an aggregate result. But sometimes users want to configure a
> count number (`early firing with count[N]`) to reduce the downstream pressure.
> This JIRA will enable the configuration of `earlyFiring` for the non-windowed
> group aggregate.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)