sjwiesman commented on a change in pull request #15642: URL: https://github.com/apache/flink/pull/15642#discussion_r616671350
########## File path: docs/content/docs/dev/table/sql/queries/deduplication.md ########## @@ -0,0 +1,76 @@ +--- +title: "Deduplication" +weight: 16 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Deduplication +{{< label Batch >}} {{< label Streaming >}} + +Deduplication removes rows that duplicate over a set of columns, keeping only the first one or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, this may result in there are duplicate records in the sink in case of failover. However, the duplicate records will affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`). So a deduplication is needed before further analysis. + +Flink uses `ROW_NUMBER()` to remove duplicates just like the way of Top-N query. In theory, deduplication is a special case of Top-N which the N is one and order by the processing time or event time. Review comment: ```suggestion Flink uses `ROW_NUMBER()` to remove duplicates, just as in a Top-N query. In theory, deduplication is a special case of Top-N in which N is one and rows are ordered by processing time or event time. 
``` ########## File path: docs/content/docs/dev/table/sql/queries/group-agg.md ########## @@ -0,0 +1,162 @@ +--- +title: "Group Aggregation" +weight: 8 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Group Aggregation +{{< label Batch >}} {{< label Streaming >}} + +Like most data systems, Apache Flink supports aggregate functions; both built-in and user-defined. [User-defined functions]({{< ref "docs/dev/table/functions/udfs" >}}) must be registered in a catalog before use. + +An aggregate function computes a single result from multiple input rows. For example, there are aggregates to compute the `COUNT`, `SUM`, `AVG` (average), `MAX` (maximum) and `MIN` (minimum) over a set of rows. + +```sql +SELECT COUNT(*) FROM Orders +``` + +For streaming queries, it is important to understand that Flink runs continuous queries that never terminate. Instead they update their result table according to the updates on its input tables. So for the above query, Flink will output an updated count each time a new row is inserted into the `Orders` table. Review comment: ```suggestion For streaming queries, it is important to understand that Flink runs continuous queries that never terminate. 
Instead, they update their result table according to the updates on their input tables. For the above query, Flink will output an updated count each time a new row is inserted into the `Orders` table. ``` ########## File path: docs/content/docs/dev/table/sql/queries/group-agg.md ########## @@ -0,0 +1,162 @@ +--- +title: "Group Aggregation" +weight: 8 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Group Aggregation +{{< label Batch >}} {{< label Streaming >}} + +Like most data systems, Apache Flink supports aggregate functions; both built-in and user-defined. [User-defined functions]({{< ref "docs/dev/table/functions/udfs" >}}) must be registered in a catalog before use. + +An aggregate function computes a single result from multiple input rows. For example, there are aggregates to compute the `COUNT`, `SUM`, `AVG` (average), `MAX` (maximum) and `MIN` (minimum) over a set of rows. + +```sql +SELECT COUNT(*) FROM Orders +``` + +For streaming queries, it is important to understand that Flink runs continuous queries that never terminate. Instead they update their result table according to the updates on its input tables. 
So for the above query, Flink will output an updated count each time a new row is inserted into the `Orders` table. + +Apache Flink supports the standard `GROUP BY` clause for aggregating data. + +```sql +SELECT COUNT(*) +FROM Orders +GROUP BY order_id +``` + +For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of groups and number and type of aggregation functions. For example `MIN`/`MAX` are heavy on state size while `COUNT` is cheap. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See [query configuration]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl) for details. Review comment: ```suggestion For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of groups and the number and type of aggregation functions. For example `MIN`/`MAX` are heavy on state size while `COUNT` is cheap. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See [query configuration]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl) for details. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-agg.md ########## @@ -0,0 +1,332 @@ +--- +title: "Window Aggregation" +weight: 7 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Window Aggregation + +## Window TVF Aggregation + +{{< label Streaming >}} Review comment: Just to confirm, this syntax is only supported in streaming mode? ########## File path: docs/content/docs/dev/table/sql/queries/window-agg.md ########## @@ -0,0 +1,332 @@ +--- +title: "Window Aggregation" +weight: 7 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Window Aggregation + +## Window TVF Aggregation + +{{< label Streaming >}} + +Window aggregations are defined in the `GROUP BY` clause contains "window_start" and "window_end" columns of the relation applied [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}). Just like queries with regular `GROUP BY` clauses, queries with a group by window aggregation will compute a single result row per group. + +```sql +SELECT ... 
+FROM <windowed_table> -- relation applied windowing TVF +GROUP BY window_start, window_end, ... +``` + +Unlike other aggregations on continuous tables, window aggregation do not emit intermediate results but only a final result, the total aggregation at the end of the window. Moreover, window aggregations purge all intermediate state when no longer needed. + +### Windowing TVFs + +Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations, which can be defined on either [event or processing time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}). See [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) for more windowing functions information. + +Here are some examples for `TUMBLE`, `HOP` and `CUMULATE` window aggregations. + +```sql +-- tables must have time attribute, e.g. `bidtime` in this table +Flink SQL> desc Bid; ++-------------+------------------------+------+-----+--------+---------------------------------+ +| name | type | null | key | extras | watermark | ++-------------+------------------------+------+-----+--------+---------------------------------+ +| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | +| price | DECIMAL(10, 2) | true | | | | +| item | STRING | true | | | | +| supplier_id | STRING | true | | | | ++-------------+------------------------+------+-----+--------+---------------------------------+ + +Flink SQL> SELECT * FROM Bid; ++------------------+-------+------+-------------+ +| bidtime | price | item | supplier_id | ++------------------+-------+------+-------------+ +| 2020-04-15 08:05 | 4.00 | C | supplier1 | +| 2020-04-15 08:07 | 2.00 | A | supplier1 | +| 2020-04-15 08:09 | 5.00 | D | supplier2 | +| 2020-04-15 08:11 | 3.00 | B | supplier2 | +| 2020-04-15 08:13 | 1.00 | E | supplier1 | +| 2020-04-15 08:17 | 6.00 | F | supplier2 | ++------------------+-------+------+-------------+ + +-- tumbling window aggregation +Flink SQL> SELECT window_start, window_end, 
SUM(price) + FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | ++------------------+------------------+-------+ + +-- hopping window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 | ++------------------+------------------+-------+ + +-- cumulative window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 | +| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 | +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 | +| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 | +| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 | +| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | ++------------------+------------------+-------+ +``` + +*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. 
`2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.* + +### GROUPING SETS + +Window aggregations also supports `GROUPING SETS` syntax. Grouping sets allow for more complex grouping operations than those describable by a standard `GROUP BY`. Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple `GROUP BY` clauses. + +Window aggregations with `GROUPING SETS` requires both the `window_start` and `window_end` columns have to be in the `GROUP BY` clause, but not in the `GROUPING SETS` clause. Review comment: ```suggestion Window aggregations with `GROUPING SETS` require both the `window_start` and `window_end` columns to be in the `GROUP BY` clause, but not in the `GROUPING SETS` clause. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-topn.md ########## @@ -0,0 +1,110 @@ +--- +title: "Window Top-N" +weight: 15 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Window Top-N +{{< label Streaming >}} + +Window Top-N is a special [Top-N]({{< ref "docs/dev/table/sql/queries/topn" >}}) which returns the N smallest or largest values for each window and other partitioned keys. 
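(As an aside on the pattern being described — this is an illustrative sketch, not text from the PR, reusing the example `Bid` table from the window-aggregation hunks quoted in this review: a window Top-N nests `ROW_NUMBER()` partitioned by `window_start`/`window_end` over a windowed aggregation.)

```sql
-- Hypothetical sketch: top 3 suppliers by total price per 10-minute
-- tumbling window, assuming the `Bid` table (bidtime, price, item,
-- supplier_id) from the window-aggregation examples.
SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (
             PARTITION BY window_start, window_end
             ORDER BY total_price DESC) AS rownum
  FROM (
    SELECT window_start, window_end, supplier_id,
           SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, supplier_id
  )
)
WHERE rownum <= 3;
```

The outer filter on `rownum` is what lets the optimizer recognize the query as a Top-N rather than a plain `OVER` aggregation.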
+ +For streaming queries, unlike regular Top-N on continuous tables, window Top-N do not emit intermediate results but only a final result, the total top N records at the end of the window. Moreover, window Top-N purge all intermediate state when no longer needed. +Therefore, window Top-N queries have better performance if users don't need results updated per record. Usually, Window Top-N is used with [Window Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) together. + +Window Top-N can be defined in the same syntax as regular Top-N, see [Top-N documentation]({{< ref "docs/dev/table/sql/queries/topn" >}}) for more information. +Besides that, Window Top-N requires the `PARTITION BY` clause contains `window_start` and `window_end` columns of the relation applied [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) or [Window Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}). +Otherwise the optimizer won’t be able to translate the query. Review comment: ```suggestion Otherwise, the optimizer won’t be able to translate the query. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-tvf.md ########## @@ -0,0 +1,285 @@ +--- +title: "Windowing TVF" +weight: 6 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Windowing table-valued functions (Windowing TVFs) + +{{< label Streaming >}} + +Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink SQL and how the programmer can benefit to the maximum from its offered functionality. + +Apache Flink provides several windowing table-valued functions (TVF) to divide the elements of your table into windows, including: + +- [Tumble Windows](#tumble) +- [Hop Windows](#hop) +- [Cumulate Windows](#cumulate) +- Session Windows (will be supported soon) + +Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows wherein a single element can be assigned to multiple windows. + +Windowing TVFs are Flink defined Polymorphic Table Functions (abbreviated PTF). PTF is the part of the SQL 2016 standard which is a special table-function, but can have table as parameter. PTF is a powerful feature to change the shape of a table. Because PTFs are semantically used like tables, their invocation occurs in a `FROM` clause of a `SELECT` statement. Review comment: ```suggestion Windowing TVFs are Flink-defined Polymorphic Table Functions (abbreviated PTF). PTFs are part of the SQL 2016 standard: special table functions that can take a table as a parameter. PTF is a powerful feature for changing the shape of a table. Because PTFs are used semantically like tables, their invocation occurs in the `FROM` clause of a `SELECT` statement. ``` ########## File path: docs/content/docs/dev/table/tuning.md ########## @@ -31,23 +31,27 @@ SQL is the most widely used language for data analytics. 
Flink's Table API and S In this page, we will introduce some useful optimization options and the internals of streaming aggregation which will bring great improvement in some cases. {{< hint info >}} -The streaming aggregations optimization are only supported for [unbounded-aggregations]({{< ref "docs/dev/table/sql/queries" >}}#aggregations). -Optimizations for [window aggregations]({{< ref "docs/dev/table/sql/queries" >}}#group-windows) will be supported in the future. +The streaming aggregation optimizations mentioned in this page are all supported for [Group Aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) now. {{< /hint >}} -By default, the unbounded aggregation operator processes input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend). -Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations. - ## MiniBatch Aggregation +By default, the group aggregation operators process input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend). +Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations. + The core idea of mini-batch aggregation is caching a bundle of inputs in a buffer inside of the aggregation operator. 
When the bundle of inputs is triggered to process, only one operation per key to access state is needed. This can significantly reduce the state overhead and get a better throughput. However, this may increase some latency because it buffers some records instead of processing them in an instant. This is a trade-off between throughput and latency. The following figure explains how the mini-batch aggregation reduces state operations. {{< img src="/fig/table-streaming/minibatch_agg.png" width="50%" height="50%" >}} -MiniBatch optimization is disabled by default. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please see [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details. +MiniBatch optimization is disabled by default for group aggregation. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please see [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details. + +{{< hint info >}} +MiniBatch optimization is enabled by default for [Window TVF Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) and it can't be disabled. Enable or disable above mini-batch configuration doesn't affect window aggregations. Review comment: ```suggestion MiniBatch optimization is always enabled for [Window TVF Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}), regardless of the above configuration. ``` ########## File path: docs/content/docs/dev/table/sql/queries/deduplication.md ########## @@ -0,0 +1,76 @@ +--- +title: "Deduplication" +weight: 16 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Deduplication +{{< label Batch >}} {{< label Streaming >}} + +Deduplication removes rows that duplicate over a set of columns, keeping only the first one or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, this may result in there are duplicate records in the sink in case of failover. However, the duplicate records will affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`). So a deduplication is needed before further analysis. Review comment: ```suggestion Deduplication removes rows that duplicate over a set of columns, keeping only the first one or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once; this may result in duplicate records in the sink in case of failover. These duplicate records will affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`), so deduplication is needed before further analysis. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-agg.md ########## @@ -0,0 +1,332 @@ +--- +title: "Window Aggregation" +weight: 7 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Window Aggregation + +## Window TVF Aggregation + +{{< label Streaming >}} + +Window aggregations are defined in the `GROUP BY` clause contains "window_start" and "window_end" columns of the relation applied [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}). Just like queries with regular `GROUP BY` clauses, queries with a group by window aggregation will compute a single result row per group. + +```sql +SELECT ... +FROM <windowed_table> -- relation applied windowing TVF +GROUP BY window_start, window_end, ... +``` + +Unlike other aggregations on continuous tables, window aggregation do not emit intermediate results but only a final result, the total aggregation at the end of the window. Moreover, window aggregations purge all intermediate state when no longer needed. + +### Windowing TVFs + +Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations, which can be defined on either [event or processing time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}). See [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) for more windowing functions information. + +Here are some examples for `TUMBLE`, `HOP` and `CUMULATE` window aggregations. + +```sql +-- tables must have time attribute, e.g. 
`bidtime` in this table +Flink SQL> desc Bid; ++-------------+------------------------+------+-----+--------+---------------------------------+ +| name | type | null | key | extras | watermark | ++-------------+------------------------+------+-----+--------+---------------------------------+ +| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | +| price | DECIMAL(10, 2) | true | | | | +| item | STRING | true | | | | +| supplier_id | STRING | true | | | | ++-------------+------------------------+------+-----+--------+---------------------------------+ + +Flink SQL> SELECT * FROM Bid; ++------------------+-------+------+-------------+ +| bidtime | price | item | supplier_id | ++------------------+-------+------+-------------+ +| 2020-04-15 08:05 | 4.00 | C | supplier1 | +| 2020-04-15 08:07 | 2.00 | A | supplier1 | +| 2020-04-15 08:09 | 5.00 | D | supplier2 | +| 2020-04-15 08:11 | 3.00 | B | supplier2 | +| 2020-04-15 08:13 | 1.00 | E | supplier1 | +| 2020-04-15 08:17 | 6.00 | F | supplier2 | ++------------------+-------+------+-------------+ + +-- tumbling window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | ++------------------+------------------+-------+ + +-- hopping window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 
08:05 | 2020-04-15 08:15 | 15.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 | ++------------------+------------------+-------+ + +-- cumulative window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 | +| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 | +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 | +| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 | +| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 | +| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | ++------------------+------------------+-------+ +``` + +*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. `2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.* + +### GROUPING SETS + +Window aggregations also supports `GROUPING SETS` syntax. Grouping sets allow for more complex grouping operations than those describable by a standard `GROUP BY`. Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple `GROUP BY` clauses. Review comment: ```suggestion Window aggregations also support `GROUPING SETS` syntax. Grouping sets allow for more complex grouping operations than those describable by a standard `GROUP BY`. Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple `GROUP BY` clauses. 
``` ########## File path: docs/content/docs/dev/table/sql/queries/window-topn.md ########## @@ -0,0 +1,110 @@ +--- +title: "Window Top-N" +weight: 15 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Window Top-N +{{< label Streaming >}} + +Window Top-N is a special [Top-N]({{< ref "docs/dev/table/sql/queries/topn" >}}) which returns the N smallest or largest values for each window and other partitioned keys. + +For streaming queries, unlike regular Top-N on continuous tables, window Top-N do not emit intermediate results but only a final result, the total top N records at the end of the window. Moreover, window Top-N purge all intermediate state when no longer needed. Review comment: ```suggestion For streaming queries, unlike regular Top-N on continuous tables, window Top-N does not emit intermediate results but only a final result, the total top N records at the end of the window. Moreover, window Top-N purges all intermediate state when no longer needed. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-agg.md ########## @@ -0,0 +1,332 @@ +--- +title: "Window Aggregation" +weight: 7 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Window Aggregation + +## Window TVF Aggregation + +{{< label Streaming >}} + +Window aggregations are defined in a `GROUP BY` clause that contains the "window_start" and "window_end" columns of a relation to which a [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) has been applied. Just like queries with regular `GROUP BY` clauses, queries with a window aggregation compute a single result row per group. + +```sql +SELECT ... +FROM <windowed_table> -- relation applied windowing TVF +GROUP BY window_start, window_end, ... +``` + +Unlike other aggregations on continuous tables, a window aggregation does not emit intermediate results but only a final result: the total aggregation at the end of the window. Moreover, window aggregations purge all intermediate state when no longer needed. + +### Windowing TVFs + +Flink supports `TUMBLE`, `HOP` and `CUMULATE` types of window aggregations, which can be defined on either [event or processing time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}). See [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) for more information about windowing functions. + +Here are some examples for `TUMBLE`, `HOP` and `CUMULATE` window aggregations. + +```sql +-- tables must have time attribute, e.g.
`bidtime` in this table +Flink SQL> desc Bid; ++-------------+------------------------+------+-----+--------+---------------------------------+ +| name | type | null | key | extras | watermark | ++-------------+------------------------+------+-----+--------+---------------------------------+ +| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | +| price | DECIMAL(10, 2) | true | | | | +| item | STRING | true | | | | +| supplier_id | STRING | true | | | | ++-------------+------------------------+------+-----+--------+---------------------------------+ + +Flink SQL> SELECT * FROM Bid; ++------------------+-------+------+-------------+ +| bidtime | price | item | supplier_id | ++------------------+-------+------+-------------+ +| 2020-04-15 08:05 | 4.00 | C | supplier1 | +| 2020-04-15 08:07 | 2.00 | A | supplier1 | +| 2020-04-15 08:09 | 5.00 | D | supplier2 | +| 2020-04-15 08:11 | 3.00 | B | supplier2 | +| 2020-04-15 08:13 | 1.00 | E | supplier1 | +| 2020-04-15 08:17 | 6.00 | F | supplier2 | ++------------------+-------+------+-------------+ + +-- tumbling window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | ++------------------+------------------+-------+ + +-- hopping window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 
08:05 | 2020-04-15 08:15 | 15.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 | ++------------------+------------------+-------+ + +-- cumulative window aggregation +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 | +| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 | +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 | +| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 | +| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 | +| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | ++------------------+------------------+-------+ +``` + +*Note: in order to better understand the behavior of windowing, we simplify the display of timestamp values by not showing trailing zeros, e.g. `2020-04-15 08:05` would be displayed as `2020-04-15 08:05:00.000` in the Flink SQL Client if the type is `TIMESTAMP(3)`.* + +### GROUPING SETS + +Window aggregations also support `GROUPING SETS` syntax. Grouping sets allow for more complex grouping operations than those describable by a standard `GROUP BY`. Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple `GROUP BY` clauses. + +Window aggregations with `GROUPING SETS` require both the `window_start` and `window_end` columns to be in the `GROUP BY` clause, but not in the `GROUPING SETS` clause.
+ +```sql +Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price + FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ()); ++------------------+------------------+-------------+-------+ +| window_start | window_end | supplier_id | price | ++------------------+------------------+-------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 | +| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 | +| 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 | ++------------------+------------------+-------------+-------+ +``` + +Each sublist of `GROUPING SETS` may specify zero or more columns or expressions and is interpreted the same way as though it was used directly in the `GROUP BY` clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present. Review comment: ```suggestion Each sublist of `GROUPING SETS` may specify zero or more columns or expressions and is interpreted the same way as though used directly in the `GROUP BY` clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-agg.md ########## @@ -0,0 +1,332 @@ +--- +title: "Window Aggregation" +weight: 7 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear. + +#### ROLLUP + +`ROLLUP` is a shorthand notation for specifying a common type of grouping set. It represents the given list of expressions and all prefixes of the list, including the empty list. + +Window aggregations with `ROLLUP` require both the `window_start` and `window_end` columns to be in the `GROUP BY` clause, but not in the `ROLLUP` clause. + +For example, the following query is equivalent to the one above.
+ +```sql +SELECT window_start, window_end, supplier_id, SUM(price) as price +FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) +GROUP BY window_start, window_end, ROLLUP (supplier_id); +``` + +#### CUBE + +`CUBE` is a shorthand notation for specifying a common type of grouping set. It represents the given list and all of its possible subsets, i.e. the power set. + +Window aggregations with `CUBE` require both the `window_start` and `window_end` columns to be in the `GROUP BY` clause, but not in the `CUBE` clause. + +For example, the following two queries are equivalent. + +```sql +SELECT window_start, window_end, item, supplier_id, SUM(price) as price + FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end, CUBE (supplier_id, item); + +SELECT window_start, window_end, item, supplier_id, SUM(price) as price + FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end, GROUPING SETS ( + (supplier_id, item), + (supplier_id ), + ( item), + ( ) +) +``` + +### Selecting Group Window Start and End Timestamps + +The start and end timestamps of group windows can be selected with the grouped `window_start` and `window_end` columns. + +### Cascading Window Aggregation + +The `window_start` and `window_end` columns are regular timestamp columns, not time attributes, so they can't be used as time attributes in subsequent time-based operations. +In order to propagate time attributes, you need to additionally add the `window_time` column to the `GROUP BY` clause. `window_time` is the third column produced by [Windowing TVFs]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}#window-functions) and is a time attribute of the assigned window. +Adding `window_time` to the `GROUP BY` clause makes `window_time` a group key that can be selected as well.
Then following queries can use this column for subsequent time-based operations, such as cascading window aggregations and [Window TopN]({{< ref "docs/dev/table/sql/queries/window-topn">}}). + +The following shows a cascading window aggregation where the first window aggregation propagate time attribute for the second window aggregation. Review comment: ```suggestion The following shows a cascading window aggregation where the first window aggregation propagates the time attribute for the second window aggregation. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-tvf.md ########## @@ -0,0 +1,285 @@ +--- +title: "Windowing TVF" +weight: 6 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Windowing table-valued functions (Windowing TVFs) + +{{< label Streaming >}} + +Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink SQL and how the programmer can benefit to the maximum from its offered functionality. 
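The "buckets" mentioned in the intro above can be made concrete with a small sketch. The following is plain Python, not Flink code (the function names, integer time units, and half-open `[start, end)` intervals are illustrative assumptions), showing how a row's time attribute maps to tumbling and hopping window buckets:

```python
# Plain-Python sketch of window bucket assignment (illustrative only,
# not Flink code; timestamps are plain integers, e.g. minutes or millis).

def tumble(ts, size):
    """Return the single tumbling window [start, end) covering ts."""
    start = ts - (ts % size)
    return (start, start + size)

def hop(ts, slide, size):
    """Return every hopping window [start, end) covering ts."""
    # Earliest window start that can still cover ts.
    first = ts - (ts % slide) - (size - slide)
    return [(start, start + size)
            for start in range(first, ts + 1, slide)
            if start <= ts < start + size]

# A bid at minute 485 (08:05) falls into the 10-minute tumble bucket
# [480, 490), i.e. [08:00, 08:10).
assert tumble(485, 10) == (480, 490)
# With a slide smaller than the size, one element lands in several buckets.
assert hop(7, slide=5, size=10) == [(0, 10), (5, 15)]
```

In the real TVFs the assignment is carried on the relation as the `window_start`/`window_end`/`window_time` columns rather than computed per lookup.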
+ +Apache Flink provides several windowing table-valued functions (TVF) to divide the elements of your table into windows, including: Review comment: ```suggestion Apache Flink provides several window table-valued functions (TVF) to divide the elements of your table into windows, including: ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-tvf.md ########## @@ -0,0 +1,285 @@
+ +Apache Flink provides several windowing table-valued functions (TVF) to divide the elements of your table into windows, including: + +- [Tumble Windows](#tumble) +- [Hop Windows](#hop) +- [Cumulate Windows](#cumulate) +- Session Windows (will be supported soon) + +Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows wherein a single element can be assigned to multiple windows. + +Windowing TVFs are Flink-defined Polymorphic Table Functions (abbreviated PTF). A PTF is part of the SQL 2016 standard: a special table function that can take a table as a parameter. PTFs are a powerful feature for changing the shape of a table. Because PTFs are used semantically like tables, their invocation occurs in the `FROM` clause of a `SELECT` statement. + +Windowing TVFs are a replacement for the legacy [Grouped Window Functions]({{< ref "docs/dev/table/sql/queries/window-agg" >}}#group-window-aggregation-deprecated). Windowing TVFs are more compliant with the SQL standard and more powerful in supporting complex window-based computations, e.g. Window TopN and Window Join, whereas [Grouped Window Functions]({{< ref "docs/dev/table/sql/queries/window-agg" >}}#group-window-aggregation) can only support Window Aggregation. + +See the following pages for further computations based on windowing TVFs: +- [Window Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) +- [Window TopN]({{< ref "docs/dev/table/sql/queries/window-topn">}}) +- Window Join (will be supported soon) + +## Window Functions + +Apache Flink provides 3 built-in windowing TVFs: `TUMBLE`, `HOP` and `CUMULATE`. The return value of a windowing TVF is a new relation that includes all columns of the original relation as well as three additional columns named "window_start", "window_end" and "window_time" to indicate the assigned window.
The "window_time" field is a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) of the window after the windowing TVF, which can be used in subsequent time-based operations, e.g. another windowing TVF, [interval joins]({{< ref "docs/dev/table/sql/queries/joins" >}}#interval-joins), or [over aggregations]({{< ref "docs/dev/table/sql/queries/over-agg" >}}). The value of `window_time` is always equal to `window_end - 1ms`. + +### TUMBLE + +The `TUMBLE` function assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure. Review comment: ```suggestion The `TUMBLE` function assigns each element to a window of specified window size. Tumbling windows have a fixed size and do not overlap. For example, suppose you specify a tumbling window with a size of 5 minutes. In that case, Flink will evaluate the current window, and a new window will be started every five minutes, as illustrated by the following figure. ``` ########## File path: docs/content/docs/dev/table/sql/queries/window-tvf.md ########## @@ -0,0 +1,285 @@ +--- +title: "Windowing TVF" +weight: 6 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +{{< img src="/fig/tumbling-windows.svg" alt="Tumbling Windows" width="70%">}} + +The `TUMBLE` function assigns a window for each row of a relation based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `TUMBLE` is a new relation that includes all columns of the original relation as well as three additional columns named "window_start", "window_end" and "window_time" to indicate the assigned window.
The original time attribute "timecol" will be a regular timestamp column after window TVF. + +`TUMBLE` function takes three required parameters: + +```sql +TUMBLE(TABLE data, DESCRIPTOR(timecol), size) +``` + +- `data`: is a table parameter that can be any relation with a time attribute column. +- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows. +- `size`: is a duration specifying the width of the tumbling windows. + +Here is an example invocation on the `Bid` table: + +```sql +-- tables must have time attribute, e.g. `bidtime` in this table +Flink SQL> desc Bid; ++-------------+------------------------+------+-----+--------+---------------------------------+ +| name | type | null | key | extras | watermark | ++-------------+------------------------+------+-----+--------+---------------------------------+ +| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | +| price | DECIMAL(10, 2) | true | | | | +| item | STRING | true | | | | ++-------------+------------------------+------+-----+--------+---------------------------------+ + +Flink SQL> SELECT * FROM Bid; ++------------------+-------+------+ +| bidtime | price | item | ++------------------+-------+------+ +| 2020-04-15 08:05 | 4.00 | C | +| 2020-04-15 08:07 | 2.00 | A | +| 2020-04-15 08:09 | 5.00 | D | +| 2020-04-15 08:11 | 3.00 | B | +| 2020-04-15 08:13 | 1.00 | E | +| 2020-04-15 08:17 | 6.00 | F | ++------------------+-------+------+ + +-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function, +-- window table-valued function should be used with aggregate operation, +-- this example is just used for explaining the syntax and the data produced by table-valued function. 
+Flink SQL> SELECT * FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)); +-- or with the named params +-- note: the DATA param must be the first +Flink SQL> SELECT * FROM TABLE( + TUMBLE( + DATA => TABLE Bid, + TIMECOL => DESCRIPTOR(bidtime), + SIZE => INTERVAL '10' MINUTES)); ++------------------+-------+------+------------------+------------------+-------------------------+ +| bidtime | price | item | window_start | window_end | window_time | ++------------------+-------+------+------------------+------------------+-------------------------+ +| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 | +| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 | +| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 | +| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 | +| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 | +| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 | ++------------------+-------+------+------------------+------------------+-------------------------+ + +-- apply aggregation on the tumbling windowed table +Flink SQL> SELECT window_start, window_end, SUM(price) + FROM TABLE( + TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) + GROUP BY window_start, window_end; ++------------------+------------------+-------+ +| window_start | window_end | price | ++------------------+------------------+-------+ +| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | +| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | ++------------------+------------------+-------+ +``` + +*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. 
`2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.* + + +### HOP + +The `HOP` function assigns elements to windows of fixed length. Similar to a `TUMBLE` windowing function, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a hopping window is started. Hence, hopping windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows. Hopping windows is also known as "sliding windows". Review comment: ```suggestion The `HOP` function assigns elements to windows of fixed length. Like a `TUMBLE` windowing function, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a hopping window is started. Hence, hopping windows can be overlapping if the slide is smaller than the window size. In this case, elements are assigned to multiple windows. Hopping windows are also known as "sliding windows". ``` ########## File path: docs/content/docs/dev/table/tuning.md ########## @@ -31,23 +31,27 @@ SQL is the most widely used language for data analytics. Flink's Table API and S In this page, we will introduce some useful optimization options and the internals of streaming aggregation which will bring great improvement in some cases. {{< hint info >}} -The streaming aggregations optimization are only supported for [unbounded-aggregations]({{< ref "docs/dev/table/sql/queries" >}}#aggregations). -Optimizations for [window aggregations]({{< ref "docs/dev/table/sql/queries" >}}#group-windows) will be supported in the future. +The streaming aggregation optimizations mentioned in this page are all supported for [Group Aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) now. 
 {{< /hint >}}
 
-By default, the unbounded aggregation operator processes input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).
-Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations.
-
 ## MiniBatch Aggregation
 
+By default, the group aggregation operators process input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).
+Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations.
+
 The core idea of mini-batch aggregation is caching a bundle of inputs in a buffer inside of the aggregation operator. When the bundle of inputs is triggered to process, only one operation per key to access state is needed. This can significantly reduce the state overhead and get a better throughput. However, this may increase some latency because it buffers some records instead of processing them in an instant. This is a trade-off between throughput and latency.
 
 The following figure explains how the mini-batch aggregation reduces state operations.
 
 {{< img src="/fig/table-streaming/minibatch_agg.png" width="50%" height="50%" >}}
 
-MiniBatch optimization is disabled by default. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`.
-Please see [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
+MiniBatch optimization is disabled by default for group aggregation. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please see [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
+
+{{< hint info >}}
+MiniBatch optimization is enabled by default for [Window TVF Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) and it can't be disabled. Enable or disable above mini-batch configuration doesn't affect window aggregations.
+Besides, window aggregations buffer records in [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm">}}#managed-memory) instead of JVM Heap, so there is no risk of full GC or OOM problems.

Review comment:
```suggestion
Window TVF aggregation buffer records in [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm">}}#managed-memory) instead of JVM Heap, so there is no risk of overloading GC or OOM issues.
```

##########
File path: docs/content/docs/dev/table/sql/queries/window-tvf.md
##########

@@ -0,0 +1,285 @@
+---
+title: "Windowing TVF"
+weight: 6
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Windowing table-valued functions (Windowing TVFs)
+
+{{< label Streaming >}}
+
+Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink SQL and how the programmer can benefit to the maximum from its offered functionality.
+
+Apache Flink provides several windowing table-valued functions (TVF) to divide the elements of your table into windows, including:
+
+- [Tumble Windows](#tumble)
+- [Hop Windows](#hop)
+- [Cumulate Windows](#cumulate)
+- Session Windows (will be supported soon)
+
+Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows wherein a single element can be assigned to multiple windows.
+
+Windowing TVFs are Flink defined Polymorphic Table Functions (abbreviated PTF). PTF is the part of the SQL 2016 standard which is a special table-function, but can have table as parameter. PTF is a powerful feature to change the shape of a table. Because PTFs are semantically used like tables, their invocation occurs in a `FROM` clause of a `SELECT` statement.
+
+Windowing TVFs is a replacement of legacy [Grouped Window Functions]({{< ref "docs/dev/table/sql/queries/window-agg" >}}#group-window-aggregation-deprecated). Windowing TVFs is more SQL standard compliant and more powerful to support complex window-based computations, e.g. Window TopN, Window Join. However, [Grouped Window Functions]({{< ref "docs/dev/table/sql/queries/window-agg" >}}#group-window-aggregation) can only support Window Aggregation.
+
+See more how to apply further computations based on windowing TVF:
+- [Window Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}})
+- [Window TopN]({{< ref "docs/dev/table/sql/queries/window-topn">}})
+- Window Join (will be supported soon)
+
+## Window Functions
+
+Apache Flink provides 3 built-in windowing TVFs: TUMBLE, `HOP` and `CUMULATE`. The return value of windowing TVF is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The "window_time" field is a [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) of the window after windowing TVF which can be used in subsequent time-based operations, e.g. another windowing TVF, or <a href="{{< ref "docs/dev/table/sql/queries/joins" >}}#interval-joins">interval joins</a>, <a href="{{< ref "docs/dev/table/sql/queries/over-agg" >}}">over aggregations</a>. The value of `window_time` always equal to `window_end - 1ms`.
+
+### TUMBLE
+
+The `TUMBLE` function assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.
+
+{{< img src="/fig/tumbling-windows.svg" alt="Tumbling Windows" width="70%">}}
+
+The `TUMBLE` function assigns a window for each row of a relation based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `TUMBLE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.
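The TUMBLE assignment described in the quoted docs can be made concrete with a small simulation. The following Python sketch is an editorial illustration only (it is not part of the PR and not Flink code); it computes the `window_start`, `window_end`, and `window_time` columns that the docs say `TUMBLE` attaches to a row, using the stated rule `window_time = window_end - 1ms`:

```python
from datetime import datetime, timedelta

def tumble(ts: datetime, size: timedelta):
    """Assign `ts` to its tumbling window.

    Returns (window_start, window_end, window_time), where windows are
    fixed-size, non-overlapping buckets aligned to the epoch, and
    window_time is window_end - 1ms as described in the docs.
    """
    epoch = datetime(1970, 1, 1)
    # Floor the timestamp down to the nearest multiple of `size`.
    start = epoch + ((ts - epoch) // size) * size
    end = start + size
    return start, end, end - timedelta(milliseconds=1)

# The bid at 2020-04-15 08:05 with a 10-minute size lands in [08:00, 08:10):
print(tumble(datetime(2020, 4, 15, 8, 5), timedelta(minutes=10)))
```

Running it for the `Bid` rows reproduces the window columns shown in the quoted example tables.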
+
+`TUMBLE` function takes three required parameters:
+
+```sql
+TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
+```
+
+- `data`: is a table parameter that can be any relation with a time attribute column.
+- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
+- `size`: is a duration specifying the width of the tumbling windows.
+
+Here is an example invocation on the `Bid` table:
+
+```sql
+-- tables must have time attribute, e.g. `bidtime` in this table
+Flink SQL> desc Bid;
++-------------+------------------------+------+-----+--------+---------------------------------+
+| name        | type                   | null | key | extras | watermark                       |
++-------------+------------------------+------+-----+--------+---------------------------------+
+| bidtime     | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
+| price       | DECIMAL(10, 2)         | true |     |        |                                 |
+| item        | STRING                 | true |     |        |                                 |
++-------------+------------------------+------+-----+--------+---------------------------------+
+
+Flink SQL> SELECT * FROM Bid;
++------------------+-------+------+
+| bidtime          | price | item |
++------------------+-------+------+
+| 2020-04-15 08:05 |  4.00 | C    |
+| 2020-04-15 08:07 |  2.00 | A    |
+| 2020-04-15 08:09 |  5.00 | D    |
+| 2020-04-15 08:11 |  3.00 | B    |
+| 2020-04-15 08:13 |  1.00 | E    |
+| 2020-04-15 08:17 |  6.00 | F    |
++------------------+-------+------+
+
+-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
+-- window table-valued function should be used with aggregate operation,
+-- this example is just used for explaining the syntax and the data produced by table-valued function.
+Flink SQL> SELECT * FROM TABLE(
+   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
+-- or with the named params
+-- note: the DATA param must be the first
+Flink SQL> SELECT * FROM TABLE(
+   TUMBLE(
+     DATA => TABLE Bid,
+     TIMECOL => DESCRIPTOR(bidtime),
+     SIZE => INTERVAL '10' MINUTES));
++------------------+-------+------+------------------+------------------+-------------------------+
+| bidtime          | price | item | window_start     | window_end       | window_time             |
++------------------+-------+------+------------------+------------------+-------------------------+
+| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
++------------------+-------+------+------------------+------------------+-------------------------+
+
+-- apply aggregation on the tumbling windowed table
+Flink SQL> SELECT window_start, window_end, SUM(price)
+  FROM TABLE(
+    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
+  GROUP BY window_start, window_end;
++------------------+------------------+-------+
+| window_start     | window_end       | price |
++------------------+------------------+-------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------+
+```
+
+*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. `2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.*
+
+
+### HOP
+
+The `HOP` function assigns elements to windows of fixed length. Similar to a `TUMBLE` windowing function, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a hopping window is started. Hence, hopping windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows. Hopping windows is also known as "sliding windows".
+
+For example, you could have windows of size 10 minutes that slides by 5 minutes. With this you get every 5 minutes a window that contains the events that arrived during the last 10 minutes as depicted by the following figure.

Review comment:
```suggestion
For example, you could have windows of size 10 minutes that slides by 5 minutes. With this, you get every 5 minutes a window that contains the events that arrived during the last 10 minutes, as depicted by the following figure.
```

##########
File path: docs/content/docs/dev/table/tuning.md
##########

@@ -31,23 +31,27 @@ SQL is the most widely used language for data analytics. Flink's Table API and S
 In this page, we will introduce some useful optimization options and the internals of streaming aggregation which will bring great improvement in some cases.
 
 {{< hint info >}}
-The streaming aggregations optimization are only supported for [unbounded-aggregations]({{< ref "docs/dev/table/sql/queries" >}}#aggregations).
-Optimizations for [window aggregations]({{< ref "docs/dev/table/sql/queries" >}}#group-windows) will be supported in the future.
+The streaming aggregation optimizations mentioned in this page are all supported for [Group Aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) now.
 {{< /hint >}}
 
-By default, the unbounded aggregation operator processes input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).
-Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations.
-
 ## MiniBatch Aggregation
 
+By default, the group aggregation operators process input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).
+Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations.

Review comment:
```suggestion
Besides, data skew, which is very common in production, will worsen the problem and make it easy for the jobs to be under backpressure situations.
```

##########
File path: docs/content/docs/dev/table/tuning.md
##########

@@ -31,23 +31,27 @@ SQL is the most widely used language for data analytics. Flink's Table API and S
 In this page, we will introduce some useful optimization options and the internals of streaming aggregation which will bring great improvement in some cases.
 
 {{< hint info >}}
-The streaming aggregations optimization are only supported for [unbounded-aggregations]({{< ref "docs/dev/table/sql/queries" >}}#aggregations).
-Optimizations for [window aggregations]({{< ref "docs/dev/table/sql/queries" >}}#group-windows) will be supported in the future.
+The streaming aggregation optimizations mentioned in this page are all supported for [Group Aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) now.
 {{< /hint >}}
 
-By default, the unbounded aggregation operator processes input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).
-Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations.
-
 ## MiniBatch Aggregation
 
+By default, the group aggregation operators process input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).

Review comment:
```suggestion
By default, group aggregation operators process input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to the accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).
```

##########
File path: docs/content/docs/dev/table/sql/queries/window-tvf.md
##########

@@ -0,0 +1,285 @@
+---
+title: "Windowing TVF"
+weight: 6
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Windowing table-valued functions (Windowing TVFs)
+
+{{< label Streaming >}}
+
+Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink SQL and how the programmer can benefit to the maximum from its offered functionality.
+
+Apache Flink provides several windowing table-valued functions (TVF) to divide the elements of your table into windows, including:
+
+- [Tumble Windows](#tumble)
+- [Hop Windows](#hop)
+- [Cumulate Windows](#cumulate)
+- Session Windows (will be supported soon)
+
+Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows wherein a single element can be assigned to multiple windows.
+
+Windowing TVFs are Flink defined Polymorphic Table Functions (abbreviated PTF). PTF is the part of the SQL 2016 standard which is a special table-function, but can have table as parameter. PTF is a powerful feature to change the shape of a table. Because PTFs are semantically used like tables, their invocation occurs in a `FROM` clause of a `SELECT` statement.
+
+Windowing TVFs is a replacement of legacy [Grouped Window Functions]({{< ref "docs/dev/table/sql/queries/window-agg" >}}#group-window-aggregation-deprecated). Windowing TVFs is more SQL standard compliant and more powerful to support complex window-based computations, e.g. Window TopN, Window Join. However, [Grouped Window Functions]({{< ref "docs/dev/table/sql/queries/window-agg" >}}#group-window-aggregation) can only support Window Aggregation.
+
+See more how to apply further computations based on windowing TVF:
+- [Window Aggregation]({{< ref "docs/dev/table/sql/queries/window-agg" >}})
+- [Window TopN]({{< ref "docs/dev/table/sql/queries/window-topn">}})
+- Window Join (will be supported soon)
+
+## Window Functions
+
+Apache Flink provides 3 built-in windowing TVFs: TUMBLE, `HOP` and `CUMULATE`. The return value of windowing TVF is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The "window_time" field is a [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) of the window after windowing TVF which can be used in subsequent time-based operations, e.g. another windowing TVF, or <a href="{{< ref "docs/dev/table/sql/queries/joins" >}}#interval-joins">interval joins</a>, <a href="{{< ref "docs/dev/table/sql/queries/over-agg" >}}">over aggregations</a>. The value of `window_time` always equal to `window_end - 1ms`.
+
+### TUMBLE
+
+The `TUMBLE` function assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.
+
+{{< img src="/fig/tumbling-windows.svg" alt="Tumbling Windows" width="70%">}}
+
+The `TUMBLE` function assigns a window for each row of a relation based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `TUMBLE` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after window TVF.
+
+`TUMBLE` function takes three required parameters:
+
+```sql
+TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
+```
+
+- `data`: is a table parameter that can be any relation with a time attribute column.
+- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to tumbling windows.
+- `size`: is a duration specifying the width of the tumbling windows.
+
+Here is an example invocation on the `Bid` table:
+
+```sql
+-- tables must have time attribute, e.g. `bidtime` in this table
+Flink SQL> desc Bid;
++-------------+------------------------+------+-----+--------+---------------------------------+
+| name        | type                   | null | key | extras | watermark                       |
++-------------+------------------------+------+-----+--------+---------------------------------+
+| bidtime     | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
+| price       | DECIMAL(10, 2)         | true |     |        |                                 |
+| item        | STRING                 | true |     |        |                                 |
++-------------+------------------------+------+-----+--------+---------------------------------+
+
+Flink SQL> SELECT * FROM Bid;
++------------------+-------+------+
+| bidtime          | price | item |
++------------------+-------+------+
+| 2020-04-15 08:05 |  4.00 | C    |
+| 2020-04-15 08:07 |  2.00 | A    |
+| 2020-04-15 08:09 |  5.00 | D    |
+| 2020-04-15 08:11 |  3.00 | B    |
+| 2020-04-15 08:13 |  1.00 | E    |
+| 2020-04-15 08:17 |  6.00 | F    |
++------------------+-------+------+
+
+-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
+-- window table-valued function should be used with aggregate operation,
+-- this example is just used for explaining the syntax and the data produced by table-valued function.
+Flink SQL> SELECT * FROM TABLE(
+   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
+-- or with the named params
+-- note: the DATA param must be the first
+Flink SQL> SELECT * FROM TABLE(
+   TUMBLE(
+     DATA => TABLE Bid,
+     TIMECOL => DESCRIPTOR(bidtime),
+     SIZE => INTERVAL '10' MINUTES));
++------------------+-------+------+------------------+------------------+-------------------------+
+| bidtime          | price | item | window_start     | window_end       | window_time             |
++------------------+-------+------+------------------+------------------+-------------------------+
+| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
++------------------+-------+------+------------------+------------------+-------------------------+
+
+-- apply aggregation on the tumbling windowed table
+Flink SQL> SELECT window_start, window_end, SUM(price)
+  FROM TABLE(
+    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
+  GROUP BY window_start, window_end;
++------------------+------------------+-------+
+| window_start     | window_end       | price |
++------------------+------------------+-------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
++------------------+------------------+-------+
+```
+
+*Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g.
`2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink SQL Client if the type is `TIMESTAMP(3)`.*
+
+
+### HOP
+
+The `HOP` function assigns elements to windows of fixed length. Similar to a `TUMBLE` windowing function, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a hopping window is started. Hence, hopping windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows. Hopping windows is also known as "sliding windows".
+
+For example, you could have windows of size 10 minutes that slides by 5 minutes. With this you get every 5 minutes a window that contains the events that arrived during the last 10 minutes as depicted by the following figure.
+
+{{< img src="/fig/sliding-windows.svg" alt="Hopping windows" width="70%">}}
+
+The `HOP` function assigns windows that cover rows within the interval of size and shifting every slide based on a [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column. The return value of `HOP` is a new relation that includes all columns of original relation as well as additional 3 columns named "window_start", "window_end", "window_time" to indicate the assigned window. The original time attribute "timecol" will be a regular timestamp column after windowing TVF.
+
+`HOP` takes three required parameters.
+
+```sql
+HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
+```
+
+- `data`: is a table parameter that can be any relation with an time attribute column.
+- `timecol`: is a column descriptor indicating which [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}) column of data should be mapped to hopping windows.
+- `slide`: is a duration specifying the duration between the start of sequential hopping windows
+- `size`: is a duration specifying the width of the hopping windows.
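As a cross-check of the `slide`/`size` semantics above, here is a small Python simulation. It is an editorial sketch only (not part of the PR, not Flink code) and assumes window starts are aligned to the slide and the slide evenly divides the size:

```python
from datetime import datetime, timedelta

def hop_windows(ts: datetime, slide: timedelta, size: timedelta):
    """Return every hopping window (start, end) that contains `ts`.

    A new window starts every `slide`; each window is `size` wide, so a
    row belongs to size/slide windows when slide evenly divides size.
    """
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before ts, aligned to the slide.
    start = epoch + ((ts - epoch) // slide) * slide
    windows = []
    while start + size > ts:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

# The bid at 08:05 with slide 5m / size 10m belongs to two windows:
for w in hop_windows(datetime(2020, 4, 15, 8, 5),
                     timedelta(minutes=5), timedelta(minutes=10)):
    print(w)
```

For the bid at `2020-04-15 08:05` this yields the `[08:00, 08:10)` and `[08:05, 08:15)` windows, consistent with the duplicated rows in the quoted example output.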
+
+Here is an example invocation on the `Bid` table:
+
+```sql
+-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
+-- window table-valued function should be used with aggregate operation,
+-- this example is just used for explaining the syntax and the data produced by table-valued function.
+> SELECT * FROM TABLE(
+    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
+-- or with the named params
+-- note: the DATA param must be the first
+> SELECT * FROM TABLE(
+    HOP(
+      DATA => TABLE Bid,
+      TIMECOL => DESCRIPTOR(bidtime),
+      SLIDE => INTERVAL '5' MINUTES,
+      SIZE => INTERVAL '10' MINUTES));
++------------------+-------+------+------------------+------------------+-------------------------+
+| bidtime          | price | item | window_start     | window_end       | window_time             |
++------------------+-------+------+------------------+------------------+-------------------------+
+| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
+| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
+| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
+| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
+| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
+| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
+| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
++------------------+-------+------+------------------+------------------+-------------------------+
+
+-- apply aggregation on the hopping windowed table
+> SELECT window_start, window_end, SUM(price)
+  FROM TABLE(
+    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
+  GROUP BY window_start, window_end;
++------------------+------------------+-------+
+| window_start     | window_end       | price |
++------------------+------------------+-------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
+| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+| 2020-04-15 08:15 | 2020-04-15 08:25 |  6.00 |
++------------------+------------------+-------+
+```
+
+### CUMULATE
+
+Cumulating windows are very useful in some scenarios, such as tumbling windows with early firing in a fixed window interval. For example, a daily dashboard draws cumulative UVs from 00:00 to every minute, the UV at 10:00 represents the total number of UV from 00:00 to 10:00. This can be easily and efficiently implemented by CUMULATE windowing.
+
+The `CUMULATE` function assigns elements to windows that cover rows within an initial interval of step size, and expanding to one more step size (keep window start fixed) every step until to the max window size.

Review comment:
```suggestion
The `CUMULATE` function assigns elements to windows that cover rows within an initial interval of step size and expand to one more step size (keep window start fixed) every step until the max window size.
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
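The CUMULATE assignment discussed in the last review comment can also be simulated. The sketch below is an editorial illustration (not part of the PR, not Flink code) and assumes window starts are aligned to the max window size and that the max size is a multiple of the step:

```python
from datetime import datetime, timedelta

def cumulate_windows(ts: datetime, step: timedelta, max_size: timedelta):
    """Return the cumulating windows (start, end) containing `ts`.

    All windows share a start aligned to `max_size`; the end grows by
    `step` (window start kept fixed) until it reaches start + max_size.
    """
    epoch = datetime(1970, 1, 1)
    start = epoch + ((ts - epoch) // max_size) * max_size
    windows = []
    end = start + step
    while end <= start + max_size:
        if end > ts:  # half-open window [start, end) already covers ts
            windows.append((start, end))
        end += step
    return windows

# A row at 08:17 with step 2m / max size 10m is emitted in the
# cumulating windows ending at 08:18 and 08:20:
for w in cumulate_windows(datetime(2020, 4, 15, 8, 17),
                          timedelta(minutes=2), timedelta(minutes=10)):
    print(w)
```

This matches the "early firing" reading in the quoted docs: every window shares a fixed start, and only the ends grow step by step.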
