GitHub user shaoxuan-wang commented on a diff in the pull request:
https://github.com/apache/flink/pull/3423#discussion_r103885054
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
@@ -737,101 +632,121 @@ object AggregateUtil {
aggregates(index) = if (sqlMinMaxFunction.getKind ==
SqlKind.MIN) {
sqlTypeName match {
case TINYINT =>
- new ByteMinAggregate
+ new ByteMinAggFunction
case SMALLINT =>
- new ShortMinAggregate
+ new ShortMinAggFunction
case INTEGER =>
- new IntMinAggregate
+ new IntMinAggFunction
case BIGINT =>
- new LongMinAggregate
+ new LongMinAggFunction
case FLOAT =>
- new FloatMinAggregate
+ new FloatMinAggFunction
case DOUBLE =>
- new DoubleMinAggregate
+ new DoubleMinAggFunction
case DECIMAL =>
- new DecimalMinAggregate
+ new DecimalMinAggFunction
case BOOLEAN =>
- new BooleanMinAggregate
+ new BooleanMinAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Min aggregate does no support
type:" + sqlType)
}
} else {
sqlTypeName match {
case TINYINT =>
- new ByteMaxAggregate
+ new ByteMaxAggFunction
case SMALLINT =>
- new ShortMaxAggregate
+ new ShortMaxAggFunction
case INTEGER =>
- new IntMaxAggregate
+ new IntMaxAggFunction
case BIGINT =>
- new LongMaxAggregate
+ new LongMaxAggFunction
case FLOAT =>
- new FloatMaxAggregate
+ new FloatMaxAggFunction
case DOUBLE =>
- new DoubleMaxAggregate
+ new DoubleMaxAggFunction
case DECIMAL =>
- new DecimalMaxAggregate
+ new DecimalMaxAggFunction
case BOOLEAN =>
- new BooleanMaxAggregate
+ new BooleanMaxAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Max aggregate does no support
type:" + sqlType)
}
}
}
case _: SqlCountAggFunction =>
- aggregates(index) = new CountAggregate
+ aggregates(index) = new CountAggFunction
case unSupported: SqlAggFunction =>
throw new TableException("unsupported Function: " +
unSupported.getName)
}
- setAggregateDataOffset(index)
- }
-
- // set the aggregate intermediate data start index in Row, and update
current value.
- def setAggregateDataOffset(index: Int): Unit = {
- aggregates(index).setAggOffsetInRow(aggOffset)
- aggOffset += aggregates(index).intermediateDataType.length
}
(aggFieldIndexes, aggregates)
}
- private def createAggregateBufferDataType(
- groupings: Array[Int],
- aggregates: Array[Aggregate[_]],
- inputType: RelDataType,
- windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo
= {
+ private def createDataSetAggregateBufferDataType(
+ groupings: Array[Int],
+ aggregates: Array[TableAggregateFunction[_]],
+ inputType: RelDataType,
+ windowKeyTypes: Option[Array[TypeInformation[_]]] = None):
RowTypeInfo = {
// get the field data types of group keys.
- val groupingTypes: Seq[TypeInformation[_]] = groupings
- .map(inputType.getFieldList.get(_).getType)
- .map(FlinkTypeFactory.toTypeInfo)
+ val groupingTypes: Seq[TypeInformation[_]] =
+ groupings
+ .map(inputType.getFieldList.get(_).getType)
+ .map(FlinkTypeFactory.toTypeInfo)
// get all field data types of all intermediate aggregates
- val aggTypes: Seq[TypeInformation[_]] =
aggregates.flatMap(_.intermediateDataType)
+ val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
+ val clazz: Class[_] = agg.getClass
--- End diff --
Btw, @fhueske, I have not gotten a chance to dig into this yet. But are you by
any chance already aware of why we cannot get the correct Scala type via
TypeInformation? If we cannot fix this, I think we should consider implementing
all built-in aggregates and AggregateTestBase in Java, and also recommend that
future UDAGG users write their aggregates in Java rather than Scala.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---