[
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857514#comment-15857514
]
ASF GitHub Bot commented on FLINK-5219:
---------------------------------------
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3266#discussion_r99997022
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
---
@@ -306,6 +307,85 @@ object AggregateUtil {
}
/**
+ * Create a
[[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+ * for aggregates.
+ * The function returns aggregate values of all aggregate function
which are
+ * organized by the following format:
+ *
+ * {{{
+ * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5
+ * | |
windowEnd(max(rowtime)
+ * | | |
+ * v v v
+ * +--------+--------+--------+--------+-----------+---------+
+ * | sum1 | count1 | sum2 | count2 |windowStart|windowEnd|
+ * +--------+--------+--------+--------+-----------+---------+
+ * ^ ^
+ * | |
+ * sum(y) aggOffsetInRow = 4 windowStart(min(rowtime))
+ *
+ * }}}
+ *
+ */
+ def createDataSetWindowAggregationMapPartitionFunction(
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType = null,
+ properties: Seq[NamedWindowProperty] = null,
+ isPreMapPartition: Boolean = true,
+ isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+ val aggregates = transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ 0)._2
+
+ val intermediateRowArity =
aggregates.map(_.intermediateDataType.length).sum
+
+ window match {
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ if (isPreMapPartition) {
+ val preMapReturnType: RowTypeInfo =
+ createAggregateBufferDataType(
+ Array(),
+ aggregates,
+ inputType,
+ Option(Array(BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO)))
+
+ new DataSetSessionWindowAggregatePreProcessor(
+ aggregates,
+ Array(),
+ // the addition two fields are used to store window-start and
window-end attributes
+ intermediateRowArity + 2,
+ asLong(gap),
+ preMapReturnType).asInstanceOf[MapPartitionFunction[Row, Row]]
--- End diff --
Can we leave out the `asInstanceOf` ?
> Add non-grouped session windows for batch tables
> ------------------------------------------------
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)