[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897145#comment-15897145 ]
ASF GitHub Bot commented on FLINK-5956: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104394062 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal +import java.util.{List => JList} + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.api.TableException +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +/** The initial accumulator for Sum with retract aggregate function */ +class SumWithRetractAccumulator[T] extends JTuple2[T, Long] with Accumulator + +/** + * Base class for built-in Sum with retract aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class SumWithRetractAggFunction[T: Numeric] extends AggregateFunction[T] { + + private val numeric = implicitly[Numeric[T]] + + override def createAccumulator(): Accumulator = { + val acc = new SumWithRetractAccumulator[T]() + acc.f0 = numeric.zero //sum + acc.f1 = 0L //total count + acc + } + + override def accumulate(accumulator: Accumulator, value: Any): Unit = { + if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]] + a.f0 = numeric.plus(a.f0, v) + a.f1 += 1 + } + } + + override def retract(accumulator: Accumulator, value: Any): Unit = { + if (value != null) { + val v = value.asInstanceOf[T] + val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]] + a.f0 = numeric.minus(a.f0, v) + a.f1 -= 1 + } + } + + override def getValue(accumulator: Accumulator): T = { + val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]] + if (a.f1 > 0) { + a.f0 + } else { + null.asInstanceOf[T] + } + } + + override def merge(accumulators: JList[Accumulator]): Accumulator = { + val ret = createAccumulator().asInstanceOf[SumWithRetractAccumulator[T]] + var i: Int = 0 + while (i < accumulators.size()) { + val a = accumulators.get(i).asInstanceOf[SumWithRetractAccumulator[T]] + ret.f0 
= numeric.plus(ret.f0, a.f0) + ret.f1 += a.f1 + i += 1 + } + ret + } + + override def getAccumulatorType(): TypeInformation[_] = { + new TupleTypeInfo( + (new SumWithRetractAccumulator).getClass, + getValueTypeInfo, + BasicTypeInfo.LONG_TYPE_INFO) + } + + def getValueTypeInfo: TypeInformation[_] +} + +/** + * Built-in Byte Sum with retract aggregate function + */ +class ByteSumWithRetractAggFunction extends SumWithRetractAggFunction[Byte] { + override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO +} + +/** + * Built-in Short Sum with retract aggregate function + */ +class ShortSumWithRetractAggFunction extends SumWithRetractAggFunction[Short] { + override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO +} + +/** + * Built-in Int Sum with retract aggregate function + */ +class IntSumWithRetractAggFunction extends SumWithRetractAggFunction[Int] { + override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO +} + +/** + * Built-in Long Sum with retract aggregate function + */ +class LongSumWithRetractAggFunction extends SumWithRetractAggFunction[Long] { + override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO +} + +/** + * Built-in Float Sum with retract aggregate function + */ +class FloatSumWithRetractAggFunction extends SumWithRetractAggFunction[Float] { + override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO +} + +/** + * Built-in Double Sum with retract aggregate function + */ +class DoubleSumWithRetractAggFunction extends SumWithRetractAggFunction[Double] { + override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO +} + +/** The initial accumulator for Big Decimal Sum with retract aggregate function */ +class DecimalSumWithRetractAccumulator extends JTuple2[BigDecimal, Long] with Accumulator { + f0 = BigDecimal.ZERO + f1 = 0L +} + +/** + * Built-in Big Decimal Sum with retract aggregate function + */ +class DecimalSumWithRetractAggFunction extends AggregateFunction[BigDecimal] { + + override def createAccumulator(): Accumulator = { 
+ new DecimalSumWithRetractAccumulator + } + + override def accumulate(accumulator: Accumulator, value: Any): Unit = { + if (value != null) { + val v = value.asInstanceOf[BigDecimal] + val accum = accumulator.asInstanceOf[DecimalSumWithRetractAccumulator] + accum.f0 = accum.f0.add(v) + accum.f1 += 1L + } + } + + override def retract(accumulator: Accumulator, value: Any): Unit = { + if (value != null) { + val v = value.asInstanceOf[BigDecimal] + val accum = accumulator.asInstanceOf[DecimalSumWithRetractAccumulator] + accum.f0 = accum.f0.subtract(v) + accum.f1 -= 1L + if (accum.f1 == 0) { --- End diff -- Do we need this check? > Add retract method into the aggregateFunction > --------------------------------------------- > > Key: FLINK-5956 > URL: https://issues.apache.org/jira/browse/FLINK-5956 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > Retraction methods are helpful for processing updated messages. They will also be very > helpful for window aggregation. This PR will first add retraction methods > into the aggregateFunctions, so that ongoing over-window aggregations can > benefit from them. -- This message was sent by Atlassian JIRA (v6.3.15#6346)