[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010872#comment-16010872
 ] 

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r116538474
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
 ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import 
org.apache.flink.table.api.scala.stream.sql.SortITCase.{EventTimeSourceFunction,StringRowSelectorSink}
    +import org.apache.flink.streaming.api.functions.source.SourceFunction
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import scala.collection.mutable
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    +
    +class SortITCase extends StreamingWithStateTestBase {
    +
    +  @Test
    +  def testEventTimeOrderBy(): Unit = {
    +    val data = Seq(
    +      Left((1500L, (1L, 15, "Hello"))),
    +      Left((1600L, (1L, 16, "Hello"))),
    +      Left((1000L, (1L, 1, "Hello"))),
    +      Left((2000L, (2L, 2, "Hello"))),
    +      Right(1000L),
    +      Left((2000L, (2L, 2, "Hello"))),
    +      Left((2000L, (2L, 3, "Hello"))),
    +      Left((3000L, (3L, 3, "Hello"))),
    +      Right(2000L),
    +      Left((4000L, (4L, 4, "Hello"))),
    +      Right(3000L),
    +      Left((5000L, (5L, 5, "Hello"))),
    +      Right(5000L),
    +      Left((6000L, (6L, 65, "Hello"))),
    +      Left((6000L, (6L, 6, "Hello"))),
    +      Right(7000L),
    +      Left((9000L, (6L, 9, "Hello"))),
    +      Left((8500L, (6L, 18, "Hello"))),
    +      Left((9000L, (6L, 7, "Hello"))),
    +      Right(10000L),
    +      Left((10000L, (7L, 7, "Hello World"))),
    +      Left((11000L, (7L, 77, "Hello World"))),
    +      Left((11000L, (7L, 17, "Hello World"))),
    +      Right(12000L),
    +      Left((14000L, (7L, 18, "Hello World"))),
    +      Right(14000L),
    +      Left((15000L, (8L, 8, "Hello World"))),
    +      Right(17000L),
    +      Left((20000L, (20L, 20, "Hello World"))), 
    +      Right(19000L))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, 
String)](data))
    +      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
    +      
    +    tEnv.registerTable("T1", t1)
    +
    +    val  sqlQuery = "SELECT b FROM T1 " +
    +      "ORDER BY rowtime, b ASC ";
    +      
    +      
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
    +    env.execute()
    +    
    +    val expected = mutable.MutableList(
    +      "1", "15", "16",
    +      "2", "2", "3",
    +      "3",
    +      "4",
    +      "5",
    +      "6", "65",
    +      "18", "7", "9",
    +      "7", "17", "77", 
    +      "18",
    +      "8",
    +      "20")
    +    assertEquals(expected, SortITCase.testResults)
    +  }
    +}
    +
    +object SortITCase {
    +
    +  class EventTimeSourceFunction[T](
    --- End diff --
    
    Make the `EventTimeSourceFunction` in `OverWindowITCase` a util class and 
reuse it here instead of duplicating the code.


> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
>                 Key: FLINK-6075
>                 URL: https://issues.apache.org/jira/browse/FLINK-6075
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>         Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the 
> same; only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window, the functions could never emit, as the sort 
> operation would never trigger. If an SQL query is provided without 
> limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime 
> indicates the processing time when a new event arrives in the system. Events 
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) 
> ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC])`
> ||Rowtime||   Proctime||      Stream1||       Limit 2||       Top 2|| Sort 
> [ASC]||
> |         |10:00:00  |(aaa, 11)       |               |             |         
>    |
> |         |10:05:00    |(aab, 7)  |           |             |            |
> |10-11          |11:00:00  |          |       aab,aaa |aab,aaa  |     aab,aaa 
>    |
> |         |11:03:00  |(aac,21)  |           |         |            |          
>         
> |11-12    |12:00:00  |          |     aab,aaa |aab,aaa  |     aab,aaa,aac|
> |         |12:10:00  |(abb,12)  |           |         |            |          
>         
> |         |12:15:00  |(abb,12)  |           |         |            |          
>         
> |12-13          |13:00:00  |          |       abb,abb | abb,abb |     
> abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, 
> the functionality will be implemented within the logic of the window as 
> follows.
> * Window assigner – selected based on the type of window used in SQL 
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the 
> window boundaries
> * Apply – window function that sorts data and selects the output to trigger 
> (based on LIMIT/TOP parameters). All data will be sorted at once and result 
> outputted when the window is triggered
> An alternative implementation can be to use a fold window function to sort 
> the elements as they arrive, one at a time followed by a flatMap to filter 
> the number of outputs. 
> !sort.png!
> **General logic of Sort**
> ```
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
> //.trigger(new [Time/Count]Trigger()) – use default
> //.evictor(new [Time/Count]Evictor()) – use default
>               .apply(SortAndFilter());
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to