[
https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834598#comment-15834598
]
ASF GitHub Bot commented on FLINK-2980:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/3026#discussion_r97311604
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
---
@@ -928,5 +1032,151 @@ class GroupWindowedTable(
val fieldExprs = ExpressionParser.parseExpressionList(fields)
select(fieldExprs: _*)
}
+}
+
+/**
+ * A table that has been grouped on several sets of grouping keys.
+ */
+class GroupingSetsTable(
+ private[flink] val table: Table,
+ private[flink] val groups: Seq[Seq[Expression]],
+ private[flink] val sqlKind: SqlKind) {
+
+ /**
+ * Performs a selection operation on a grouped table. Similar to an SQL
SELECT statement.
+ * The field expressions can contain complex expressions and
aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.groupingSets('key).select('key, 'value.avg + " The average" as
'average)
+ * }}}
+ */
+ def select(fields: Expression*): Table = {
+
+ val (aggNames, propNames) = extractAggregationsAndProperties(fields,
table.tableEnv)
+
+ if (propNames.nonEmpty) {
+ throw ValidationException("Window properties can only be used on
windowed tables.")
+ }
+
+ val groupingSets = sqlKind match {
+ case SqlKind.CUBE => ExpressionUtils.cube(groups)
+ case SqlKind.ROLLUP => ExpressionUtils.rollup(groups)
+ case _ => groups
+ }
+
+ val projectsOnAgg = replaceAggregationsAndProperties(
+ fields, table.tableEnv, aggNames, propNames)
+ val projectFields = extractFieldReferences(fields ++
groupingSets.flatten.distinct)
+
+ val logical =
+ Project(projectsOnAgg,
+ GroupingAggregation(groupingSets, aggNames.map(a => Alias(a._1,
a._2)).toSeq,
+ Project(projectFields,
table.logicalPlan).validate(table.tableEnv)
+ ).validate(table.tableEnv)
+ ).validate(table.tableEnv)
+
+ new Table(table.tableEnv, logical)
+ }
+
+ /**
+ * Performs a selection operation on a grouped table. Similar to an SQL
SELECT statement.
+ * The field expressions can contain complex expressions and
aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * tab.groupBy("key").select("key, value.avg + ' The average' as
average")
+ * }}}
+ */
+ def select(fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields)
+ select(fieldExprs: _*)
+ }
+ /**
+ * Groups the records of a table by assigning them to windows defined
by a time or row interval.
+ *
+ * For streaming tables of infinite size, grouping into windows is
required to define finite
+ * groups on which group-based aggregates can be computed.
+ *
+ * For batch tables of finite size, windowing essentially provides
shortcuts for time-based
+ * groupBy.
+ *
+ * @param groupWindow group-window that specifies how elements are
grouped.
+ * @return A windowed table.
+ */
+ def window(groupWindow: GroupWindow): GroupingSetsWindowedTable = {
+ if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+ throw new ValidationException(s"Windows on batch tables are
currently not supported.")
+ }
+ new GroupingSetsWindowedTable(table, groups, sqlKind, groupWindow)
+ }
+}
+
+class GroupingSetsWindowedTable(
--- End diff --
We decided not to support grouping sets in a stream environment yet, and
there are also no tests for it. Could you remove this class?
> Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
> ----------------------------------------------------
>
> Key: FLINK-2980
> URL: https://issues.apache.org/jira/browse/FLINK-2980
> Project: Flink
> Issue Type: New Feature
> Components: Documentation, Table API & SQL
> Reporter: Chengxiang Li
> Assignee: Alexander Chermenin
> Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf
>
>
> Computing aggregates over a cube/rollup/grouping sets of several dimensions
> is a common operation in data warehousing. It would be nice to have them in
> Table API.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)