leonardBang commented on a change in pull request #9511: 
[FLINK-13356][table][docs] Add documentation for TopN and Deduplication in 
blink planner
URL: https://github.com/apache/flink/pull/9511#discussion_r318582510
 
 

 ##########
 File path: docs/dev/table/sql.md
 ##########
 @@ -813,6 +813,222 @@ LIMIT 3
 
 {% top %}
 
+### Top-N
+
+TopN is used to calculate the maximum/minimum N records in a stream. It can be 
flexibly completed based on OVER window aggregation. The grammar is shown as 
below:
+
+Top-N queries ask for the N smallest or largest values ordered by columns. 
Both smallest and largest values sets are considered Top-N queries. Top-N 
queries are useful in cases where the need is to display only the N bottom-most 
or the N top-
+most records from batch/stream table on a condition. This result set can be 
used for further analysis.
+
+Flink uses the combination of a OVER window clause and a filter condition to 
express a Top-N query. With the power of OVER window `PARTITION BY` clause, 
Flink also supports per group Top-N. For example, the top five products per 
category that have the maximum sales in realtime. Top-N queries are supported 
for SQL on batch and streaming tables.
+
+The following shows the syntax of the TOP-N statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
+   FROM table_name)
+WHERE rownum <= N [AND conditions]
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns an unique, sequential number to each row, starting 
with one, according to the ordering of rows within the partition. Currently, we 
only support `ROW_NUMBER` as the over window function. In the future, we will 
support `RANK()` and `DENSE_RANK()`.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns. Each 
partition will have a Top-N result.
+- `ORDER BY col1 [asc|desc][, col2 [asc|desc]...]`: Specifies the ordering 
columns. The ordering directions can be different on different columns.
+- `WHERE rownum <= N`: The `rownum <= N` is required for Flink to recognize 
this query is a Top-N query. The N represents the N smallest or largest records 
will be retained.
+- `[AND conditions]`: It is free to add other conditions in the where clause, 
but the other conditions can only be combined with `rownum <= N` using `AND` 
conjunction.
+
+<span class="label label-danger">Attention</span> Flink SQL will sort the 
input data stream according to the order key, so if the top N records have been 
changed, the changed ones will be sent as retraction/update records to 
downstream. In addition, if the top N records need to be stored in external 
storage, the result table must have the same primary key with the Top-N query. 
By default, the primary key of Top-N query is the combination of partition 
columns + rownum column.
+
+The following examples show how to specify SQL queries with Top-N on streaming 
tables. This is an example to get "the top five products per category that have 
the maximum sales in realtime" we mentioned above.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple3<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, 
product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT * " +
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 
'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT *
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+#### No Ranking Output Optimization
+
+As described above, the `rownum` field will be written into the result table 
as one field of the primary key, which may lead to a lot of records being 
written to the result table. For example, when the record (say `product-1001`) 
of ranking 9 is updated and its rank is upgraded to 1, all the records from 
ranking 1 ~ 9 will be output to the result table as update messages. If the 
result table receives too many data, it will become the bottleneck of the SQL 
job.
+
+The optimization way is omitting rownum field in the outer SELECT clause of 
the Top-N query. This is reasonable because the number of the top N records is 
usually not large, thus the consumers can sort the records themselves quickly. 
Without rownum field, in the example above, only the changed record 
(`product-1001`) needs to be sent to downstream, which can reduce much IO to 
the result table.
+
+The following example shows how to optimize the above Top-N example in this 
way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple3<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, 
product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT product_id, category, product_name, sales " + // omit row_num field 
in the output
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 
'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT product_id, category, product_name, sales  -- omit row_num field 
in the output
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> In order to output the above 
query to an external storage and have a correct result, the external storage 
must have the same primary key with the Top-N query. In the above example 
query, if the `product_id` is the primary key of the query, then the external 
table should also has `product_id` as the primary key.
 
 Review comment:
   > It can be explained that the PK changes caused by optimization and 
non-optimization are difficult to understand.
   
   +1, the top-N implements with optimization and non-optimization are 
difficult to understand according to the description.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to