[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936204#comment-15936204
 ] 

ASF GitHub Bot commented on FLINK-5658:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r107385159
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
    @@ -317,4 +320,193 @@ class SqlITCase extends StreamingWithStateTestBase {
         result.addSink(new StreamITCase.StringSink)
         env.execute()
       }
    +
    +  /** test sliding event-time unbounded window with partition by **/
    +  @Test
    +  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +
    +    val sqlQuery = "SELECT a, b, c, " +
    +      "SUM(b) over (" +
    +      "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
    +      "count(b) over (" +
    +      "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
    +      "avg(b) over (" +
    +      "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
    +      "max(b) over (" +
    +      "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
    +      "min(b) over (" +
    +      "partition by a order by rowtime() range between unbounded preceding 
and current row) " +
    +      "from T1"
    +
    +    val t1 = env.addSource[(Int, Long, String)](new SourceFunction[(Int, 
Long, String)] {
    +      override def run(ctx: SourceContext[(Int, Long, String)]): Unit = {
    +        ctx.collectWithTimestamp((1, 1L, "Hi"), 14000005L)
    +        ctx.collectWithTimestamp((2, 1L, "Hello"), 14000000L)
    +        ctx.collectWithTimestamp((3, 1L, "Hello"), 14000002L)
    +        ctx.collectWithTimestamp((1, 2L, "Hello"), 14000003L)
    +        ctx.collectWithTimestamp((1, 3L, "Hello world"), 14000004L)
    +        ctx.collectWithTimestamp((3, 2L, "Hello world"), 14000007L)
    +        ctx.collectWithTimestamp((2, 2L, "Hello world"), 14000008L)
    +        ctx.emitWatermark(new Watermark(14000010L))
    +        ctx.collectWithTimestamp((1, 4L, "Hello world"), 14000008L)
    +        ctx.collectWithTimestamp((2, 3L, "Hello world"), 14000008L)
    +        ctx.collectWithTimestamp((3, 3L, "Hello world"), 14000008L)
    +        ctx.collectWithTimestamp((1, 5L, "Hello world"), 14000012L)
    +        ctx.emitWatermark(new Watermark(14000020L))
    +        ctx.collectWithTimestamp((1, 6L, "Hello world"), 14000021L)
    +        ctx.collectWithTimestamp((1, 6L, "Hello world"), 14000019L)
    +        ctx.collectWithTimestamp((2, 4L, "Hello world"), 14000018L)
    +        ctx.collectWithTimestamp((3, 4L, "Hello world"), 14000018L)
    +        ctx.collectWithTimestamp((2, 5L, "Hello world"), 14000022L)
    +        ctx.collectWithTimestamp((3, 5L, "Hello world"), 14000022L)
    +        ctx.collectWithTimestamp((1, 7L, "Hello world"), 14000024L)
    +        ctx.collectWithTimestamp((1, 8L, "Hello world"), 14000023L)
    +        ctx.collectWithTimestamp((1, 9L, "Hello world"), 14000021L)
    +        ctx.emitWatermark(new Watermark(14000030L))
    +      }
    +
    +      override def cancel(): Unit = {}
    +    }).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,2,Hello,2,1,2,2,2",
    +      "1,3,Hello world,5,2,2,3,2",
    +      "1,1,Hi,6,3,2,3,1",
    +      "2,1,Hello,1,1,1,1,1",
    +      "2,2,Hello world,3,2,1,2,1",
    +      "3,1,Hello,1,1,1,1,1",
    +      "3,2,Hello world,3,2,1,2,1",
    +      "1,5,Hello world,11,4,2,5,1",
    +      "1,6,Hello world,17,5,3,6,1",
    +      "1,9,Hello world,26,6,4,9,1",
    +      "1,8,Hello world,34,7,4,9,1",
    +      "1,7,Hello world,41,8,5,9,1",
    +      "2,5,Hello world,8,3,2,5,1",
    +      "3,5,Hello world,8,3,2,5,1"
    +    )
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  /** test sliding event-time unbounded window without partition by **/
    +  @Test
    +  def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    --- End diff --
    
    The test should also work correctly if we only set the parallelism of the 
source to 1.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> ------------------------------------------------------------------------
>
>                 Key: FLINK-5658
>                 URL: https://issues.apache.org/jira/browse/FLINK-5658
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to