[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878158#comment-15878158 ]
ASF GitHub Bot commented on FLINK-5767: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102241426 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/builtInAggFuncs/AvgAggFunction.scala --- @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.functions.builtInAggFuncs + +import java.math.{BigDecimal, BigInteger} +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +/** + * Base class for built-in Integral Avg aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class IntegralAvgAggFunction[T] extends AggregateFunction[T] { + /** The initial accumulator for Integral Avg aggregate function */ + class IntegralAvgAccumulator extends Accumulator { + var sum: Long = 0 + var count: Long = 0 + } + + override def createAccumulator(): Accumulator = { + new IntegralAvgAccumulator + } + + override def accumulate(accumulator: Accumulator, value: Any) = { + if (value != null) { + val v = value.asInstanceOf[Number].longValue() + val accum = accumulator.asInstanceOf[IntegralAvgAccumulator] + accum.sum += v + accum.count += 1 + } + } + + override def getValue(accumulator: Accumulator): T = { + val accum = accumulator.asInstanceOf[IntegralAvgAccumulator] + val sum = accum.sum + if (accum.count == 0) { + null.asInstanceOf[T] + } else { + resultTypeConvert(accum.sum / accum.count) + } + } + + override def merge(a: Accumulator, b: Accumulator): Accumulator = { + val aAccum = a.asInstanceOf[IntegralAvgAccumulator] + val bAccum = b.asInstanceOf[IntegralAvgAccumulator] + aAccum.count += bAccum.count + aAccum.sum += bAccum.sum + a + } + /** + * Convert the intermediate result to the expected aggregation result type + * + * @param value the intermediate result. We use a Long container to save + * the intermediate result to avoid the overflow by sum operation. 
+ * @return the result value with the expected aggregation result type + */ + def resultTypeConvert(value: Long): T +} + +/** + * Built-in Byte Avg aggregate function + */ +class ByteAvgAggFunction extends IntegralAvgAggFunction[Byte] { + override def resultTypeConvert(value: Long): Byte = value.toByte +} + +/** + * Built-in Short Avg aggregate function + */ +class ShortAvgAggFunction extends IntegralAvgAggFunction[Short] { + override def resultTypeConvert(value: Long): Short = value.toShort +} + +/** + * Built-in Int Avg aggregate function + */ +class IntAvgAggFunction extends IntegralAvgAggFunction[Int] { + override def resultTypeConvert(value: Long): Int = value.toInt +} + +/** + * Base Class for Built-in Big Integral Avg aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] { + /** The initial accumulator for Big Integral Avg aggregate function */ + class BigIntegralAvgAccumulator extends Accumulator { + var sum: BigInteger = BigInteger.ZERO + var count: Long = 0 + } + + override def createAccumulator(): Accumulator = { + new BigIntegralAvgAccumulator + } + + override def accumulate(accumulator: Accumulator, value: Any) = { + if (value != null) { + val v = value.asInstanceOf[Long] + val accum = accumulator.asInstanceOf[BigIntegralAvgAccumulator] + accum.sum = accum.sum.add(BigInteger.valueOf(v)) + accum.count += 1 + } + } + + override def getValue(accumulator: Accumulator): T = { + val accum = accumulator.asInstanceOf[BigIntegralAvgAccumulator] + val sum = accum.sum + if (accum.count == 0) { + null.asInstanceOf[T] + } else { + resultTypeConvert(accum.sum.divide(BigInteger.valueOf(accum.count))) + } + } + + override def merge(a: Accumulator, b: Accumulator): Accumulator = { + val aAccum = a.asInstanceOf[BigIntegralAvgAccumulator] + val bAccum = b.asInstanceOf[BigIntegralAvgAccumulator] + aAccum.count += bAccum.count + aAccum.sum = aAccum.sum.add(bAccum.sum) + a + } 
+ + /** + * Convert the intermediate result to the expected aggregation result type + * + * @param value the intermediate result. We use a BigInteger container to + * save the intermediate result to avoid the overflow by sum + * operation. + * @return the result value with the expected aggregation result type + */ + def resultTypeConvert(value: BigInteger): T +} + +/** + * Built-in Long Avg aggregate function + */ +class LongAvgAggFunction extends BigIntegralAvgAggFunction[Long] { + override def resultTypeConvert(value: BigInteger): Long = value.longValue() +} + +/** + * Base class for built-in Floating Avg aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class FloatingAvgAggFunction[T] extends AggregateFunction[T] { + /** The initial accumulator for Floating Avg aggregate function */ + class FloatingAvgAccumulator extends Accumulator { + var sum: Double = 0 + var count: Long = 0 + } + + override def createAccumulator(): Accumulator = { + new FloatingAvgAccumulator + } + + override def accumulate(accumulator: Accumulator, value: Any) = { + if (value != null) { + val v = value.asInstanceOf[Number].doubleValue() + val accum = accumulator.asInstanceOf[FloatingAvgAccumulator] + accum.sum += v + accum.count += 1 + } + } + + override def getValue(accumulator: Accumulator): T = { + val accum = accumulator.asInstanceOf[FloatingAvgAccumulator] + val sum = accum.sum + if (accum.count == 0) { + null.asInstanceOf[T] + } else { + resultTypeConvert(accum.sum / accum.count) + } + } + + override def merge(a: Accumulator, b: Accumulator): Accumulator = { + val aAccum = a.asInstanceOf[FloatingAvgAccumulator] + val bAccum = b.asInstanceOf[FloatingAvgAccumulator] + aAccum.count += bAccum.count + aAccum.sum += bAccum.sum + a + } + + /** + * Convert the intermediate result to the expected aggregation result type + * + * @param value the intermediate result. 
We use a Double container to save + * the intermediate result to avoid the overflow by sum operation. + * @return the result value with the expected aggregation result type + */ + def resultTypeConvert(value: Double): T +} + +/** + * Built-in Float Avg aggregate function + */ +class FloatAvgAggFunction extends FloatingAvgAggFunction[Float] { + override def resultTypeConvert(value: Double): Float = value.toFloat +} + +/** + * Built-in Int Double aggregate function + */ +class DoubleAvgAggFunction extends FloatingAvgAggFunction[Double] { + override def resultTypeConvert(value: Double): Double = value +} + +/** + * Base class for built-in Big Decimal Avg aggregate function + */ +class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] { + /** The initial accumulator for Big Decimal Avg aggregate function */ + class DecimalAvgAccumulator extends Accumulator { + var sum: BigDecimal = null + var count: Long = 0 + } + + override def createAccumulator(): Accumulator = { + new DecimalAvgAccumulator + } + + override def accumulate(accumulator: Accumulator, value: Any) = { + if (value != null) { + val v = value.asInstanceOf[BigDecimal] + val accum = accumulator.asInstanceOf[DecimalAvgAccumulator] + accum.count += 1 + if (accum.sum == null) { + accum.sum = v + } else { + accum.sum = accum.sum.add(v) + } + } + } + + override def getValue(accumulator: Accumulator): BigDecimal = { + val sum = accumulator.asInstanceOf[DecimalAvgAccumulator].sum + val count = accumulator.asInstanceOf[DecimalAvgAccumulator].count + if (sum == null || count == 0) { + null.asInstanceOf[BigDecimal] + } else { + sum.divide(BigDecimal.valueOf(count)) + } + } + + override def merge(a: Accumulator, b: Accumulator): Accumulator = { + val aAccum = a.asInstanceOf[DecimalAvgAccumulator] + val bAccum = b.asInstanceOf[DecimalAvgAccumulator] + aAccum.count += bAccum.count + accumulate(a, b.asInstanceOf[DecimalAvgAccumulator].sum) --- End diff -- Won't `count` be off by one because `accumulate` will 
increment `count` as well? `merge` already does `aAccum.count += bAccum.count`, and `accumulate(a, bAccum.sum)` then adds one more to `aAccum.count` whenever `bAccum.sum` is non-null, so the merged count ends up one too high; adding `bAccum.sum` to `aAccum.sum` directly (handling a null sum on either side) avoids this. > New aggregate function interface and built-in aggregate functions > ----------------------------------------------------------------- > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)