[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878158#comment-15878158
 ] 

ASF GitHub Bot commented on FLINK-5767:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3354#discussion_r102241426
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/builtInAggFuncs/AvgAggFunction.scala
 ---
    @@ -0,0 +1,268 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions.builtInAggFuncs
    +
    +import java.math.{BigDecimal, BigInteger}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +/**
    +  * Base class for the built-in Avg aggregate functions whose running sum is
    +  * kept in a Long.
    +  *
    +  * The accumulator stores the sum of all non-null inputs together with their
    +  * count. The average is computed with Long division when the value is
    +  * requested and is then converted to the result type by `resultTypeConvert`.
    +  *
    +  * @tparam T the type for the aggregation result
    +  */
    +abstract class IntegralAvgAggFunction[T] extends AggregateFunction[T] {
    +  /** Accumulator holding the running sum and the number of accumulated values. */
    +  class IntegralAvgAccumulator extends Accumulator {
    +    var sum: Long = 0
    +    var count: Long = 0
    +  }
    +
    +  /** Creates an empty accumulator (sum and count both zero). */
    +  override def createAccumulator(): Accumulator = {
    +    new IntegralAvgAccumulator
    +  }
    +
    +  /**
    +    * Adds a value to the running sum and increments the count. Null values
    +    * are ignored and leave the accumulator untouched.
    +    *
    +    * @param accumulator the accumulator to update; cast to IntegralAvgAccumulator
    +    * @param value       the input value; cast to Number when it is not null
    +    */
    +  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[Number].longValue()
    +      val accum = accumulator.asInstanceOf[IntegralAvgAccumulator]
    +      accum.sum += v
    +      accum.count += 1
    +    }
    +  }
    +
    +  /**
    +    * Computes the average from the accumulated sum and count.
    +    *
    +    * @param accumulator the accumulator to read; cast to IntegralAvgAccumulator
    +    * @return null if no value has been accumulated, otherwise sum / count
    +    *         (Long division) converted to the result type
    +    */
    +  override def getValue(accumulator: Accumulator): T = {
    +    val accum = accumulator.asInstanceOf[IntegralAvgAccumulator]
    +    if (accum.count == 0) {
    +      // No input seen: the average is undefined, so null is returned.
    +      null.asInstanceOf[T]
    +    } else {
    +      resultTypeConvert(accum.sum / accum.count)
    +    }
    +  }
    +
    +  /**
    +    * Merges accumulator `b` into accumulator `a` by adding up counts and sums.
    +    *
    +    * @param a the accumulator that is updated in place
    +    * @param b the accumulator whose count and sum are added to `a`
    +    * @return the updated accumulator `a`
    +    */
    +  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    +    val aAccum = a.asInstanceOf[IntegralAvgAccumulator]
    +    val bAccum = b.asInstanceOf[IntegralAvgAccumulator]
    +    aAccum.count += bAccum.count
    +    aAccum.sum += bAccum.sum
    +    a
    +  }
    +
    +  /**
    +    * Convert the intermediate result to the expected aggregation result type
    +    *
    +    * @param value the intermediate result. We use a Long container to save
    +    *         the intermediate result to avoid the overflow by sum operation.
    +    * @return the result value with the expected aggregation result type
    +    */
    +  def resultTypeConvert(value: Long): T
    +}
    +
    +/**
    +  * Built-in Avg aggregate function for Byte values
    +  */
    +class ByteAvgAggFunction extends IntegralAvgAggFunction[Byte] {
    +  /** Narrows the Long average to a Byte. */
    +  override def resultTypeConvert(value: Long): Byte = {
    +    value.toByte
    +  }
    +}
    +
    +/**
    +  * Built-in Avg aggregate function for Short values
    +  */
    +class ShortAvgAggFunction extends IntegralAvgAggFunction[Short] {
    +  /** Narrows the Long average to a Short. */
    +  override def resultTypeConvert(value: Long): Short = {
    +    value.toShort
    +  }
    +}
    +
    +/**
    +  * Built-in Avg aggregate function for Int values
    +  */
    +class IntAvgAggFunction extends IntegralAvgAggFunction[Int] {
    +  /** Narrows the Long average to an Int. */
    +  override def resultTypeConvert(value: Long): Int = {
    +    value.toInt
    +  }
    +}
    +
    +/**
    +  * Base class for the built-in Avg aggregate functions whose running sum is
    +  * kept in a BigInteger.
    +  *
    +  * The accumulator stores the sum of all non-null inputs together with their
    +  * count. The average is computed with BigInteger division when the value is
    +  * requested and is then converted to the result type by `resultTypeConvert`.
    +  *
    +  * @tparam T the type for the aggregation result
    +  */
    +abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
    +  /** Accumulator holding the running sum and the number of accumulated values. */
    +  class BigIntegralAvgAccumulator extends Accumulator {
    +    var sum: BigInteger = BigInteger.ZERO
    +    var count: Long = 0
    +  }
    +
    +  /** Creates an empty accumulator (sum = BigInteger.ZERO, count = 0). */
    +  override def createAccumulator(): Accumulator = {
    +    new BigIntegralAvgAccumulator
    +  }
    +
    +  /**
    +    * Adds a value to the running sum and increments the count. Null values
    +    * are ignored and leave the accumulator untouched.
    +    *
    +    * @param accumulator the accumulator to update; cast to
    +    *                    BigIntegralAvgAccumulator
    +    * @param value       the input value; cast to Long when it is not null
    +    */
    +  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[Long]
    +      val accum = accumulator.asInstanceOf[BigIntegralAvgAccumulator]
    +      accum.sum = accum.sum.add(BigInteger.valueOf(v))
    +      accum.count += 1
    +    }
    +  }
    +
    +  /**
    +    * Computes the average from the accumulated sum and count.
    +    *
    +    * @param accumulator the accumulator to read; cast to
    +    *                    BigIntegralAvgAccumulator
    +    * @return null if no value has been accumulated, otherwise sum divided by
    +    *         count (BigInteger division) converted to the result type
    +    */
    +  override def getValue(accumulator: Accumulator): T = {
    +    val accum = accumulator.asInstanceOf[BigIntegralAvgAccumulator]
    +    if (accum.count == 0) {
    +      // No input seen: the average is undefined, so null is returned.
    +      null.asInstanceOf[T]
    +    } else {
    +      resultTypeConvert(accum.sum.divide(BigInteger.valueOf(accum.count)))
    +    }
    +  }
    +
    +  /**
    +    * Merges accumulator `b` into accumulator `a` by adding up counts and sums.
    +    *
    +    * @param a the accumulator that is updated in place
    +    * @param b the accumulator whose count and sum are added to `a`
    +    * @return the updated accumulator `a`
    +    */
    +  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    +    val aAccum = a.asInstanceOf[BigIntegralAvgAccumulator]
    +    val bAccum = b.asInstanceOf[BigIntegralAvgAccumulator]
    +    aAccum.count += bAccum.count
    +    aAccum.sum = aAccum.sum.add(bAccum.sum)
    +    a
    +  }
    +
    +  /**
    +    * Convert the intermediate result to the expected aggregation result type
    +    *
    +    * @param value the intermediate result. We use a BigInteger container to
    +    *         save the intermediate result to avoid the overflow by sum
    +    *         operation.
    +    * @return the result value with the expected aggregation result type
    +    */
    +  def resultTypeConvert(value: BigInteger): T
    +}
    +
    +/**
    +  * Built-in Avg aggregate function for Long values
    +  */
    +class LongAvgAggFunction extends BigIntegralAvgAggFunction[Long] {
    +  /** Converts the BigInteger average to a Long. */
    +  override def resultTypeConvert(value: BigInteger): Long = {
    +    value.longValue()
    +  }
    +}
    +
    +/**
    +  * Base class for the built-in Avg aggregate functions whose running sum is
    +  * kept in a Double.
    +  *
    +  * The accumulator stores the sum of all non-null inputs together with their
    +  * count. The average is computed when the value is requested and is then
    +  * converted to the result type by `resultTypeConvert`.
    +  *
    +  * @tparam T the type for the aggregation result
    +  */
    +abstract class FloatingAvgAggFunction[T] extends AggregateFunction[T] {
    +  /** Accumulator holding the running sum and the number of accumulated values. */
    +  class FloatingAvgAccumulator extends Accumulator {
    +    var sum: Double = 0
    +    var count: Long = 0
    +  }
    +
    +  /** Creates an empty accumulator (sum and count both zero). */
    +  override def createAccumulator(): Accumulator = {
    +    new FloatingAvgAccumulator
    +  }
    +
    +  /**
    +    * Adds a value to the running sum and increments the count. Null values
    +    * are ignored and leave the accumulator untouched.
    +    *
    +    * @param accumulator the accumulator to update; cast to FloatingAvgAccumulator
    +    * @param value       the input value; cast to Number when it is not null
    +    */
    +  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[Number].doubleValue()
    +      val accum = accumulator.asInstanceOf[FloatingAvgAccumulator]
    +      accum.sum += v
    +      accum.count += 1
    +    }
    +  }
    +
    +  /**
    +    * Computes the average from the accumulated sum and count.
    +    *
    +    * @param accumulator the accumulator to read; cast to FloatingAvgAccumulator
    +    * @return null if no value has been accumulated, otherwise sum / count
    +    *         converted to the result type
    +    */
    +  override def getValue(accumulator: Accumulator): T = {
    +    val accum = accumulator.asInstanceOf[FloatingAvgAccumulator]
    +    if (accum.count == 0) {
    +      // No input seen: the average is undefined, so null is returned.
    +      null.asInstanceOf[T]
    +    } else {
    +      resultTypeConvert(accum.sum / accum.count)
    +    }
    +  }
    +
    +  /**
    +    * Merges accumulator `b` into accumulator `a` by adding up counts and sums.
    +    *
    +    * @param a the accumulator that is updated in place
    +    * @param b the accumulator whose count and sum are added to `a`
    +    * @return the updated accumulator `a`
    +    */
    +  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    +    val aAccum = a.asInstanceOf[FloatingAvgAccumulator]
    +    val bAccum = b.asInstanceOf[FloatingAvgAccumulator]
    +    aAccum.count += bAccum.count
    +    aAccum.sum += bAccum.sum
    +    a
    +  }
    +
    +  /**
    +    * Convert the intermediate result to the expected aggregation result type
    +    *
    +    * @param value the intermediate result. We use a Double container to save
    +    *         the intermediate result to avoid the overflow by sum operation.
    +    * @return the result value with the expected aggregation result type
    +    */
    +  def resultTypeConvert(value: Double): T
    +}
    +
    +/**
    +  * Built-in Avg aggregate function for Float values
    +  */
    +class FloatAvgAggFunction extends FloatingAvgAggFunction[Float] {
    +  /** Narrows the Double average to a Float. */
    +  override def resultTypeConvert(value: Double): Float = {
    +    value.toFloat
    +  }
    +}
    +
    +/**
    +  * Built-in Avg aggregate function for Double values
    +  */
    +class DoubleAvgAggFunction extends FloatingAvgAggFunction[Double] {
    +  /** Returns the Double average unchanged; no conversion is needed. */
    +  override def resultTypeConvert(value: Double): Double = {
    +    value
    +  }
    +}
    +
    +/**
    +  * Base class for built-in Big Decimal Avg aggregate function
    +  */
    +class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
    +  /** The initial accumulator for Big Decimal Avg aggregate function */
    +  class DecimalAvgAccumulator extends Accumulator {
    +    var sum: BigDecimal = null
    +    var count: Long = 0
    +  }
    +
    +  override def createAccumulator(): Accumulator = {
    +    new DecimalAvgAccumulator
    +  }
    +
    +  override def accumulate(accumulator: Accumulator, value: Any) = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[BigDecimal]
    +      val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
    +      accum.count += 1
    +      if (accum.sum == null) {
    +        accum.sum = v
    +      } else {
    +        accum.sum = accum.sum.add(v)
    +      }
    +    }
    +  }
    +
    +  override def getValue(accumulator: Accumulator): BigDecimal = {
    +    val sum = accumulator.asInstanceOf[DecimalAvgAccumulator].sum
    +    val count = accumulator.asInstanceOf[DecimalAvgAccumulator].count
    +    if (sum == null || count == 0) {
    +      null.asInstanceOf[BigDecimal]
    +    } else {
    +      sum.divide(BigDecimal.valueOf(count))
    +    }
    +  }
    +
    +  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    +    val aAccum = a.asInstanceOf[DecimalAvgAccumulator]
    +    val bAccum = b.asInstanceOf[DecimalAvgAccumulator]
    +    aAccum.count += bAccum.count
    +    accumulate(a, b.asInstanceOf[DecimalAvgAccumulator].sum)
    --- End diff --
    
    Won't `count` be off by one? `bAccum.count` has already been added to it, and
    `accumulate` will then increment `count` once more.


> New aggregate function interface and built-in aggregate functions
> -----------------------------------------------------------------
>
>                 Key: FLINK-5767
>                 URL: https://issues.apache.org/jira/browse/FLINK-5767
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to