GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/3808

    [FLINK-5884] [table] Integrate time indicators for Table API & SQL

    This PR introduces time indicators in the Table & SQL API. It splits the current row type into a logical and a physical row type. Time indicators are part of every logical row and stay logical as long as they are only forwarded and not accessed for computation. If a time attribute is passed to a function or an arithmetic expression, it is materialized. A ProcessFunction can access the record's meta timestamp for the materialization (the code generation for this still has to be implemented). Every logical plan now includes time attributes consistently:
    
    ```
    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
    
    val windowedTable = table
      .window(Session withGap 7.milli on 'long as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)
    
    val expected = unaryNode(
      "DataStreamAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term(
        "window",
        SessionGroupWindow(
          WindowReference("w"),
          'long,
          7.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )
    ```
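    
    To make the forwarding/materialization rule above more concrete, here is a minimal sketch that reuses the `table` from the example (`Types` refers to the Table API type constants and is my assumption for the Scala DSL; the actual materialization via a ProcessFunction is not yet implemented in this PR):
    
    ```
    // forwarding only: 'long stays a logical time indicator and no
    // timestamp field is added to the physical row
    val forwarded = table.select('long, 'string)
    
    // accessing the indicator in an expression forces materialization:
    // the row's timestamp becomes a physical field before the cast is applied
    val materialized = table.select('long.cast(Types.STRING) as 'ts, 'string)
    ```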
    
    
    
    Main changes of the current solution:
    
    - Processing time and rowtime indicators can be named arbitrarily
    - They can be defined as follows: `stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)` or `stream.toTable(tEnv, 'long.rowtime, 'int, 'string)` (see the sketch after this list)
    - In a streaming environment: even if the "long" field is already present in the record, it will not be read by the runtime; "long" always represents the timestamp of the row.
    - In a batch environment: "long" must be present in the record and will be read by the runtime.
    - The table definition looks the same in both batch and streaming (better unification than the current state)
    - Internally, row types are split into a logical and a physical row type.
    - The logical row type contains time indicators; the physical row type never contains time indicators (the pure "long" will never be in a record)
    - After validation and query decorrelation, a special time indicator converter traverses the RelNodes and analyzes whether a time indicator is accessed or only forwarded.
    - An access to a time indicator means that we need to materialize the rowtime using a ProcessFunction (not yet implemented). The timestamp (no longer an indicator then) becomes part of the physical row. E.g. `'long.cast(STRING)` would require a materialization
    - Forwarding of time indicators does not materialize the rowtime; it remains a logical attribute. E.g. `.select('long)`
    - Windows are only valid if they work on time indicators.
    - A new `RowSchema` unifies logical and physical rows and tries to make type handling throughout the classes easier and more consistent (it is not used everywhere yet).
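    
    As referenced in the list above, a minimal sketch of how the time attributes are meant to be declared (the environment setup, element values, and variable names are placeholders for illustration; the `toTable` calls mirror the ones quoted above):
    
    ```
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val stream = env.fromElements((1L, 42, "hello"))
    
    // rowtime indicator: in streaming, 'long is taken from the record's
    // meta timestamp and the physical field is not read by the runtime
    val rowtimeTable = stream.toTable(tEnv, 'long.rowtime, 'int, 'string)
    
    // processing time indicator appended with an arbitrary name
    val proctimeTable = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
    ```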
    
    Missing so far:
    - Java API tests for defining time attributes
    - Tests for TableSources that define time attributes
    - Multiwindow tests
    - ProcessFunction for accessing the timestamp
    - More tests in general

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-5884

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3808
    
----
commit 5550a6026614ef61a62e98654ee2cd2d45b6d64e
Author: twalthr <[email protected]>
Date:   2017-03-02T15:06:55Z

    [FLINK-5884] [table] Integrate time indicators for Table API & SQL

----

