Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3423#discussion_r103818316
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
@@ -737,101 +632,121 @@ object AggregateUtil {
aggregates(index) = if (sqlMinMaxFunction.getKind ==
SqlKind.MIN) {
sqlTypeName match {
case TINYINT =>
- new ByteMinAggregate
+ new ByteMinAggFunction
case SMALLINT =>
- new ShortMinAggregate
+ new ShortMinAggFunction
case INTEGER =>
- new IntMinAggregate
+ new IntMinAggFunction
case BIGINT =>
- new LongMinAggregate
+ new LongMinAggFunction
case FLOAT =>
- new FloatMinAggregate
+ new FloatMinAggFunction
case DOUBLE =>
- new DoubleMinAggregate
+ new DoubleMinAggFunction
case DECIMAL =>
- new DecimalMinAggregate
+ new DecimalMinAggFunction
case BOOLEAN =>
- new BooleanMinAggregate
+ new BooleanMinAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Min aggregate does no support
type:" + sqlType)
}
} else {
sqlTypeName match {
case TINYINT =>
- new ByteMaxAggregate
+ new ByteMaxAggFunction
case SMALLINT =>
- new ShortMaxAggregate
+ new ShortMaxAggFunction
case INTEGER =>
- new IntMaxAggregate
+ new IntMaxAggFunction
case BIGINT =>
- new LongMaxAggregate
+ new LongMaxAggFunction
case FLOAT =>
- new FloatMaxAggregate
+ new FloatMaxAggFunction
case DOUBLE =>
- new DoubleMaxAggregate
+ new DoubleMaxAggFunction
case DECIMAL =>
- new DecimalMaxAggregate
+ new DecimalMaxAggFunction
case BOOLEAN =>
- new BooleanMaxAggregate
+ new BooleanMaxAggFunction
case sqlType: SqlTypeName =>
throw new TableException("Max aggregate does no support
type:" + sqlType)
}
}
}
case _: SqlCountAggFunction =>
- aggregates(index) = new CountAggregate
+ aggregates(index) = new CountAggFunction
case unSupported: SqlAggFunction =>
throw new TableException("unsupported Function: " +
unSupported.getName)
}
- setAggregateDataOffset(index)
- }
-
- // set the aggregate intermediate data start index in Row, and update
current value.
- def setAggregateDataOffset(index: Int): Unit = {
- aggregates(index).setAggOffsetInRow(aggOffset)
- aggOffset += aggregates(index).intermediateDataType.length
}
(aggFieldIndexes, aggregates)
}
- private def createAggregateBufferDataType(
- groupings: Array[Int],
- aggregates: Array[Aggregate[_]],
- inputType: RelDataType,
- windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo
= {
+ private def createDataSetAggregateBufferDataType(
+ groupings: Array[Int],
+ aggregates: Array[TableAggregateFunction[_]],
+ inputType: RelDataType,
+ windowKeyTypes: Option[Array[TypeInformation[_]]] = None):
RowTypeInfo = {
// get the field data types of group keys.
- val groupingTypes: Seq[TypeInformation[_]] = groupings
- .map(inputType.getFieldList.get(_).getType)
- .map(FlinkTypeFactory.toTypeInfo)
+ val groupingTypes: Seq[TypeInformation[_]] =
+ groupings
+ .map(inputType.getFieldList.get(_).getType)
+ .map(FlinkTypeFactory.toTypeInfo)
// get all field data types of all intermediate aggregates
- val aggTypes: Seq[TypeInformation[_]] =
aggregates.flatMap(_.intermediateDataType)
+ val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
+ val clazz: Class[_] = agg.getClass
--- End diff --
I played around with this.
I think we can make it work without a `getAccumulatorType()` method but
have to change a few parts.
First of all, the accumulators need to be moved out of the
`AggregationFunction` and become regular "top-level" classes. Once this is
done, `TypeInformation.of(clazz)` should detect them as tuples and create a
`TupleTypeInfo`. However, the fields inside are still of `GenericType`. I
figured out that it helps to use the Java boxed types instead of Scala's types,
i.e., `a JTuple1[JLong]` (with `JLong = java.lang.Long) will result in a
correct TupleTypeInfo with a single Long field.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---