leonardBang commented on a change in pull request #9511:
[FLINK-13356][table][docs] Add documentation for TopN and Deduplication in
blink planner
URL: https://github.com/apache/flink/pull/9511#discussion_r318582510
##########
File path: docs/dev/table/sql.md
##########
@@ -813,6 +813,222 @@ LIMIT 3
{% top %}
+### Top-N
+
+Top-N queries ask for the N smallest or largest values ordered by columns. Both smallest and largest value sets are considered Top-N queries. Top-N queries are useful in cases where the need is to display only the N bottom-most or the N top-most records from a batch/stream table on a condition. This result set can be used for further analysis.
+
+Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the power of the OVER window `PARTITION BY` clause, Flink also supports per-group Top-N, for example, the top five products per category that have the maximum sales in real time. Top-N queries are supported for SQL on batch and streaming tables.
+
+The following shows the syntax of the TOP-N statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+ SELECT [column_list],
+ ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+ ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
+ FROM table_name)
+WHERE rownum <= N [AND conditions]
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, we only support `ROW_NUMBER` as the over window function. In the future, we will support `RANK()` and `DENSE_RANK()`.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns. Each partition will have a Top-N result.
+- `ORDER BY col1 [asc|desc][, col2 [asc|desc]...]`: Specifies the ordering columns. The ordering directions can be different on different columns.
+- `WHERE rownum <= N`: The `rownum <= N` condition is required for Flink to recognize the query as a Top-N query. N represents the number of smallest or largest records that will be retained.
+- `[AND conditions]`: It is free to add other conditions in the WHERE clause, but the other conditions can only be combined with `rownum <= N` using the `AND` conjunction.
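The semantics of the pattern above can be sketched outside of Flink. The following is a minimal, framework-free Python illustration of `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)` filtered by `rownum <= N`; the `top_n` helper and the sample data are invented for this sketch and are not part of any Flink API:

```python
from itertools import groupby

def top_n(rows, n, partition_key, order_key, desc=True):
    """Batch equivalent of the Top-N pattern:
    ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) ... WHERE rownum <= n."""
    result = []
    # groupby requires the input to be sorted by the grouping key
    for _, group in groupby(sorted(rows, key=partition_key), key=partition_key):
        ranked = sorted(group, key=order_key, reverse=desc)
        # rownum starts at 1 within each partition
        result.extend((row, rownum) for rownum, row in enumerate(ranked, 1)
                      if rownum <= n)
    return result

sales = [
    ("p1", "books", 45), ("p2", "books", 70),
    ("p3", "books", 30), ("p4", "fruit", 20),
]
# top-2 per category, ordered by sales descending
top2 = top_n(sales, 2, partition_key=lambda r: r[1], order_key=lambda r: r[2])
print(top2)
```

Each partition yields its own ranking, so `books` contributes two rows and `fruit` one, exactly as the SQL pattern would.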
+
+<span class="label label-danger">Attention</span> Flink SQL will sort the input data stream according to the order key. So if the top N records change, the changed records will be sent downstream as retraction/update records. In addition, if the top N records need to be stored in external storage, the result table must have the same primary key as the Top-N query. By default, the primary key of a Top-N query is the combination of the partition columns and the rownum column.
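How such update records could be produced can be sketched as follows. This is a hypothetical Python simulation of the keying behavior (key = partition columns + rownum), not Flink's actual operator implementation; `emit_updates` and its state layout are invented for illustration:

```python
def emit_updates(state, n, partition, row, order_key):
    """Insert `row` into the per-partition ranking kept in `state` and return
    the update messages for every rank position whose content changed."""
    ranked = state.setdefault(partition, [])
    before = list(ranked[:n])
    # keep the full ranking (not just the top n), so a later retraction
    # of a top record could be backfilled from lower-ranked rows
    ranked.append(row)
    ranked.sort(key=order_key, reverse=True)
    after = ranked[:n]
    updates = []
    for rownum, new in enumerate(after, 1):
        old = before[rownum - 1] if rownum <= len(before) else None
        if new != old:
            # key = (partition columns, rownum)
            updates.append(((partition, rownum), new))
    return updates

state = {}
by_sales = lambda r: r[1]
emit_updates(state, 2, "books", ("p1", 45), order_key=by_sales)
emit_updates(state, 2, "books", ("p2", 70), order_key=by_sales)
# ("p3", 60) displaces ("p1", 45) from rank 2, so only rank 2 is re-emitted:
print(emit_updates(state, 2, "books", ("p3", 60), order_key=by_sales))
```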
+
+The following examples show how to specify SQL queries with Top-N on streaming tables. This is an example to get "the top five products per category that have the maximum sales in real time", as mentioned above.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+ "SELECT * " +
+ "FROM (" +
+ " SELECT *," +
+  "  ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" +
+ " FROM ShopSales)" +
+ "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+ """
+ |SELECT *
+ |FROM (
+ | SELECT *,
+  |  ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+ | FROM ShopSales)
+ |WHERE row_num <= 5
+ """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+#### No Ranking Output Optimization
+
+As described above, the `rownum` field will be written into the result table as one field of the primary key, which may lead to a lot of records being written to the result table. For example, when the record of rank 9 (say `product-1001`) is updated and its rank is upgraded to 1, all the records from rank 1 to 9 will be output to the result table as update messages. If the result table receives too much data, it will become the bottleneck of the SQL job.
+
+The way to optimize is to omit the rownum field in the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually not large, so the consumers can sort the records themselves quickly. Without the rownum field, in the example above, only the changed record (`product-1001`) needs to be sent downstream, which can reduce much IO to the result table.
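The difference in output volume can be sketched numerically. The following hypothetical Python sketch models the rank-9-to-rank-1 example above (`before_sales`/`after_sales` and the helper are invented for illustration, not Flink code):

```python
# Sales before and after the update that pushes product-1009 from rank 9 to rank 1.
before_sales = {f"product-{1000 + i}": 100 - i for i in range(1, 10)}
after_sales = {**before_sales, "product-1009": 200}

def ranking(sales):
    """Products ordered by sales descending (rank 1 first)."""
    return sorted(sales, key=sales.get, reverse=True)

# With rownum in the result key: a row is re-emitted whenever its rank
# position is now occupied by a different product.
rank_updates = [
    (pos, new) for pos, (old, new)
    in enumerate(zip(ranking(before_sales), ranking(after_sales)), 1)
    if old != new
]

# Without rownum: a row is re-emitted only when its own values changed.
row_updates = [p for p in after_sales if after_sales[p] != before_sales[p]]

print(len(rank_updates), len(row_updates))  # prints: 9 1
```

All nine rank positions shift, so the keyed-by-rownum variant re-emits nine records, while the optimized variant only re-emits the one record whose sales actually changed.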
+
+The following example shows how to optimize the above Top-N example in this way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT product_id, category, product_name, sales " + // omit row_num field in the output
+ "FROM (" +
+ " SELECT *," +
+  "  ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" +
+ " FROM ShopSales)" +
+ "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+ """
+  |SELECT product_id, category, product_name, sales -- omit row_num field in the output
+ |FROM (
+ | SELECT *,
+  |  ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+ | FROM ShopSales)
+ |WHERE row_num <= 5
+ """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> In order to output the above query to external storage with a correct result, the external storage must have the same primary key as the Top-N query. In the above example query, if `product_id` is the primary key of the query, then the external table should also have `product_id` as its primary key.
Review comment:
> It would help to explain the primary-key change between the optimized and non-optimized versions, which is difficult to understand.

+1, the Top-N implementations with and without the optimization are difficult to understand from the current description.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services