[
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949791#comment-15949791
]
ASF GitHub Bot commented on FLINK-5654:
---------------------------------------
Github user rtudoran commented on the issue:
https://github.com/apache/flink/pull/3641
@fhueske Thanks for the example. First of all, I am happy that we agree that
we need to emit something for every input :). I was worried that this would
not be the case.
Now, regarding the 2 options for the output: I would still prefer the first
option. The reason is that, at least in the case of proctime, no 2 events are
simultaneous in absolute terms. There is an implicit serialization (also based
on how the Flink engine works: one event will arrive before the other). It
might simply be a coincidence that computers cannot measure system time with
finer granularity, and because of that we would have 2 events that apparently
have the same proctime timestamp.
However, as you mentioned several times (and I agree), having uniform
behavior might be a key point. In that case we would indeed implement the
behavior you suggested. Let me know again if you are sure about this. I
raised my concerns, but I am open to accepting what you suggest.
In case you suggest implementing it with proctime, I have 2 questions:
1) Do you know of an example of a timer in proctime? (If not, no problem,
I will figure it out.)
2) In the case of harness tests, the correct implementation of the
behavior might not match the test. If I write a test that does
{code}
testHarness.setProcessingTime(3)
// let us assume that the system time is 1490906681
testHarness.processElement(new StreamRecord(rInput, 1001))
// let us assume that the system time is still 1490906681
testHarness.processElement(new StreamRecord(rInput, 2002))
// let us assume that the system time is now 1490906682
testHarness.processElement(new StreamRecord(rInput, 2003))
// let us assume that the system time is still 1490906682
testHarness.processElement(new StreamRecord(rInput, 2004))
{code}
In this case, if we do a COUNT, then the output would be
ev1 2
ev2 2
ev3 2
ev4 2
...instead of all events having an associated count of 4 (which would be
the behavior you mention). This is because I assume that the +1ms timer
would be triggered based on system time.
In such a case, should we just build a limited test that would work?
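To make the two candidate outputs concrete, here is a minimal sketch in plain
Java with no Flink dependencies. The class and method names are hypothetical
and are not part of the PR. It assumes a COUNT over a window of
`precedingMillis` before the current row and timestamps that arrive in
non-decreasing order. `countPerArrival` models the first option (each row is
emitted on arrival and sees only earlier rows), and `countWithPeers` models
the uniform behavior (rows with the same timestamp share one result). Running
both against the harness clock (always 3) and against the assumed system
times shows why the test result depends on which clock drives the timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the operator implemented in the PR.
class ProcTimeOverCountSketch {

    // Option 1: emit a result as each row arrives. A row counts itself and
    // the rows that arrived before it within the preceding range.
    static List<Integer> countPerArrival(long[] procTimes, long precedingMillis) {
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < procTimes.length; i++) {
            int count = 0;
            for (int j = 0; j <= i; j++) {
                if (procTimes[j] >= procTimes[i] - precedingMillis) {
                    count++;
                }
            }
            counts.add(count);
        }
        return counts;
    }

    // Option 2: rows with the same timestamp are peers and share one result,
    // so a row also counts later-arriving rows that carry its timestamp.
    static List<Integer> countWithPeers(long[] procTimes, long precedingMillis) {
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < procTimes.length; i++) {
            int count = 0;
            for (int j = 0; j < procTimes.length; j++) {
                boolean notLater = procTimes[j] <= procTimes[i];
                boolean inRange = procTimes[j] >= procTimes[i] - precedingMillis;
                if (notLater && inRange) {
                    count++;
                }
            }
            counts.add(count);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Timestamps as seen through the harness clock (setProcessingTime(3)).
        long[] harnessClock = {3L, 3L, 3L, 3L};
        // Timestamps as seen through the assumed system clock.
        long[] systemClock = {1490906681L, 1490906681L, 1490906682L, 1490906682L};

        System.out.println(countPerArrival(harnessClock, 0)); // [1, 2, 3, 4]
        System.out.println(countWithPeers(harnessClock, 0));  // [4, 4, 4, 4]
        System.out.println(countPerArrival(systemClock, 0));  // [1, 2, 1, 2]
        System.out.println(countWithPeers(systemClock, 0));   // [2, 2, 2, 2]
    }
}
```

With the peer-based behavior, the harness clock yields a count of 4 for every
event, while the system clock yields 2 for every event, which is the mismatch
described above.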
> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ---------------------------------------------------------------------
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
> a,
> SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1'
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
> MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1'
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)