[ https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857514#comment-15857514 ]

ASF GitHub Bot commented on FLINK-5219:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3266#discussion_r99997022
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -306,6 +307,85 @@ object AggregateUtil {
       }
     
       /**
    +    * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
    +    * for aggregates.
    +    * The function returns aggregate values of all aggregate function which are
    +    * organized by the following format:
    +    *
    +    * {{{
    +    *       avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
    +    *           |                          |          windowEnd(max(rowtime)
    +    *           |                          |                   |
    +    *           v                          v                   v
    +    *        +--------+--------+--------+--------+-----------+---------+
    +    *        |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
    +    *        +--------+--------+--------+--------+-----------+---------+
    +    *                               ^                 ^
    +    *                               |                 |
    +    *             sum(y) aggOffsetInRow = 4    windowStart(min(rowtime))
    +    *
    +    * }}}
    +    *
    +    */
    +  def createDataSetWindowAggregationMapPartitionFunction(
    +    window: LogicalWindow,
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType = null,
    +    properties: Seq[NamedWindowProperty] = null,
    +    isPreMapPartition: Boolean = true,
    +    isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      0)._2
    +
    +    val intermediateRowArity = aggregates.map(_.intermediateDataType.length).sum
    +
    +    window match {
    +      case EventTimeSessionGroupWindow(_, _, gap) =>
    +        if (isPreMapPartition) {
    +          val preMapReturnType: RowTypeInfo =
    +            createAggregateBufferDataType(
    +              Array(),
    +              aggregates,
    +              inputType,
    +              Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
    +
    +          new DataSetSessionWindowAggregatePreProcessor(
    +            aggregates,
    +            Array(),
    +            // the addition two fields are used to store window-start and window-end attributes
    +            intermediateRowArity + 2,
    +            asLong(gap),
    +            preMapReturnType).asInstanceOf[MapPartitionFunction[Row, Row]]
    --- End diff --
    
    Can we leave out the `asInstanceOf`?
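
A minimal sketch of why the cast may be redundant, assuming the pre-processor class extends `MapPartitionFunction[Row, Row]` directly (the diff does not show its declaration). The names `MapPartition`, `PreProcessor`, and `CastDemo` below are hypothetical stand-ins, not Flink classes. When a class already implements the declared return type, Scala accepts the new instance as the method result without an explicit `asInstanceOf`.

```scala
// Hypothetical stand-in for a partition-mapping interface (not the Flink type).
trait MapPartition[I, O] {
  def mapPartition(in: Iterable[I]): Iterable[O]
}

// Hypothetical stand-in for the pre-processor; it implements the trait directly.
class PreProcessor(gap: Long) extends MapPartition[Long, Long] {
  override def mapPartition(in: Iterable[Long]): Iterable[Long] = in.map(_ + gap)
}

object CastDemo {
  // The declared return type is the trait. The subclass instance conforms to it,
  // so the branch result needs no asInstanceOf.
  def create(isPre: Boolean, gap: Long): MapPartition[Long, Long] =
    if (isPre) new PreProcessor(gap)
    else throw new UnsupportedOperationException("not covered by this sketch")

  def main(args: Array[String]): Unit =
    println(create(isPre = true, gap = 2L).mapPartition(Seq(1L, 2L)).toList)
}
```

If the real class does not implement the interface with exactly these type parameters, the compiler would reject the uncast version, which would explain why the cast was added.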


> Add non-grouped session windows for batch tables
> ------------------------------------------------
>
>                 Key: FLINK-5219
>                 URL: https://issues.apache.org/jira/browse/FLINK-5219
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
