Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2562#discussion_r81847736
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -675,11 +692,77 @@ class GroupedTable(
         * Example:
         *
         * {{{
    -    *   tab.groupBy("key").select("key, value.avg + " The average" as 
average")
    +    *   tab.groupBy("key").select("key, value.avg + ' The average' as 
average")
    +    * }}}
    +    */
    +  def select(fields: String): Table = {
    +    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    +    select(fieldExprs: _*)
    +  }
    +
    +  /**
    +    * Windows a table to divide a (potentially) infinite stream of records 
into finite slices
    +    * based on the timestamps of elements or other criteria. This division 
is required when
    +    * working with infinite data and performing transformations that 
aggregate elements.
    +    *
    +    * @param groupWindow group-window specification required to bound the 
infinite input stream
    +    *                    into a finite group
    +    * @return group-windowed table
    +    */
    +  def window(groupWindow: GroupWindow): GroupWindowedTable = {
    +    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw new ValidationException(s"Windows on batch tables are 
currently not supported.")
    +    }
    +    new GroupWindowedTable(table, groupKey, groupWindow)
    +  }
    +}
    +
    +class GroupWindowedTable(
    +    private[flink] val table: Table,
    +    private[flink] val groupKey: Seq[Expression],
    +    private[flink] val window: GroupWindow) {
    +
    +  /**
    +    * Performs a selection operation on a group-windowed table. Similar to 
an SQL SELECT statement.
    +    * The field expressions can contain complex expressions and 
aggregations.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The 
average" as 'average)
    +    * }}}
    +    */
    +  def select(fields: Expression*): Table = {
    +    val projectionOnAggregates = fields.map(extractAggregations(_, 
table.tableEnv))
    +    val aggregations = projectionOnAggregates.flatMap(_._2)
    +
    +    val groupWindow = window.toLogicalWindow
    +
    +    val logical = if (aggregations.nonEmpty) {
    +      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
    +        WindowAggregate(groupKey, groupWindow, aggregations, 
table.logicalPlan)
    +          .validate(table.tableEnv))
    +    } else {
    +      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
    +        WindowAggregate(groupKey, groupWindow, Nil, 
table.logicalPlan).validate(table.tableEnv))
    +    }
    +
    +    new Table(table.tableEnv, logical.validate(table.tableEnv))
    --- End diff --
    
    Wasn't `logical` validated before?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to