Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r81847736
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
---
@@ -675,11 +692,77 @@ class GroupedTable(
* Example:
*
* {{{
- * tab.groupBy("key").select("key, value.avg + " The average" as
average")
+ * tab.groupBy("key").select("key, value.avg + ' The average' as
average")
+ * }}}
+ */
+ def select(fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields)
+ select(fieldExprs: _*)
+ }
+
+ /**
+ * Windows a table to divide a (potentially) infinite stream of records
into finite slices
+ * based on the timestamps of elements or other criteria. This division
is required when
+ * working with infinite data and performing transformations that
aggregate elements.
+ *
+ * @param groupWindow group-window specification required to bound the
infinite input stream
+ * into a finite group
+ * @return group-windowed table
+ */
+ def window(groupWindow: GroupWindow): GroupWindowedTable = {
+ if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+ throw new ValidationException(s"Windows on batch tables are
currently not supported.")
+ }
+ new GroupWindowedTable(table, groupKey, groupWindow)
+ }
+}
+
+class GroupWindowedTable(
+ private[flink] val table: Table,
+ private[flink] val groupKey: Seq[Expression],
+ private[flink] val window: GroupWindow) {
+
+ /**
+ * Performs a selection operation on a group-windowed table. Similar to
an SQL SELECT statement.
+ * The field expressions can contain complex expressions and
aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * groupWindowTable.select('key, 'window.start, 'value.avg + " The
average" as 'average)
+ * }}}
+ */
+ def select(fields: Expression*): Table = {
+ val projectionOnAggregates = fields.map(extractAggregations(_,
table.tableEnv))
+ val aggregations = projectionOnAggregates.flatMap(_._2)
+
+ val groupWindow = window.toLogicalWindow
+
+ val logical = if (aggregations.nonEmpty) {
+ Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+ WindowAggregate(groupKey, groupWindow, aggregations,
table.logicalPlan)
+ .validate(table.tableEnv))
+ } else {
+ Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+ WindowAggregate(groupKey, groupWindow, Nil,
table.logicalPlan).validate(table.tableEnv))
+ }
+
+ new Table(table.tableEnv, logical.validate(table.tableEnv))
--- End diff --
Wasn't `logical` validated before?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---