Repository: flink Updated Branches: refs/heads/master 050f9a416 -> c31f95cab
[FLINK-5927] [table] Remove old Aggregate interface, built-in functions, and tests. This closes #3465. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c31f95ca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c31f95ca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c31f95ca Branch: refs/heads/master Commit: c31f95cab884452dba47306c2b7fb536f047b8ae Parents: 14fab4c Author: shaoxuan-wang <[email protected]> Authored: Fri Mar 3 15:05:00 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Fri Mar 3 14:27:08 2017 +0100 ---------------------------------------------------------------------- .../table/runtime/aggregate/Aggregate.scala | 96 ------ .../table/runtime/aggregate/AvgAggregate.scala | 296 ------------------- .../runtime/aggregate/CountAggregate.scala | 55 ---- .../table/runtime/aggregate/MaxAggregate.scala | 171 ----------- .../table/runtime/aggregate/MinAggregate.scala | 171 ----------- .../table/runtime/aggregate/SumAggregate.scala | 131 -------- .../runtime/aggregate/AggregateTestBase.scala | 111 ------- .../runtime/aggregate/AvgAggregateTest.scala | 154 ---------- .../runtime/aggregate/CountAggregateTest.scala | 31 -- .../runtime/aggregate/MaxAggregateTest.scala | 177 ----------- .../runtime/aggregate/MinAggregateTest.scala | 177 ----------- .../runtime/aggregate/SumAggregateTest.scala | 137 --------- 12 files changed, 1707 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala deleted file mode 100644 index a614783..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.types.Row - -/** - * The interface for all Flink aggregate functions, which expressed in terms of initiate(), - * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases: - * -- In Map phase, use prepare() to transform aggregate field value into intermediate - * aggregate value. - * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values - * into aggregate buffer. Then use evaluate() to calculate the final aggregated value. - * For associative decomposable aggregate functions, they support partial aggregate. To optimize - * the performance, a Combine phase would be added between Map phase and GroupReduce phase, - * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values - * into aggregate buffer. - * - * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start - * field index in Row, so different aggregate functions could share the same Row as intermediate - * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields - * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM - * primitives, and Flink use intermediateDataType() to get its data types in SQL side. - * - * @tparam T Aggregated value type. - */ -trait Aggregate[T] extends Serializable { - - /** - * Transform the aggregate field value into intermediate aggregate data. - * - * @param value The value to insert into the intermediate aggregate row. - * @param intermediate The intermediate aggregate row into which the value is inserted. - */ - def prepare(value: Any, intermediate: Row): Unit - - /** - * Initiate the intermediate aggregate value in Row. - * - * @param intermediate The intermediate aggregate row to initiate. - */ - def initiate(intermediate: Row): Unit - - /** - * Merge intermediate aggregate data into aggregate buffer. - * - * @param intermediate The intermediate aggregate row to merge. - * @param buffer The aggregate buffer into which the intermedidate is merged. - */ - def merge(intermediate: Row, buffer: Row): Unit - - /** - * Calculate the final aggregated result based on aggregate buffer. - * - * @param buffer The aggregate buffer from which the final aggregate is computed. - * @return The final result of the aggregate. - */ - def evaluate(buffer: Row): T - - /** - * Intermediate aggregate value types. - * - * @return The types of the intermediate fields of this aggregate. - */ - def intermediateDataType: Array[TypeInformation[_]] - - /** - * Set the aggregate data offset in Row. - * - * @param aggOffset The offset of this aggregate in the intermediate aggregate rows. - */ - def setAggOffsetInRow(aggOffset: Int) - - /** - * Whether aggregate function support partial aggregate. - * - * @return True if the aggregate supports partial aggregation, False otherwise. - */ - def supportPartial: Boolean = false -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala deleted file mode 100644 index cb94ca1..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import com.google.common.math.LongMath -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.types.Row -import java.math.BigDecimal -import java.math.BigInteger - -abstract class AvgAggregate[T] extends Aggregate[T] { - protected var partialSumIndex: Int = _ - protected var partialCountIndex: Int = _ - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggOffset: Int): Unit = { - partialSumIndex = aggOffset - partialCountIndex = aggOffset + 1 - } -} - -abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] { - - override def initiate(partial: Row): Unit = { - partial.setField(partialSumIndex, 0L) - partial.setField(partialCountIndex, 0L) - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - partial.setField(partialSumIndex, 0L) - partial.setField(partialCountIndex, 0L) - } else { - doPrepare(value, partial) - } - } - - override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long] - val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum)) - buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) - } - - override def evaluate(buffer : Row): T = { - doEvaluate(buffer).asInstanceOf[T] - } - - override def intermediateDataType = Array( - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO) - - def doPrepare(value: Any, partial: Row): Unit - - def doEvaluate(buffer: Row): Any -} - -class ByteAvgAggregate extends IntegralAvgAggregate[Byte] { - override def doPrepare(value: Any, partial: Row): Unit = { - val input = value.asInstanceOf[Byte] - partial.setField(partialSumIndex, input.toLong) - partial.setField(partialCountIndex, 1L) - } - - override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - if (bufferCount == 0L) { - null - } else { - (bufferSum / bufferCount).toByte - } - } -} - -class ShortAvgAggregate extends IntegralAvgAggregate[Short] { - - override def doPrepare(value: Any, partial: Row): Unit = { - val input = value.asInstanceOf[Short] - partial.setField(partialSumIndex, input.toLong) - partial.setField(partialCountIndex, 1L) - } - - override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - if (bufferCount == 0L) { - null - } else { - (bufferSum / bufferCount).toShort - } - } -} - -class IntAvgAggregate extends IntegralAvgAggregate[Int] { - - override def doPrepare(value: Any, partial: Row): Unit = { - val input = value.asInstanceOf[Int] - partial.setField(partialSumIndex, input.toLong) - partial.setField(partialCountIndex, 1L) - } - - override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - if (bufferCount == 0L) { - null - } else { - (bufferSum / bufferCount).toInt - } - } -} - -class LongAvgAggregate extends IntegralAvgAggregate[Long] { - - override def intermediateDataType = Array( - BasicTypeInfo.BIG_INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO) - - override def initiate(partial: Row): Unit = { - partial.setField(partialSumIndex, BigInteger.ZERO) - partial.setField(partialCountIndex, 0L) - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - partial.setField(partialSumIndex, BigInteger.ZERO) - partial.setField(partialCountIndex, 0L) - } else { - doPrepare(value, partial) - } - } - - override def doPrepare(value: Any, partial: Row): Unit = { - val input = value.asInstanceOf[Long] - partial.setField(partialSumIndex, BigInteger.valueOf(input)) - partial.setField(partialCountIndex, 1L) - } - - override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger] - val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - buffer.setField(partialSumIndex, partialSum.add(bufferSum)) - buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) - } - - override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - if (bufferCount == 0L) { - null - } else { - bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue() - } - } -} - -abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] { - - override def initiate(partial: Row): Unit = { - partial.setField(partialSumIndex, 0D) - partial.setField(partialCountIndex, 0L) - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - partial.setField(partialSumIndex, 0D) - partial.setField(partialCountIndex, 0L) - } else { - doPrepare(value, partial) - } - } - - override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double] - val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - - buffer.setField(partialSumIndex, partialSum + bufferSum) - buffer.setField(partialCountIndex, partialCount + bufferCount) - } - - override def evaluate(buffer : Row): T = { - doEvaluate(buffer).asInstanceOf[T] - } - - override def intermediateDataType = Array( - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO) - - def doPrepare(value: Any, partial: Row): Unit - - def doEvaluate(buffer: Row): Any -} - -class FloatAvgAggregate extends FloatingAvgAggregate[Float] { - - override def doPrepare(value: Any, partial: Row): Unit = { - val input = value.asInstanceOf[Float] - partial.setField(partialSumIndex, input.toDouble) - partial.setField(partialCountIndex, 1L) - } - - - override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - if (bufferCount == 0L) { - null - } else { - (bufferSum / bufferCount).toFloat - } - } -} - -class DoubleAvgAggregate extends FloatingAvgAggregate[Double] { - - override def doPrepare(value: Any, partial: Row): Unit = { - val input = value.asInstanceOf[Double] - partial.setField(partialSumIndex, input) - partial.setField(partialCountIndex, 1L) - } - - override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - if (bufferCount == 0L) { - null - } else { - (bufferSum / bufferCount) - } - } -} - -class DecimalAvgAggregate extends AvgAggregate[BigDecimal] { - - override def intermediateDataType = Array( - BasicTypeInfo.BIG_DEC_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO) - - override def initiate(partial: Row): Unit = { - partial.setField(partialSumIndex, BigDecimal.ZERO) - partial.setField(partialCountIndex, 0L) - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - initiate(partial) - } else { - val input = value.asInstanceOf[BigDecimal] - partial.setField(partialSumIndex, input) - partial.setField(partialCountIndex, 1L) - } - } - - override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal] - val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal] - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - buffer.setField(partialSumIndex, partialSum.add(bufferSum)) - buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) - } - - override def evaluate(buffer: Row): BigDecimal = { - val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] - if (bufferCount != 0) { - val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal] - bufferSum.divide(BigDecimal.valueOf(bufferCount)) - } else { - null.asInstanceOf[BigDecimal] - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala deleted file mode 100644 index ea8e1d8..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.types.Row - -class CountAggregate extends Aggregate[Long] { - private var countIndex: Int = _ - - override def initiate(intermediate: Row): Unit = { - intermediate.setField(countIndex, 0L) - } - - override def merge(intermediate: Row, buffer: Row): Unit = { - val partialCount = intermediate.getField(countIndex).asInstanceOf[Long] - val bufferCount = buffer.getField(countIndex).asInstanceOf[Long] - buffer.setField(countIndex, partialCount + bufferCount) - } - - override def evaluate(buffer: Row): Long = { - buffer.getField(countIndex).asInstanceOf[Long] - } - - override def prepare(value: Any, intermediate: Row): Unit = { - if (value == null) { - intermediate.setField(countIndex, 0L) - } else { - intermediate.setField(countIndex, 1L) - } - } - - override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO) - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggIndex: Int): Unit = { - countIndex = aggIndex - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala deleted file mode 100644 index 34b25e0..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.types.Row - -abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { - - protected var maxIndex = -1 - - /** - * Initiate the intermediate aggregate value in Row. - * - * @param intermediate The intermediate aggregate row to initiate. - */ - override def initiate(intermediate: Row): Unit = { - intermediate.setField(maxIndex, null) - } - - /** - * Accessed in MapFunction, prepare the input of partial aggregate. - * - * @param value - * @param intermediate - */ - override def prepare(value: Any, intermediate: Row): Unit = { - if (value == null) { - initiate(intermediate) - } else { - intermediate.setField(maxIndex, value) - } - } - - /** - * Accessed in CombineFunction and GroupReduceFunction, merge partial - * aggregate result into aggregate buffer. - * - * @param intermediate - * @param buffer - */ - override def merge(intermediate: Row, buffer: Row): Unit = { - val partialValue = intermediate.getField(maxIndex).asInstanceOf[T] - if (partialValue != null) { - val bufferValue = buffer.getField(maxIndex).asInstanceOf[T] - if (bufferValue != null) { - val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue - buffer.setField(maxIndex, max) - } else { - buffer.setField(maxIndex, partialValue) - } - } - } - - /** - * Return the final aggregated result based on aggregate buffer. - * - * @param buffer - * @return - */ - override def evaluate(buffer: Row): T = { - buffer.getField(maxIndex).asInstanceOf[T] - } - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggOffset: Int): Unit = { - maxIndex = aggOffset - } -} - -class ByteMaxAggregate extends MaxAggregate[Byte] { - - override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO) - -} - -class ShortMaxAggregate extends MaxAggregate[Short] { - - override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO) - -} - -class IntMaxAggregate extends MaxAggregate[Int] { - - override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO) - -} - -class LongMaxAggregate extends MaxAggregate[Long] { - - override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO) - -} - -class FloatMaxAggregate extends MaxAggregate[Float] { - - override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO) - -} - -class DoubleMaxAggregate extends MaxAggregate[Double] { - - override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO) - -} - -class BooleanMaxAggregate extends MaxAggregate[Boolean] { - - override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO) - -} - -class DecimalMaxAggregate extends Aggregate[BigDecimal] { - - protected var minIndex: Int = _ - - override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO) - - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, null) - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - initiate(partial) - } else { - partial.setField(minIndex, value) - } - } - - override def merge(partial: Row, buffer: Row): Unit = { - val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal] - if (partialValue != null) { - val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal] - if (bufferValue != null) { - val min = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue - buffer.setField(minIndex, min) - } else { - buffer.setField(minIndex, partialValue) - } - } - } - - override def evaluate(buffer: Row): BigDecimal = { - buffer.getField(minIndex).asInstanceOf[BigDecimal] - } - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggOffset: Int): Unit = { - minIndex = aggOffset - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala deleted file mode 100644 index 88cb058..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.types.Row - -abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { - - protected var minIndex: Int = _ - - /** - * Initiate the intermediate aggregate value in Row. - * - * @param intermediate The intermediate aggregate row to initiate. - */ - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, null) - } - - /** - * Accessed in MapFunction, prepare the input of partial aggregate. - * - * @param value - * @param partial - */ - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - initiate(partial) - } else { - partial.setField(minIndex, value) - } - } - - /** - * Accessed in CombineFunction and GroupReduceFunction, merge partial - * aggregate result into aggregate buffer. - * - * @param partial - * @param buffer - */ - override def merge(partial: Row, buffer: Row): Unit = { - val partialValue = partial.getField(minIndex).asInstanceOf[T] - if (partialValue != null) { - val bufferValue = buffer.getField(minIndex).asInstanceOf[T] - if (bufferValue != null) { - val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue - buffer.setField(minIndex, min) - } else { - buffer.setField(minIndex, partialValue) - } - } - } - - /** - * Return the final aggregated result based on aggregate buffer. - * - * @param buffer - * @return - */ - override def evaluate(buffer: Row): T = { - buffer.getField(minIndex).asInstanceOf[T] - } - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggOffset: Int): Unit = { - minIndex = aggOffset - } -} - -class ByteMinAggregate extends MinAggregate[Byte] { - - override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO) - -} - -class ShortMinAggregate extends MinAggregate[Short] { - - override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO) - -} - -class IntMinAggregate extends MinAggregate[Int] { - - override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO) - -} - -class LongMinAggregate extends MinAggregate[Long] { - - override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO) - -} - -class FloatMinAggregate extends MinAggregate[Float] { - - override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO) - -} - -class DoubleMinAggregate extends MinAggregate[Double] { - - override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO) - -} - -class BooleanMinAggregate extends MinAggregate[Boolean] { - - override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO) - -} - -class DecimalMinAggregate extends Aggregate[BigDecimal] { - - protected var minIndex: Int = _ - - override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO) - - override def initiate(intermediate: Row): Unit = { - intermediate.setField(minIndex, null) - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - initiate(partial) - } else { - partial.setField(minIndex, value) - } - } - - override def merge(partial: Row, buffer: Row): Unit = { - val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal] - if (partialValue != null) { - val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal] - if (bufferValue != null) { - val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue - buffer.setField(minIndex, min) - } else { - buffer.setField(minIndex, partialValue) - } - } - } - - override def evaluate(buffer: Row): BigDecimal = { - buffer.getField(minIndex).asInstanceOf[BigDecimal] - } - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggOffset: Int): Unit = { - minIndex = aggOffset - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala deleted file mode 100644 index cd88112..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.types.Row - -abstract class SumAggregate[T: Numeric] - extends Aggregate[T] { - - private val numeric = implicitly[Numeric[T]] - protected var sumIndex: Int = _ - - override def initiate(partial: Row): Unit = { - partial.setField(sumIndex, null) - } - - override def merge(partial1: Row, buffer: Row): Unit = { - val partialValue = partial1.getField(sumIndex).asInstanceOf[T] - if (partialValue != null) { - val bufferValue = buffer.getField(sumIndex).asInstanceOf[T] - if (bufferValue != null) { - buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue)) - } else { - buffer.setField(sumIndex, partialValue) - } - } - } - - override def evaluate(buffer: Row): T = { - buffer.getField(sumIndex).asInstanceOf[T] - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - initiate(partial) - } else { - val input = value.asInstanceOf[T] - partial.setField(sumIndex, input) - } - } - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggOffset: Int): Unit = { - sumIndex = aggOffset - } -} - -class ByteSumAggregate extends SumAggregate[Byte] { - override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO) -} - -class ShortSumAggregate extends SumAggregate[Short] { - override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO) -} - -class IntSumAggregate extends SumAggregate[Int] { - override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO) -} - -class LongSumAggregate extends SumAggregate[Long] { - override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO) -} - -class FloatSumAggregate extends SumAggregate[Float] { - override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO) -} - -class DoubleSumAggregate extends SumAggregate[Double] { - override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO) -} - -class DecimalSumAggregate extends Aggregate[BigDecimal] { - - protected var sumIndex: Int = _ - - override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO) - - override def initiate(partial: Row): Unit = { - partial.setField(sumIndex, null) - } - - override def merge(partial1: Row, buffer: Row): Unit = { - val partialValue = partial1.getField(sumIndex).asInstanceOf[BigDecimal] - if (partialValue != null) { - val bufferValue = buffer.getField(sumIndex).asInstanceOf[BigDecimal] - if (bufferValue != null) { - buffer.setField(sumIndex, partialValue.add(bufferValue)) - } else { - buffer.setField(sumIndex, partialValue) - } - } - } - - override def evaluate(buffer: Row): BigDecimal = { - buffer.getField(sumIndex).asInstanceOf[BigDecimal] - } - - override def prepare(value: Any, partial: Row): Unit = { - if (value == null) { - initiate(partial) - } else { - val input = value.asInstanceOf[BigDecimal] - partial.setField(sumIndex, input) - } - } - - override def supportPartial: Boolean = true - - override def setAggOffsetInRow(aggOffset: Int): Unit = { - sumIndex = aggOffset - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala deleted file mode 100644 index 0ca101d..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal -import org.apache.flink.types.Row - -import org.junit.Test -import org.junit.Assert.assertEquals - -abstract class AggregateTestBase[T] { - - private val offset = 2 - private val rowArity: Int = offset + aggregator.intermediateDataType.length - - def inputValueSets: Seq[Seq[_]] - - def expectedResults: Seq[T] - - def aggregator: Aggregate[T] - - private def createAggregator(): Aggregate[T] = { - val agg = aggregator - agg.setAggOffsetInRow(offset) - agg - } - - private def createRow(): Row = { - new Row(rowArity) - } - - @Test - def testAggregate(): Unit = { - - // iterate over input sets - for((vals, expected) <- inputValueSets.zip(expectedResults)) { - - // prepare mapper - val rows: Seq[Row] = prepare(vals) - - val result = if (aggregator.supportPartial) { - // test with combiner - val (firstVals, secondVals) = rows.splitAt(rows.length / 2) - val combined = partialAgg(firstVals) :: partialAgg(secondVals) :: Nil - finalAgg(combined) - - } else { - // test without combiner - finalAgg(rows) - } - - (expected, result) match { - case (e: BigDecimal, r: BigDecimal) => - // BigDecimal.equals() value and scale but we are only interested in value. - assert(e.compareTo(r) == 0) - case _ => - assertEquals(expected, result) - } - } - } - - private def prepare(vals: Seq[_]): Seq[Row] = { - - val agg = createAggregator() - - vals.map { v => - val row = createRow() - agg.prepare(v, row) - row - } - } - - private def partialAgg(rows: Seq[Row]): Row = { - - val agg = createAggregator() - val aggBuf = createRow() - - agg.initiate(aggBuf) - rows.foreach(v => agg.merge(v, aggBuf)) - - aggBuf - } - - private def finalAgg(rows: Seq[Row]): T = { - - val agg = createAggregator() - val aggBuf = createRow() - - agg.initiate(aggBuf) - rows.foreach(v => agg.merge(v, aggBuf)) - - agg.evaluate(partialAgg(rows)) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala deleted file mode 100644 index a72d08b..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal - -abstract class AvgAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { - - private val numeric: Numeric[T] = implicitly[Numeric[T]] - - def minVal: T - def maxVal: T - - override def inputValueSets: Seq[Seq[T]] = Seq( - Seq( - minVal, - minVal, - null.asInstanceOf[T], - minVal, - minVal, - null.asInstanceOf[T], - minVal, - minVal, - minVal - ), - Seq( - maxVal, - maxVal, - null.asInstanceOf[T], - maxVal, - maxVal, - null.asInstanceOf[T], - maxVal, - maxVal, - maxVal - ), - Seq( - minVal, - maxVal, - null.asInstanceOf[T], - numeric.fromInt(0), - numeric.negate(maxVal), - numeric.negate(minVal), - null.asInstanceOf[T] - ), - Seq( - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T] - ) - ) - - override def expectedResults: Seq[T] = Seq( - minVal, - maxVal, - numeric.fromInt(0), - null.asInstanceOf[T] - ) -} - -class ByteAvgAggregateTest extends AvgAggregateTestBase[Byte] { - - override def minVal = (Byte.MinValue + 1).toByte - override def maxVal = (Byte.MaxValue - 1).toByte - - override def aggregator = new ByteAvgAggregate() -} - -class ShortAvgAggregateTest extends AvgAggregateTestBase[Short] { - - override def minVal = (Short.MinValue + 1).toShort - override def maxVal = (Short.MaxValue - 1).toShort - - override def aggregator = new ShortAvgAggregate() -} - -class IntAvgAggregateTest extends AvgAggregateTestBase[Int] { - - override def minVal = Int.MinValue + 1 - override def maxVal = Int.MaxValue - 1 - - override def aggregator = new IntAvgAggregate() -} - -class LongAvgAggregateTest extends AvgAggregateTestBase[Long] { - - override def minVal = Long.MinValue + 1 - override def maxVal = Long.MaxValue - 1 - - override def aggregator = new LongAvgAggregate() -} - -class FloatAvgAggregateTest extends AvgAggregateTestBase[Float] { - - override def minVal = Float.MinValue - override def maxVal = Float.MaxValue - - override def aggregator = new FloatAvgAggregate() -} - -class DoubleAvgAggregateTest extends AvgAggregateTestBase[Double] { - - override def minVal = Float.MinValue - override def maxVal = Float.MaxValue - - override def aggregator = new DoubleAvgAggregate() -} - -class DecimalAvgAggregateTest extends AggregateTestBase[BigDecimal] { - - override def inputValueSets: Seq[Seq[_]] = Seq( - Seq( - new BigDecimal("987654321000000"), - new BigDecimal("-0.000000000012345"), - null, - new BigDecimal("0.000000000012345"), - new BigDecimal("-987654321000000"), - null, - new BigDecimal("0") - ), - Seq( - null, - null, - null, - null - ) - ) - - override def expectedResults: Seq[BigDecimal] = Seq( - BigDecimal.ZERO, - null - ) - - override def aggregator: Aggregate[BigDecimal] = new DecimalAvgAggregate() -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala deleted file mode 100644 index 55f73b4..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.aggregate - -class CountAggregateTest extends AggregateTestBase[Long] { - - override def inputValueSets: Seq[Seq[_]] = Seq( - Seq("a", "b", null, "c", null, "d", "e", null, "f"), - Seq(null, null, null, null, null, null) - ) - - override def expectedResults: Seq[Long] = Seq(6L, 0L) - - override def aggregator: Aggregate[Long] = new CountAggregate() -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala deleted file mode 100644 index 1bf879d..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal - -abstract class MaxAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { - - private val numeric: Numeric[T] = implicitly[Numeric[T]] - - def minVal: T - def maxVal: T - - override def inputValueSets: Seq[Seq[T]] = Seq( - Seq( - numeric.fromInt(1), - null.asInstanceOf[T], - maxVal, - numeric.fromInt(-99), - numeric.fromInt(3), - numeric.fromInt(56), - numeric.fromInt(0), - minVal, - numeric.fromInt(-20), - numeric.fromInt(17), - null.asInstanceOf[T] - ), - Seq( - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T] - ) - ) - - override def expectedResults: Seq[T] = Seq( - maxVal, - null.asInstanceOf[T] - ) -} - -class ByteMaxAggregateTest extends MaxAggregateTestBase[Byte] { - - override def minVal = (Byte.MinValue + 1).toByte - override def maxVal = (Byte.MaxValue - 1).toByte - - override def aggregator: Aggregate[Byte] = new ByteMaxAggregate() -} - -class ShortMaxAggregateTest extends MaxAggregateTestBase[Short] { - - override def minVal = (Short.MinValue + 1).toShort - override def maxVal = (Short.MaxValue - 1).toShort - - override def aggregator: Aggregate[Short] = new ShortMaxAggregate() -} - -class IntMaxAggregateTest extends MaxAggregateTestBase[Int] { - - override def minVal = Int.MinValue + 1 - override def maxVal = Int.MaxValue - 1 - - override def aggregator: Aggregate[Int] = new IntMaxAggregate() -} - -class LongMaxAggregateTest extends MaxAggregateTestBase[Long] { - - override def minVal = Long.MinValue + 1 - override def maxVal = Long.MaxValue - 1 - - override def aggregator: Aggregate[Long] = new LongMaxAggregate() -} - -class FloatMaxAggregateTest extends MaxAggregateTestBase[Float] { - - override def minVal = Float.MinValue / 2 - override def maxVal = Float.MaxValue / 2 - - override def aggregator: Aggregate[Float] = new FloatMaxAggregate() -} - -class DoubleMaxAggregateTest extends MaxAggregateTestBase[Double] { - - override def minVal = Double.MinValue / 2 - override def maxVal = Double.MaxValue / 2 - - override def aggregator: Aggregate[Double] = new DoubleMaxAggregate() -} - -class BooleanMaxAggregateTest extends AggregateTestBase[Boolean] { - - override def inputValueSets: Seq[Seq[Boolean]] = Seq( - Seq( - false, - false, - false - ), - Seq( - true, - true, - true - ), - Seq( - true, - false, - null.asInstanceOf[Boolean], - true, - false, - true, - null.asInstanceOf[Boolean] - ), - Seq( - null.asInstanceOf[Boolean], - null.asInstanceOf[Boolean], - null.asInstanceOf[Boolean] - ) - ) - - override def expectedResults: Seq[Boolean] = Seq( - false, - true, - true, - null.asInstanceOf[Boolean] - ) - - override def aggregator: Aggregate[Boolean] = new BooleanMaxAggregate() -} - -class DecimalMaxAggregateTest extends AggregateTestBase[BigDecimal] { - - override def inputValueSets: Seq[Seq[_]] = Seq( - Seq( - new BigDecimal("1"), - new BigDecimal("1000.000001"), - new BigDecimal("-1"), - new BigDecimal("-999.998999"), - null, - new BigDecimal("0"), - new BigDecimal("-999.999"), - null, - new BigDecimal("999.999") - ), - Seq( - null, - null, - null, - null, - null - ) - ) - - override def expectedResults: Seq[BigDecimal] = Seq( - new BigDecimal("1000.000001"), - null - ) - - override def aggregator: Aggregate[BigDecimal] = new DecimalMaxAggregate() - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala deleted file mode 100644 index 3e2404d..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal - -abstract class MinAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { - - private val numeric: Numeric[T] = implicitly[Numeric[T]] - - def minVal: T - def maxVal: T - - override def inputValueSets: Seq[Seq[T]] = Seq( - Seq( - numeric.fromInt(1), - null.asInstanceOf[T], - maxVal, - numeric.fromInt(-99), - numeric.fromInt(3), - numeric.fromInt(56), - numeric.fromInt(0), - minVal, - numeric.fromInt(-20), - numeric.fromInt(17), - null.asInstanceOf[T] - ), - Seq( - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T] - ) - ) - - override def expectedResults: Seq[T] = Seq( - minVal, - null.asInstanceOf[T] - ) -} - -class ByteMinAggregateTest extends MinAggregateTestBase[Byte] { - - override def minVal = (Byte.MinValue + 1).toByte - override def maxVal = (Byte.MaxValue - 1).toByte - - override def aggregator: Aggregate[Byte] = new ByteMinAggregate() -} - -class ShortMinAggregateTest extends MinAggregateTestBase[Short] { - - override def minVal = (Short.MinValue + 1).toShort - override def maxVal = (Short.MaxValue - 1).toShort - - override def aggregator: Aggregate[Short] = new ShortMinAggregate() -} - -class IntMinAggregateTest extends MinAggregateTestBase[Int] { - - override def minVal = Int.MinValue + 1 - override def maxVal = Int.MaxValue - 1 - - override def aggregator: Aggregate[Int] = new IntMinAggregate() -} - -class LongMinAggregateTest extends MinAggregateTestBase[Long] { - - override def minVal = Long.MinValue + 1 - override def maxVal = Long.MaxValue - 1 - - override def aggregator: Aggregate[Long] = new LongMinAggregate() -} - -class FloatMinAggregateTest extends MinAggregateTestBase[Float] { - - override def minVal = Float.MinValue / 2 - override def maxVal = Float.MaxValue / 2 - - override def aggregator: Aggregate[Float] = new FloatMinAggregate() -} - -class DoubleMinAggregateTest extends MinAggregateTestBase[Double] { - - override def minVal = Double.MinValue / 2 - override def maxVal = Double.MaxValue / 2 - - override def aggregator: Aggregate[Double] = new DoubleMinAggregate() -} - -class BooleanMinAggregateTest extends AggregateTestBase[Boolean] { - - override def inputValueSets: Seq[Seq[Boolean]] = Seq( - Seq( - false, - false, - false - ), - Seq( - true, - true, - true - ), - Seq( - true, - false, - null.asInstanceOf[Boolean], - true, - false, - true, - null.asInstanceOf[Boolean] - ), - Seq( - null.asInstanceOf[Boolean], - null.asInstanceOf[Boolean], - null.asInstanceOf[Boolean] - ) - ) - - override def expectedResults: Seq[Boolean] = Seq( - false, - true, - false, - null.asInstanceOf[Boolean] - ) - - override def aggregator: Aggregate[Boolean] = new BooleanMinAggregate() -} - -class DecimalMinAggregateTest extends AggregateTestBase[BigDecimal] { - - override def inputValueSets: Seq[Seq[_]] = Seq( - Seq( - new BigDecimal("1"), - new BigDecimal("1000"), - new BigDecimal("-1"), - new BigDecimal("-999.998999"), - null, - new BigDecimal("0"), - new BigDecimal("-999.999"), - null, - new BigDecimal("999.999") - ), - Seq( - null, - null, - null, - null, - null - ) - ) - - override def expectedResults: Seq[BigDecimal] = Seq( - new BigDecimal("-999.999"), - null - ) - - override def aggregator: Aggregate[BigDecimal] = new DecimalMinAggregate() - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala deleted file mode 100644 index c085334..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.aggregate - -import java.math.BigDecimal - -abstract class SumAggregateTestBase[T: Numeric] extends AggregateTestBase[T] { - - private val numeric: Numeric[T] = implicitly[Numeric[T]] - - def maxVal: T - private val minVal = numeric.negate(maxVal) - - override def inputValueSets: Seq[Seq[T]] = Seq( - Seq( - minVal, - numeric.fromInt(1), - null.asInstanceOf[T], - numeric.fromInt(2), - numeric.fromInt(3), - numeric.fromInt(4), - numeric.fromInt(5), - numeric.fromInt(-10), - numeric.fromInt(-20), - numeric.fromInt(17), - null.asInstanceOf[T], - maxVal - ), - Seq( - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T], - null.asInstanceOf[T] - ) - ) - - override def expectedResults: Seq[T] = Seq( - numeric.fromInt(2), - null.asInstanceOf[T] - ) -} - -class ByteSumAggregateTest extends SumAggregateTestBase[Byte] { - - override def maxVal = (Byte.MaxValue / 2).toByte - - override def aggregator: Aggregate[Byte] = new ByteSumAggregate -} - -class ShortSumAggregateTest extends SumAggregateTestBase[Short] { - - override def maxVal = (Short.MaxValue / 2).toShort - - override def aggregator: Aggregate[Short] = new ShortSumAggregate -} - -class IntSumAggregateTest extends SumAggregateTestBase[Int] { - - override def maxVal = Int.MaxValue / 2 - - override def aggregator: Aggregate[Int] = new IntSumAggregate -} - -class LongSumAggregateTest extends SumAggregateTestBase[Long] { - - override def maxVal = Long.MaxValue / 2 - - override def aggregator: Aggregate[Long] = new LongSumAggregate -} - -class FloatSumAggregateTest extends SumAggregateTestBase[Float] { - - override def maxVal = 12345.6789f - - override def aggregator: Aggregate[Float] = new FloatSumAggregate -} - -class DoubleSumAggregateTest extends SumAggregateTestBase[Double] { - - override def maxVal = 12345.6789d - - override def aggregator: Aggregate[Double] = new DoubleSumAggregate -} - -class DecimalSumAggregateTest extends AggregateTestBase[BigDecimal] { - - override def inputValueSets: Seq[Seq[_]] = Seq( - Seq( - new BigDecimal("1"), - new BigDecimal("2"), - new BigDecimal("3"), - null, - new BigDecimal("0"), - new BigDecimal("-1000"), - new BigDecimal("0.000000000002"), - new BigDecimal("1000"), - new BigDecimal("-0.000000000001"), - new BigDecimal("999.999"), - null, - new BigDecimal("4"), - new BigDecimal("-999.999"), - null - ), - Seq( - null, - null, - null, - null, - null - ) - ) - - override def expectedResults: Seq[BigDecimal] = Seq( - new BigDecimal("10.000000000001"), - null - ) - - override def aggregator: Aggregate[BigDecimal] = new DecimalSumAggregate() -}
