Hi everyone,

Every table with a primary key and an event-time attribute provides what is
needed for an event-time temporal table join.
I agree that, from a technical point of view, the TEMPORAL keyword is not
required.

I'm more sceptical about implicitly deriving the versioning information of
a (temporal) table as the table's only event-time attribute.
In the query

SELECT *
FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
WHERE o.currency = r.currency

the syntax of the temporal table join does not explicitly reference the
version of the temporal rates table.
Hence, the system needs a way to derive the version of temporal table.

Implicitly using the (only) event-time attribute of a temporal table (rates
in the example above) to identify the right version works in most cases,
but probably not in all.
* What if a table has more than one event-time attribute? (TableSchema is
designed to support multiple watermarks; queries with interval joins
produce tables with multiple event-time attributes, ...)
* What if the table does not have an event-time attribute in its schema but
the version should only be provided as meta data?

We could add a clause to define the version of a table, such as:

CREATE TABLE rates (
   currency CHAR(3) NOT NULL PRIMARY KEY,
   rate DOUBLE,
   rowtime TIMESTAMP,
   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE),
VERSION (rowtime)
WITH (...);

The presence of a the VERSION clause (or whatever syntax) would explicitly
define the version of a (temporal) table.
It would also render the need for the TEMPORAL keyword superfluous because
there would be another indicator that a table can be used in a temporal
table join.

I'm OK with not adding the TEMPORAL keyword, but I recommend that we think
again about the proposed implicit definition of a table's version and how
it might limit use in the future.

Cheers,
Fabian

Am Mo., 22. Juni 2020 um 16:14 Uhr schrieb Jark Wu <imj...@gmail.com>:

> I'm also +1 for not adding the TEMPORAL keyword.
>
> +1 to make the PRIMARY KEY semantic clear for sources.
> From my point of view:
>
> 1) PRIMARY KEY on changelog souruce:
> It means that when the changelogs (INSERT/UPDATE/DELETE) are materialized,
> the materialized table should be unique on the primary key columns.
> Flink assumes messages are in order on the primary key. Flink doesn't
> validate/enforces the key integrity, but simply trust it (thus NOT
> ENFORCED).
> Flink will use the PRIMARY KEY for some optimization, e.g. use the PRIMARY
> KEY to update the materilized state by key in temporal join operator.
>
> 2) PRIMARY KEY on insert-only source:
> I prefer to have the same semantic to the batch source and changelog
> source, that it implies that records are not duplicate on the primary key.
> Flink just simply trust the primary key constraint, and doesn't valid it.
> If there is duplicate primary keys with INSERT changeflag, then result of
> Flink query might be wrong.
>
> If this is a TEMPORAL TABLE FUNCTION scenario, that source emits duplicate
> primary keys with INSERT changeflag, when we migrate this case to temporal
> table DDL,
> I think this source should emit INSERT/UPDATE (UPSERT) messages instead of
> INSERT-only messages,  e.g. a Kafka compacted topic source?
>
> Best,
> Jark
>
>
> On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf <kna...@apache.org> wrote:
>
> > Hi everyone,
> >
> > I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE.
> >
> > Best,
> >
> > Konstantin
> >
> > On Mon, Jun 22, 2020 at 10:53 AM Kurt Young <ykt...@gmail.com> wrote:
> >
> > > I agree with Timo, semantic about primary key needs more thought and
> > > discussion, especially after FLIP-95 and FLIP-105.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Mon, Jun 22, 2020 at 4:45 PM Timo Walther <twal...@apache.org>
> wrote:
> > >
> > > > Hi Leonard,
> > > >
> > > > thanks for the summary.
> > > >
> > > > After reading all of the previous arguments and working on FLIP-95. I
> > > > would also lean towards the conclusion of not adding the TEMPORAL
> > > keyword.
> > > >
> > > > After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can be
> > > > represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The FOR
> > > > SYSTEM_TIME AS OF t would trigger the internal materialization and
> > > > "temporal" logic.
> > > >
> > > > However, we should discuss the meaning of PRIMARY KEY again in this
> > > > case. In a TEMPORAL TABLE scenario, the source would emit duplicate
> > > > primary keys with INSERT changeflag but at different point in time.
> > > > Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The
> > > > changelog semantics of FLIP-95 and FLIP-105 don't work well with a
> > > > primary key declaration.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > > On 20.06.20 17:08, Leonard Xu wrote:
> > > > > Hi everyone,
> > > > >
> > > > > Thanks for the nice discussion. I’d like to move forward the work,
> > > > please let me simply summarize the main opinion and current
> > divergences.
> > > > >
> > > > > 1. The agreements have been achieved:
> > > > >
> > > > > 1.1 The motivation we're discussing temporal table DDL is just for
> > > > creating temporal table in pure SQL to replace pre-process temporal
> > table
> > > > in YAML/Table API for usability.
> > > > > 1.2 The reason we use "TEMPORAL" keyword rather than “PERIOD FOR
> > > > SYSTEM_TIME” is to make user understand easily.
> > > > > 1.3 For append-only table, it can convert to changelog table which
> > has
> > > > been discussed in FLIP-105, we assume the following temporal table is
> > > comes
> > > > from changelog (Jark, fabian, Timo).
> > > > > 1.4 For temporal join syntax, using "FOR SYSTEM_TIME AS OF x"
> instead
> > > of
> > > > the current `LATERAL TABLE(rates(x))`  has come to an
> agreement(Fabian,
> > > > Timo, Seth, Konstantin, Kurt).
> > > > >
> > > > > 2. The small divergence :
> > > > >
> > > > > About the definition syntax of the temporal table,
> > > > >
> > > > > CREATE [TEMPORAL] TABLE rates (
> > > > >     currency CHAR(3) NOT NULL PRIMARY KEY,
> > > > >     rate DOUBLE,
> > > > >     rowtime TIMESTAMP,
> > > > >     WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > > > > WITH (...);
> > > > >
> > > > > there is small divergence whether add "TEMPORAL" keyword or not.
> > > > >
> > > > > 2.1  one opinion is using "CREATE TEMPORAL TABLE" (Timo, Fabian,
> > Seth),
> > > > the main advantages are:
> > > > > (1)"TEMPORAL" keyword is intuitive to indicate the history tracking
> > > > semantics.
> > > > > (2)"TEMPORAL" keyword illustrates that queries can visit the
> previous
> > > > versions of a table like other DBMS use "PERIOD FOR SYSTEM_TIME"
> > keyword.
> > > > >
> > > > > 2.2 the other is using "CREATE TABLE"(Kurt), the main advantages
> are:
> > > > > (1)Just primary key and time attribute can track previous versions
> > of a
> > > > table well.
> > > > > (2)The temporal behavior is triggered by temporal join syntax
> rather
> > > > than in DDL, all Flink DDL table are dynamic table logically
> including
> > > > temporal table. If we decide to use "TEMPORAL" keyword and treats
> > > changelog
> > > > as temporal table, other tables backed queue like Kafka should also
> use
> > > > "TEMPORAL" keyword.
> > > > >
> > > > >
> > > > > IMO, the statement “CREATE TEMPORARY TEMPORAL TABLE...” follows
> with
> > > 2.1
> > > > may confuse users much. If we take a second to think about, for
> > > source/sink
> > > > table which may backed queue (like kafka) or DB (like MySQL), we did
> > not
> > > > add any keyword in DDL to specify they are source or sinks, it works
> > > well.
> > > > > I think temporal table is the third one,  kafka data source and DB
> > data
> > > > source can play as a source/sink/temporal table depends on the
> > > > position/syntax that user put them in the query. The above rates
> table
> > > > >      - can be a source table if user put it at `SELECT * FROM
> rates;`
> > > > >      - can be a temporal table if user put it at `SELECT * FROM
> > orders
> > > > JOIN rates FOR SYSTEM_TIME AS OF orders.proctime
> > > > >               ON orders.currency = rates.currency;`
> > > > >      - can be sink table if user put is at `INSERT INTO rates
> SELECT
> > *
> > > > FROM …; `
> > > > >  From these cases, we found all tables defined in Flink should be
> > > > dynamic table logically, the source/sink/temporal role depends on the
> > > > position/syntax in user’s query.
> > > > >        In fact we have used similar syntax for current lookup
> table,
> > we
> > > > didn’t add “LOOKUP" or “TEMPORAL" keyword for lookup table and
> trigger
> > > the
> > > > temporal join from the position/syntax(“FOR SYSTEM_TIME AS OF x") in
> > > query.
> > > > >
> > > > > So, I prefer to resolve the small divergence with “CREATE TABLE”
> > which
> > > > > (1) is more unified with our source/sink/temporal dynamic table
> > > > conceptually,
> > > > > (2) is aligned with current lookup table,
> > > > > (3) also make users learn less keyword.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > Best,
> > > > > Leonard Xu
> > > > >
> > > > >
> > > >
> > > >
> > >
> >
> >
> > --
> >
> > Konstantin Knauf
> >
> > https://twitter.com/snntrable
> >
> > https://github.com/knaufk
> >
>

Reply via email to