radu created FLINK-6081:
---------------------------

             Summary: Offset/Fetch support for SQL Streaming
                 Key: FLINK-6081
                 URL: https://issues.apache.org/jira/browse/FLINK-6081
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: radu


Time target: Proc Time

The main scope of OFFSET/FETCH is pagination support. In the context of
streaming, OFFSET and FETCH make sense within the scope of certain window
constructs, as they refer to data buffered from the stream (the main usage
being to restrict the output shown at a certain moment). Therefore they
should be applied to the output of the window types supported by the ORDER
BY clause. Moreover, in accordance with SQL best practices, they can only
be used together with an ORDER BY clause.

SQL targeted query examples:
----------------------------

Window defined based on the GROUP BY clause

```
Q1: SELECT a FROM stream1
    GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR)
    ORDER BY b
    OFFSET n ROWS
```

Window defined based on WHERE clause time boundaries

```
Q2: SELECT a FROM stream1
    WHERE procTime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp
    ORDER BY b
    OFFSET n ROWS
```


~~Window defined as sliding windows (aggregates) ~~

``` Q3: ~~SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR 
PRECEDING b OFFSET n ROWS) FROM stream1~~ ```

Comment: Supporting OFFSET within a sliding window does not make sense,
because the main scope of OFFSET/FETCH is pagination support. Therefore
this functionality should only be supported in relation to the output of a
query. Hence, Q3 will not be supported.

The general grammar (Calcite version) for OFFSET/FETCH with the available
parameters is shown below:

```
SELECT […]
[ ORDER BY orderItem [, orderItem ]* ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ]
```
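For illustration, a minimal sketch of how a query instantiating this grammar
might be submitted through the Table API once the feature exists. The stream1
table, the concrete values, and the exact package paths are assumptions for
illustration only; ORDER BY + OFFSET/FETCH on streaming input is precisely
what this issue requests.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hedged sketch: assumes a table stream1(a, b, proctime) is already registered and that the
// planner accepts ORDER BY + OFFSET/FETCH on streaming input (the functionality proposed here).
public class OffsetFetchQuerySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // One concrete instantiation of the grammar above (Q2-style time bounds plus FETCH).
        Table result = tableEnv.sqlQuery(
                "SELECT a FROM stream1 "
                + "WHERE proctime BETWEEN CURRENT_TIMESTAMP - INTERVAL '1' HOUR AND CURRENT_TIMESTAMP "
                + "ORDER BY b "
                + "OFFSET 2 ROWS "
                + "FETCH NEXT 10 ROWS ONLY");
    }
}
```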

Description
-----------

OFFSET and FETCH are primarily used for pagination support (i.e., to
restrict the output that is shown at some point). They were mainly
designed to support displaying contents on web pages. Building on this
scenario, we can imagine a similar role for OFFSET and FETCH for streams
whose contents are displayed via a web page. In such a scenario the number
of outputs to be displayed would be limited using these operators
(typically for pagination and aesthetic reasons). However, as any stream
application naturally evolves in time, the operators' output should evolve
with the update rate of the application. The fact that there is an update
rate and a collection of events related to a stream points to window
constructs. Therefore the OFFSET/FETCH functionality would be tied to the
window mechanisms/boundaries defined by the query. Hence, whenever the
window construct is re-triggered, the output would be filtered again, from
the cardinality point of view, based on the logic of OFFSET/FETCH.

Because the primary purpose is to support pagination (and to control the
number of outputs), we limit the usage of OFFSET/FETCH to window
constructs that relate to the output. For this reason, supporting them on
sliding windows with query aggregates (e.g., query example Q3) would not
make sense. Additionally, there is an implicit need for an ordering
clause, since OFFSET and FETCH refer to ordering positions. That is why
these functions would be supported only if an ORDER BY clause is present.

Functionality example
---------------------

We exemplify the usage of OFFSET below using the following query. Event
schema is in the form (a,b).

```
SELECT a FROM stream1
GROUP BY CEIL(proctime TO HOUR)
ORDER BY b
OFFSET 2 ROWS
```


||Proctime||IngestionTime(Event)||Stream1||Output||
| |10:00:01|(a1, 7)| |
| |10:05:00|(c1, 2)| |
| |10:12:00|(b1, 5)| |
| |10:50:00|(d1, 2)| |
|10-11| | |b1, a1|
| |11:03:00|(a2, 10)| |
|11-12| | |nil|
|...| | | |

For the 10-11 window the buffered events ordered by b are (c1, 2), (d1, 2),
(b1, 5), (a1, 7); OFFSET 2 ROWS skips the first two, so b1 and a1 are
emitted. The 11-12 window contains only (a2, 10), so nothing survives the
offset and the output is nil.


Implementation option
---------------------

There are two options to implement the logic of OFFSET/FETCH:

1)  Within the logic of the window (i.e. sorting window)

Similar to the sorting support (ORDER BY clause), and considering that
the SQL operators will be associated with window boundaries, the
functionality would be implemented within the logic of the window as
follows. We extract the window boundaries and window type from the query
logic. These are used to define the type of the window and the triggering
policy. The logic of the query (i.e., the sorting of the events) is in
turn implemented within the window function. In addition to this, the
logic for filtering the output based on the cardinality logic of
OFFSET/FETCH is added. With this implementation the logic of OFFSET and
FETCH is combined with that of the ORDER BY clause. As ORDER BY is always
required, this does not introduce any implementation restrictions.

2)  Within the logic of a filter/flatMap function (with a state counter
    for the outputs)

Instead of adding the logic within the window function, the filtering can
be done within a standalone operator that only counts outputs and emits
the ones that fall within the logic of OFFSET/FETCH. To provide this
functionality we need a flatMap function in which we count the results.
The OFFSET/FETCH condition would be transposed into the condition of an
IF, applied based on the position of the output, to decide whether to emit
it (see the sketch below). However, the counter would need to be reset in
accordance with the triggering of the window, which makes the
implementation tedious. This is despite the fact that this implementation
option would directly translate the output-filtering logic of the
operators from relational SQL.
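A minimal sketch of this second option, assuming the upstream (sorting)
window emits records already ordered and that the counter reset on window
re-triggering is handled separately; the class name, the Tuple2<String,
Integer> schema for (a, b), and the constructor are illustrative
assumptions, not part of any existing implementation:

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical standalone filter (option 2): counts the already-sorted outputs of the window
// and emits only the positions that satisfy OFFSET/FETCH. Resetting the counter when the
// window re-triggers (the tedious part noted above) is intentionally left out of this sketch.
public class OffsetFetchFilter implements FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private final long offset; // rows to skip (OFFSET n ROWS)
    private final long fetch;  // rows to emit afterwards (FETCH NEXT m ROWS ONLY)
    private long counter = 0;  // would need to be reset whenever the upstream window fires

    public OffsetFetchFilter(long offset, long fetch) {
        this.offset = offset;
        this.fetch = fetch;
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) {
        // The IF condition that transposes the OFFSET/FETCH cardinality logic:
        // emit only positions past the offset and within the fetch limit.
        if (counter >= offset && counter - offset < fetch) {
            out.collect(value);
        }
        counter++;
    }
}
```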

We recommend option 1 for implementation.

Therefore, for option 1 we reuse the ORDER BY implementation entirely and
just add (see the sketch after this list):

1)  A counter for indexing the outputs

2)  An if condition to emit the output only if the corresponding index
    counter falls within the scope defined by the OFFSET/FETCH
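
A minimal sketch of option 1, assuming a non-keyed time window and the
(a, b) schema modeled as Tuple2<String, Integer>. SortAndCountFilter is a
hypothetical name, not an existing Flink class, and the in-memory sort
merely stands in for whatever the ORDER BY implementation provides:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical window function combining the ORDER BY logic with OFFSET/FETCH filtering.
public class SortAndCountFilter
        implements AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    private final long offset; // rows to skip (OFFSET n ROWS)
    private final long fetch;  // rows to emit afterwards (FETCH NEXT m ROWS ONLY)

    public SortAndCountFilter(long offset, long fetch) {
        this.offset = offset;
        this.fetch = fetch;
    }

    @Override
    public void apply(TimeWindow window,
                      Iterable<Tuple2<String, Integer>> input,
                      Collector<Tuple2<String, Integer>> out) {
        // 1) Sort the buffered window contents (the ORDER BY b part).
        List<Tuple2<String, Integer>> sorted = new ArrayList<>();
        for (Tuple2<String, Integer> record : input) {
            sorted.add(record);
        }
        sorted.sort((x, y) -> Integer.compare(x.f1, y.f1));

        // 2) Counter over the sorted output plus the if condition implementing OFFSET/FETCH.
        long index = 0;
        for (Tuple2<String, Integer> record : sorted) {
            if (index >= offset && index - offset < fetch) {
                out.collect(record);
            }
            index++;
        }
    }
}
```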

General logic of the operator
-----------------------------

```
inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
    //.trigger(new [Time/Count]Trigger()) – use default
    //.evictor(new [Time/Count]Evictor()) – use default
    .apply(new SortAndCountFilter());
```
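
For concreteness, a possible wiring of this pipeline under the same
assumptions: the hypothetical SortAndCountFilter from the sketch above, a
tumbling one-hour processing-time window standing in for the window derived
from the query, and OFFSET 2 with no FETCH limit as in the functionality
example.

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Illustrative wiring only; with this finite demo source the processing-time window may not
// fire before the job ends, so this shows the operator chain rather than guaranteed output.
public class OffsetFetchPipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> stream1 = env.fromElements(
                Tuple2.of("a1", 7), Tuple2.of("c1", 2), Tuple2.of("b1", 5), Tuple2.of("d1", 2));

        stream1
                // window derived from the query (here: tumbling 1h processing-time window)
                .windowAll(TumblingProcessingTimeWindows.of(Time.hours(1)))
                // sort by b, then apply OFFSET 2 ROWS (no FETCH limit)
                .apply(new SortAndCountFilter(2, Long.MAX_VALUE))
                .print();

        env.execute("OFFSET/FETCH sketch");
    }
}
```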



