[
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089101#comment-16089101
]
Fabian Hueske commented on FLINK-6075:
--------------------------------------
Hi [~rtudoran],
I had a look at [PR #4263|https://github.com/apache/flink/pull/4263] and think
we need to rework the semantics of the operators a bit.
The result of a query on a stream must be the same as if the query were
executed on the same data in a regular database.
For example, given a query {{SELECT * FROM t ORDER BY rowtime ASC OFFSET 10}},
we must emit all rows except for the rows with the 10 smallest timestamps
(which are the first 10 rows in time order). {{SELECT * FROM t ORDER BY rowtime
ASC FETCH FIRST 10 ROWS ONLY}} would return only the rows with the 10 smallest
timestamps.
With these semantics in mind, {{ORDER BY}} with {{OFFSET}} and {{FETCH}} can be
implemented as follows:
- {{ORDER BY time ASC}}:
  - ORDER BY: already implemented. No retraction needed.
  - ORDER BY OFFSET x: same operator as ORDER BY but drop the first x rows.
    Only counter state and no retraction needed.
  - ORDER BY FETCH x: same operator as ORDER BY but drop all rows after the
    first x. Only counter state and no retraction needed.
  - ORDER BY OFFSET x FETCH y: same operator as ORDER BY but drop the first x
    rows and stop after y rows have been emitted. Only counter state and no
    retraction needed.
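
To illustrate the counter-based variants above, here is a minimal sketch in plain Python (not Flink code). It assumes the input already arrives in ascending time order, as the existing ORDER BY operator would provide it. The function name and parameters are hypothetical and only serve the illustration.

```python
def offset_fetch_time_ordered(rows_in_time_order, offset=0, fetch=None):
    """Apply OFFSET/FETCH to rows that already arrive in ascending time order.

    Only two counters are kept as state; emitted rows are never retracted.
    """
    skipped = 0   # rows dropped so far because of OFFSET
    emitted = 0   # rows emitted so far, compared against FETCH
    for row in rows_in_time_order:
        if skipped < offset:
            skipped += 1
            continue
        if fetch is not None and emitted >= fetch:
            break  # FETCH is satisfied, later rows can never qualify
        emitted += 1
        yield row
```

With 20 time-ordered rows, `offset=10` yields the last 10 rows, `fetch=10` yields the first 10, and `offset=3, fetch=2` yields the 4th and 5th row.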
We can also support {{ORDER BY}} with {{OFFSET}} and {{FETCH}} for arbitrary
sort orders if we do not care about the emission order and only about the
correct filtering.
- ORDER BY any attribute: Correct order cannot be provided, but we can return
  all valid rows.
  - ORDER BY: correct order cannot be provided, hence not possible.
  - ORDER BY OFFSET x: We can emit all valid rows if we keep x rows as state.
    No retraction needed.
  - ORDER BY FETCH y: We can emit all valid rows if we keep y rows as state.
    Retraction needed.
  - ORDER BY OFFSET x FETCH y: We can emit all valid rows if we keep x + y
    rows as state. Retraction needed.
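
The state and retraction requirements listed above can be sketched as follows in plain Python (not Flink code). It assumes an append-only input and models the output as a list of `("+", row)` emissions and `("-", row)` retractions. The function name, the `key` parameter, and the change-tuple format are hypothetical.

```python
import bisect


def offset_fetch_changes(rows, key, offset=0, fetch=None):
    """Return the changelog for ORDER BY key OFFSET x [FETCH y] on unordered input.

    State holds the x smallest rows (OFFSET only) or the x + y smallest rows
    (with FETCH). Output order is arrival-driven, not sorted.
    """
    state = []  # sorted entries (sort key, arrival number, row)
    capacity = offset if fetch is None else offset + fetch
    changes = []
    for seq, row in enumerate(rows):
        old = state[offset:]  # rows currently part of the result (FETCH case)
        bisect.insort(state, (key(row), seq, row))
        evicted = state.pop() if len(state) > capacity else None
        if fetch is None:
            # OFFSET only: a row pushed out of the x smallest is valid forever.
            if evicted is not None:
                changes.append(("+", evicted[2]))
            continue
        new = state[offset:]
        old_ids = {e[1] for e in old}
        new_ids = {e[1] for e in new}
        # Rows that left the result window must be retracted.
        changes.extend(("-", e[2]) for e in old if e[1] not in new_ids)
        # Rows that entered the result window are emitted.
        changes.extend(("+", e[2]) for e in new if e[1] not in old_ids)
    return changes
```

For the input values 5, 3, 8, 1, 9 sorted by value, `offset=2` produces `+8, +5, +9` without any retraction, while `fetch=2` produces `+5, +3, -5, +1`: the row 5 has to be retracted once the smaller row 1 arrives.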
Given these semantics, I propose to split the development of ORDER BY with
OFFSET and FETCH into multiple subissues / PRs:
- Subissue / PR 1: ORDER BY OFFSET, ORDER BY FETCH, and ORDER BY OFFSET FETCH
for ascending time order. This can be implemented by extending the current
ORDER BY implementation with two counters.
- Subissue / PR 2: ORDER BY OFFSET, ORDER BY FETCH, and ORDER BY OFFSET FETCH
for descending time order. These can be implemented in two operators, one for
proc-time and one for row-time.
- Subissue / PR 3: ORDER BY OFFSET, ORDER BY FETCH, and ORDER BY OFFSET FETCH
for arbitrary sort orders. These can be implemented in two operators, one for
proc-time and one for row-time.
All implementations only support append-only input tables. We can later add
support for inputs that are updated.
Best, Fabian
> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: radu
> Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the
> same for all of them; only the processing function differs in terms of the
> output.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL
> '3' HOUR) ORDER BY b`
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING
> LIMIT 10) FROM stream1`
> Comment: limit over the contents of the sliding window
> General Comments:
> - All these SQL clauses are supported only over windows (bounded collections
> of data).
> - Each of the 3 operators will be supported with each of the ways of
> expressing windows.
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all
> require a sorted collection of the data on which the logic will be applied
> (i.e., select a subset of the items or the entire sorted set). These
> functions would make sense in the streaming context only in the context of a
> window. Without a window, the functions could never emit because the sort
> operation would never trigger. If a SQL query is provided without window
> bounds, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR).
> Although not targeted by this JIRA, in the case of working based on event
> time order, the retraction mechanisms of windows and the lateness mechanisms
> can be used to deal with out of order events and retraction/updates of
> results.
> **Functionality example**
> We illustrate all 3 types of operators (sorting, limit and top) with the
> query below. Rowtime indicates when the HOP window triggers, which can be
> observed in the fact that outputs are generated only at those moments.
> The HOP windows trigger every hour (on the full hour) and each event
> contributes to (is duplicated in) 2 consecutive hour intervals. Proctime
> indicates the processing time when a new event arrives in the system. Events
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2'
> HOUR) ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])`
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00|(aaa, 11)| | | |
> | |10:05:00|(aab, 7)| | | |
> |10-11|11:00:00| |aab,aaa|aab,aaa|aab,aaa|
> | |11:03:00|(aac,21)| | | |
> |11-12|12:00:00| |aab,aaa|aab,aaa|aab,aaa,aac|
> | |12:10:00|(abb,12)| | | |
> | |12:15:00|(abb,12)| | | |
> |12-13|13:00:00| |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries,
> the functionality will be implemented within the logic of the window as
> follows.
> * Window assigner – selected based on the type of window used in SQL
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the
> window boundaries
> * Apply – window function that sorts data and selects the output to trigger
> (based on LIMIT/TOP parameters). All data will be sorted at once and result
> outputted when the window is triggered
> An alternative implementation can be to use a fold window function to sort
> the elements as they arrive, one at a time followed by a flatMap to filter
> the number of outputs.
> !sort.png!
> **General logic of Sort/Limit/Top**
> ```
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
>     //.trigger(new [Time/Count]Trigger()) – use default
>     //.evictor(new [Time/Count]Evictor()) – use default
>     .apply(new SortAndFilter());
> ```