[
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891310#comment-15891310
]
ASF GitHub Bot commented on FLINK-5768:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3423#discussion_r103818316
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
@@ -737,101 +632,121 @@ object AggregateUtil {
aggregates(index) = if (sqlMinMaxFunction.getKind ==
SqlKind.MIN) {
sqlTypeName match {
case TINYINT =>
- new ByteMinAggregate
+ new ByteMinAggFunction
case SMALLINT =>
- new ShortMinAggregate
+ new ShortMinAggFunction
case INTEGER =>
- new IntMinAggregate
+ new IntMinAggFunction
case BIGINT =>
- new LongMinAggregate
+ new LongMinAggFunction
case FLOAT =>
- new FloatMinAggregate
+ new FloatMinAggFunction
case DOUBLE =>
- new DoubleMinAggregate
+ new DoubleMinAggFunction
case DECIMAL =>
- new DecimalMinAggregate
+ new DecimalMinAggFunction
case BOOLEAN =>
- new BooleanMinAggregate
+ new BooleanMinAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Min aggregate does not support
type:" + sqlType)
}
} else {
sqlTypeName match {
case TINYINT =>
- new ByteMaxAggregate
+ new ByteMaxAggFunction
case SMALLINT =>
- new ShortMaxAggregate
+ new ShortMaxAggFunction
case INTEGER =>
- new IntMaxAggregate
+ new IntMaxAggFunction
case BIGINT =>
- new LongMaxAggregate
+ new LongMaxAggFunction
case FLOAT =>
- new FloatMaxAggregate
+ new FloatMaxAggFunction
case DOUBLE =>
- new DoubleMaxAggregate
+ new DoubleMaxAggFunction
case DECIMAL =>
- new DecimalMaxAggregate
+ new DecimalMaxAggFunction
case BOOLEAN =>
- new BooleanMaxAggregate
+ new BooleanMaxAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Max aggregate does not support
type:" + sqlType)
}
}
}
case _: SqlCountAggFunction =>
- aggregates(index) = new CountAggregate
+ aggregates(index) = new CountAggFunction
case unSupported: SqlAggFunction =>
throw new TableException("unsupported Function: " +
unSupported.getName)
}
- setAggregateDataOffset(index)
- }
-
- // set the aggregate intermediate data start index in Row, and update
current value.
- def setAggregateDataOffset(index: Int): Unit = {
- aggregates(index).setAggOffsetInRow(aggOffset)
- aggOffset += aggregates(index).intermediateDataType.length
}
(aggFieldIndexes, aggregates)
}
- private def createAggregateBufferDataType(
- groupings: Array[Int],
- aggregates: Array[Aggregate[_]],
- inputType: RelDataType,
- windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo
= {
+ private def createDataSetAggregateBufferDataType(
+ groupings: Array[Int],
+ aggregates: Array[TableAggregateFunction[_]],
+ inputType: RelDataType,
+ windowKeyTypes: Option[Array[TypeInformation[_]]] = None):
RowTypeInfo = {
// get the field data types of group keys.
- val groupingTypes: Seq[TypeInformation[_]] = groupings
- .map(inputType.getFieldList.get(_).getType)
- .map(FlinkTypeFactory.toTypeInfo)
+ val groupingTypes: Seq[TypeInformation[_]] =
+ groupings
+ .map(inputType.getFieldList.get(_).getType)
+ .map(FlinkTypeFactory.toTypeInfo)
// get all field data types of all intermediate aggregates
- val aggTypes: Seq[TypeInformation[_]] =
aggregates.flatMap(_.intermediateDataType)
+ val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
+ val clazz: Class[_] = agg.getClass
--- End diff --
I played around with this.
I think we can make it work without a `getAccumulatorType()` method but
have to change a few parts.
First of all, the accumulators need to be moved out of the
`AggregationFunction` and become regular "top-level" classes. Once this is
done, `TypeInformation.of(clazz)` should detect them as tuples and create a
`TupleTypeInfo`. However, the fields inside are still of `GenericType`. I
figured out that it helps to use the Java boxed types instead of Scala's types,
i.e., a `JTuple1[JLong]` (with `JLong` = `java.lang.Long`) will result in a
correct `TupleTypeInfo` with a single `Long` field.
> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
> Key: FLINK-5768
> URL: https://issues.apache.org/jira/browse/FLINK-5768
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to
> use new aggregation functions and aggregate dataStream API.
> 2. DataStream will always be running in incremental mode, as explained in
> 06/Feb/2017 in FLINK-5564.
> 3. Change the implementation of the Dataset aggregation runtime code to use
> new aggregation functions.
> 4. Clean up unused classes and methods.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)