[ 
https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897145#comment-15897145
 ] 

ASF GitHub Bot commented on FLINK-5956:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3470#discussion_r104394062
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala
 ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.math.BigDecimal
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +/** The initial accumulator for Sum with retract aggregate function */
    +class SumWithRetractAccumulator[T] extends JTuple2[T, Long] with 
Accumulator
    +
    +/**
    +  * Base class for built-in Sum with retract aggregate function
    +  *
    +  * @tparam T the type for the aggregation result
    +  */
    +abstract class SumWithRetractAggFunction[T: Numeric] extends 
AggregateFunction[T] {
    +
    +  private val numeric = implicitly[Numeric[T]]
    +
    +  override def createAccumulator(): Accumulator = {
    +    val acc = new SumWithRetractAccumulator[T]()
    +    acc.f0 = numeric.zero //sum
    +    acc.f1 = 0L //total count
    +    acc
    +  }
    +
    +  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[T]
    +      val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
    +      a.f0 = numeric.plus(a.f0, v)
    +      a.f1 += 1
    +    }
    +  }
    +
    +  override def retract(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[T]
    +      val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
    +      a.f0 = numeric.minus(a.f0, v)
    +      a.f1 -= 1
    +    }
    +  }
    +
    +  override def getValue(accumulator: Accumulator): T = {
    +    val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
    +    if (a.f1 > 0) {
    +      a.f0
    +    } else {
    +      null.asInstanceOf[T]
    +    }
    +  }
    +
    +  override def merge(accumulators: JList[Accumulator]): Accumulator = {
    +    val ret = 
createAccumulator().asInstanceOf[SumWithRetractAccumulator[T]]
    +    var i: Int = 0
    +    while (i < accumulators.size()) {
    +      val a = 
accumulators.get(i).asInstanceOf[SumWithRetractAccumulator[T]]
    +      ret.f0 = numeric.plus(ret.f0, a.f0)
    +      ret.f1 += a.f1
    +      i += 1
    +    }
    +    ret
    +  }
    +
    +  override def getAccumulatorType(): TypeInformation[_] = {
    +    new TupleTypeInfo(
    +      (new SumWithRetractAccumulator).getClass,
    +      getValueTypeInfo,
    +      BasicTypeInfo.LONG_TYPE_INFO)
    +  }
    +
    +  def getValueTypeInfo: TypeInformation[_]
    +}
    +
    +/**
    +  * Built-in Byte Sum with retract aggregate function
    +  */
    +class ByteSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Byte] {
    +  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
    +}
    +
    +/**
    +  * Built-in Short Sum with retract aggregate function
    +  */
    +class ShortSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Short] {
    +  override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
    +}
    +
    +/**
    +  * Built-in Int Sum with retract aggregate function
    +  */
    +class IntSumWithRetractAggFunction extends SumWithRetractAggFunction[Int] {
    +  override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
    +}
    +
    +/**
    +  * Built-in Long Sum with retract aggregate function
    +  */
    +class LongSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Long] {
    +  override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
    +}
    +
    +/**
    +  * Built-in Float Sum with retract aggregate function
    +  */
    +class FloatSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Float] {
    +  override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
    +}
    +
    +/**
    +  * Built-in Double Sum with retract aggregate function
    +  */
    +class DoubleSumWithRetractAggFunction extends 
SumWithRetractAggFunction[Double] {
    +  override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
    +}
    +
    +/** The initial accumulator for Big Decimal Sum with retract aggregate 
function */
    +class DecimalSumWithRetractAccumulator extends JTuple2[BigDecimal, Long] 
with Accumulator {
    +  f0 = BigDecimal.ZERO
    +  f1 = 0L
    +}
    +
    +/**
    +  * Built-in Big Decimal Sum with retract aggregate function
    +  */
    +class DecimalSumWithRetractAggFunction extends 
AggregateFunction[BigDecimal] {
    +
    +  override def createAccumulator(): Accumulator = {
    +    new DecimalSumWithRetractAccumulator
    +  }
    +
    +  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[BigDecimal]
    +      val accum = 
accumulator.asInstanceOf[DecimalSumWithRetractAccumulator]
    +      accum.f0 = accum.f0.add(v)
    +      accum.f1 += 1L
    +    }
    +  }
    +
    +  override def retract(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[BigDecimal]
    +      val accum = 
accumulator.asInstanceOf[DecimalSumWithRetractAccumulator]
    +      accum.f0 = accum.f0.subtract(v)
    +      accum.f1 -= 1L
    +      if (accum.f1 == 0) {
    --- End diff --
    
    Do we need this check?


> Add retract method into the aggregateFunction
> ---------------------------------------------
>
>                 Key: FLINK-5956
>                 URL: https://issues.apache.org/jira/browse/FLINK-5956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Retraction method is help for processing updated message. It will also very 
> helpful for window Aggregation. This PR will first add retraction methods 
> into the aggregateFunctions, such that on-going over window Aggregation can 
> get benefit from it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to