[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949791#comment-15949791
 ] 

ASF GitHub Bot commented on FLINK-5654:
---------------------------------------

Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3641
  
    @fhueske Thanks for the example. First of all, I am happy that we agree 
that we need to emit something for every input :). I was worried that this 
would not be the case.
    
    Now, regarding the 2 options for the output: I would still have a 
preference for the first option. The reason is that, at least in the case of 
proctime, no 2 events are simultaneous in absolute terms. There is an implicit 
serialization (also based on how the Flink engine works: one event will arrive 
before the other). It might simply be a coincidence that computers currently 
cannot measure system time with a finer granularity, and because of that we 
would have 2 events that apparently carry the same proctime timestamp.
    However, as you mentioned several times (and I agree), having a uniform 
behavior might be a key point. In this case we would indeed implement the 
behavior you suggested. Let me know again if you are sure about this. I raised 
my concerns, but I am open to accepting what you suggest.
    
    In case you suggest implementing it with proctime, I have 2 questions:
    1) Do you know if there is an example of a timer in proctime? (If not, no 
problem, I will figure it out.)
    2) In the case of harness tests, the correct implementation of the 
behavior might not match the test. Suppose I write a test that does
    {code}
     testHarness.setProcessingTime(3)
     // let us assume that the system time is 1490906681
     testHarness.processElement(new StreamRecord(rInput, 1001))
     // let us assume that the system time is still 1490906681
     testHarness.processElement(new StreamRecord(rInput, 2002))
     // let us assume that the system time is now 1490906682
     testHarness.processElement(new StreamRecord(rInput, 2003))
     // let us assume that the system time is now 1490906682
     testHarness.processElement(new StreamRecord(rInput, 2004))
    {code}
    
    In this case, if we do a COUNT, then the output would be
    ev1 2
    ev2 2
    ev3 2
    ev4 2
    ...instead of all events having an associated count of 4 (which would be 
the behavior you mention). This is because I assume that the +1ms timer would 
be triggered based on system time rather than on the processing time set in 
the harness.
    In such a case, should we just build a limited test that would work?
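
    To make the difference concrete, here is a minimal plain-Java sketch (no 
Flink APIs involved) of the two semantics discussed above: counting on arrival, 
where each row only sees rows that arrived before it, and the uniform behavior, 
where rows with the same proctime timestamp see each other. The class and 
method names are hypothetical, and a range of 0 ms is assumed so that only rows 
with identical timestamps share a window, which is what the numbers above imply.

```java
import java.util.Arrays;

// Plain-Java simulation of the two COUNT semantics; all names are hypothetical.
// procTimes must be given in arrival order (non-decreasing).
class ProcTimeCountSketch {

    // Arrival-order semantics: a row sees itself and the rows that arrived
    // before it, as long as they fall inside the preceding range.
    static long[] countOnArrival(long[] procTimes, long rangeMillis) {
        long[] counts = new long[procTimes.length];
        for (int i = 0; i < procTimes.length; i++) {
            for (int j = 0; j <= i; j++) {
                if (procTimes[j] >= procTimes[i] - rangeMillis) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    // Uniform semantics: rows with the same timestamp are peers, so a row also
    // sees later arrivals that carry the same timestamp.
    static long[] countWithPeers(long[] procTimes, long rangeMillis) {
        long[] counts = new long[procTimes.length];
        for (int i = 0; i < procTimes.length; i++) {
            for (int j = 0; j < procTimes.length; j++) {
                boolean notTooOld = procTimes[j] >= procTimes[i] - rangeMillis;
                boolean notNewer = procTimes[j] <= procTimes[i];
                if (notTooOld && notNewer) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // All four rows stamped with the harness time set via setProcessingTime(3).
        long[] harnessTime = {3, 3, 3, 3};
        // The assumed system times from the test above.
        long[] systemTime = {1490906681L, 1490906681L, 1490906682L, 1490906682L};

        System.out.println(Arrays.toString(countOnArrival(harnessTime, 0))); // [1, 2, 3, 4]
        System.out.println(Arrays.toString(countWithPeers(harnessTime, 0))); // [4, 4, 4, 4]
        System.out.println(Arrays.toString(countWithPeers(systemTime, 0)));  // [2, 2, 2, 2]
    }
}
```

    Under the uniform semantics the result depends only on which timestamps the 
rows carry, so a test is deterministic as long as the clock that stamps the rows 
and the clock that fires the timer are the same one.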


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5654
>                 URL: https://issues.apache.org/jira/browse/FLINK-5654
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).
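
The intended result of the query above can be sketched in plain Java, without 
any Flink or Calcite APIs. This is only an illustration of the RANGE frame 
semantics (same partition, timestamp within the preceding interval up to and 
including the current row's timestamp), not the operator design. The class, the 
Row type, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Naive reference computation of SUM(b) and MIN(b) over a RANGE frame.
// All names here are hypothetical and exist only for illustration.
class OverRangeSketch {

    // One input row: partition key c, value b, and its processing time in ms.
    static final class Row {
        final String c;
        final long b;
        final long procTime;

        Row(String c, long b, long procTime) {
            this.c = c;
            this.b = b;
            this.procTime = procTime;
        }
    }

    // For every row, returns {sumB, minB} over all rows of the same partition
    // whose timestamp lies in [procTime - rangeMillis, procTime].
    static List<long[]> sumAndMin(List<Row> rows, long rangeMillis) {
        List<long[]> out = new ArrayList<>();
        for (Row cur : rows) {
            long sum = 0;
            long min = Long.MAX_VALUE;
            for (Row other : rows) {
                boolean samePartition = other.c.equals(cur.c);
                boolean inRange = other.procTime >= cur.procTime - rangeMillis
                        && other.procTime <= cur.procTime;
                if (samePartition && inRange) {
                    sum += other.b;
                    min = Math.min(min, other.b);
                }
            }
            out.add(new long[] {sum, min});
        }
        return out;
    }

    public static void main(String[] args) {
        long oneHour = 3_600_000L;
        List<Row> rows = Arrays.asList(
                new Row("x", 5, 0L),
                new Row("x", 2, 1_800_000L),
                new Row("y", 7, 1_800_000L),
                new Row("x", 9, 4_000_000L));
        for (long[] r : sumAndMin(rows, oneHour)) {
            System.out.println("sumB=" + r[0] + ", minB=" + r[1]);
        }
    }
}
```

In the sample data the last row of partition "x" no longer sees the first one, 
because that row is more than one hour older.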



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
