[
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891305#comment-15891305
]
ASF GitHub Bot commented on FLINK-5768:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3423#discussion_r103806213
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
@@ -737,101 +632,121 @@ object AggregateUtil {
aggregates(index) = if (sqlMinMaxFunction.getKind ==
SqlKind.MIN) {
sqlTypeName match {
case TINYINT =>
- new ByteMinAggregate
+ new ByteMinAggFunction
case SMALLINT =>
- new ShortMinAggregate
+ new ShortMinAggFunction
case INTEGER =>
- new IntMinAggregate
+ new IntMinAggFunction
case BIGINT =>
- new LongMinAggregate
+ new LongMinAggFunction
case FLOAT =>
- new FloatMinAggregate
+ new FloatMinAggFunction
case DOUBLE =>
- new DoubleMinAggregate
+ new DoubleMinAggFunction
case DECIMAL =>
- new DecimalMinAggregate
+ new DecimalMinAggFunction
case BOOLEAN =>
- new BooleanMinAggregate
+ new BooleanMinAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Min aggregate does no support
type:" + sqlType)
}
} else {
sqlTypeName match {
case TINYINT =>
- new ByteMaxAggregate
+ new ByteMaxAggFunction
case SMALLINT =>
- new ShortMaxAggregate
+ new ShortMaxAggFunction
case INTEGER =>
- new IntMaxAggregate
+ new IntMaxAggFunction
case BIGINT =>
- new LongMaxAggregate
+ new LongMaxAggFunction
case FLOAT =>
- new FloatMaxAggregate
+ new FloatMaxAggFunction
case DOUBLE =>
- new DoubleMaxAggregate
+ new DoubleMaxAggFunction
case DECIMAL =>
- new DecimalMaxAggregate
+ new DecimalMaxAggFunction
case BOOLEAN =>
- new BooleanMaxAggregate
+ new BooleanMaxAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Max aggregate does no support
type:" + sqlType)
}
}
}
case _: SqlCountAggFunction =>
- aggregates(index) = new CountAggregate
+ aggregates(index) = new CountAggFunction
case unSupported: SqlAggFunction =>
throw new TableException("unsupported Function: " +
unSupported.getName)
}
- setAggregateDataOffset(index)
- }
-
- // set the aggregate intermediate data start index in Row, and update
current value.
- def setAggregateDataOffset(index: Int): Unit = {
- aggregates(index).setAggOffsetInRow(aggOffset)
- aggOffset += aggregates(index).intermediateDataType.length
}
(aggFieldIndexes, aggregates)
}
- private def createAggregateBufferDataType(
- groupings: Array[Int],
- aggregates: Array[Aggregate[_]],
- inputType: RelDataType,
- windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo
= {
+ private def createDataSetAggregateBufferDataType(
+ groupings: Array[Int],
+ aggregates: Array[TableAggregateFunction[_]],
+ inputType: RelDataType,
+ windowKeyTypes: Option[Array[TypeInformation[_]]] = None):
RowTypeInfo = {
// get the field data types of group keys.
- val groupingTypes: Seq[TypeInformation[_]] = groupings
- .map(inputType.getFieldList.get(_).getType)
- .map(FlinkTypeFactory.toTypeInfo)
+ val groupingTypes: Seq[TypeInformation[_]] =
+ groupings
+ .map(inputType.getFieldList.get(_).getType)
+ .map(FlinkTypeFactory.toTypeInfo)
// get all field data types of all intermediate aggregates
- val aggTypes: Seq[TypeInformation[_]] =
aggregates.flatMap(_.intermediateDataType)
+ val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
+ val clazz: Class[_] = agg.getClass
--- End diff --
We need to obtain the `TypeInformation` of the `Accumulator` here, not the
type of the `AggregateFunction`.
We might need to add a `getAccumulatorType()` method to the
`AggregateFunction` if we cannot extract the type from the object returned by
`AggregateFunction.createAccumulator()`.
> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
> Key: FLINK-5768
> URL: https://issues.apache.org/jira/browse/FLINK-5768
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to
> use new aggregation functions and the aggregate DataStream API.
> 2. DataStream will always be running in incremental mode, as explained on
> 06/Feb/2017 in FLINK-5564.
> 3. Change the implementation of the DataSet aggregation runtime code to use
> new aggregation functions.
> 4. Clean up unused classes and methods.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)