Hi Feng,

I think this FLIP provides an important feature for unifying streaming SQL and
batch SQL when we backfill historical data in batch mode.

For the "Syntax" section, I think you could describe how backfilling with time
travel aligns with querying the latest data. I think you should also update the
"Discussion thread" in the original FLIP.

Moreover, I have a question about getting the table schema from the catalog.
I'm not sure whether Catalog#getTable(tablePath, timestamp) will be called
only once. Suppose we backfill the past week, from 2023-05-29 to 2023-06-04,
and the table schema changed on 2023-06-01. Will the query below detect the
schema change while backfilling the whole week?

SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP
BETWEEN '2023-05-29 00:00:00' AND '2023-06-05 00:00:00'

Best
Yun Tang


________________________________
From: Shammon FY <zjur...@gmail.com>
Sent: Thursday, June 1, 2023 17:57
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

Hi Feng,

I have one minor comment about the public interface `Optional<Long> getSnapshot()`
in `CatalogTable`.

As we can get tables from the new method `Catalog.getTable(ObjectPath tablePath, long timestamp)`,
I think the returned `CatalogBaseTable` will already carry the timestamp
information. Flink, or connectors such as Iceberg/Paimon, can create sources
from the `CatalogBaseTable` directly, without needing to get the snapshot ID
from `CatalogTable.getSnapshot()`. What do you think?

Best,
Shammon FY


On Thu, Jun 1, 2023 at 7:22 AM Jing Ge <j...@ververica.com.invalid> wrote:

> Hi Feng,
>
> Thanks for the proposal! Very interesting feature. Would you like to add the
> reasoning from your previous email about why SupportsTimeTravel was rejected
> to the FLIP? This will help readers understand the context in the future.
>
> We keep adding overloaded methods to Catalog for each new requirement, which
> makes the interface bloated. Just out of curiosity, does it make sense to
> introduce a DSL-style design? For example,
> Catalog.getTable(tablePath).on(timeStamp), and
> Catalog.getTable(tablePath).current() for the most current version, with
> more room for further extensions such as timestamp ranges. I haven't read
> all the source code yet, so I'm not sure whether this is possible. But a
> design like this would keep the Catalog API lean, and the API/DSL would be
> self-describing and easier to use.
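To make the DSL idea concrete, here is a minimal Java sketch of a fluent `getTable(path).on(ts)` / `.current()` lookup. `FluentCatalogSketch` and `TableVersions` are hypothetical names, not Flink APIs, and a table "version" is reduced to a schema string keyed by commit time in epoch milliseconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical fluent handle returned by getTable(path); not an existing Flink API.
interface TableVersions {
    /** The table as it was at epochMillis, if it existed then. */
    Optional<String> on(long epochMillis);

    /** The most recent version of the table. */
    Optional<String> current();
}

// Toy catalog where a "table version" is just a schema string keyed by commit time.
class FluentCatalogSketch {
    private final Map<String, TreeMap<Long, String>> history = new HashMap<>();

    void commit(String tablePath, long commitMillis, String schema) {
        history.computeIfAbsent(tablePath, k -> new TreeMap<>()).put(commitMillis, schema);
    }

    TableVersions getTable(String tablePath) {
        TreeMap<Long, String> versions = history.getOrDefault(tablePath, new TreeMap<>());
        return new TableVersions() {
            @Override
            public Optional<String> on(long epochMillis) {
                // Latest version committed at or before the requested time.
                Map.Entry<Long, String> e = versions.floorEntry(epochMillis);
                return e == null ? Optional.empty() : Optional.of(e.getValue());
            }

            @Override
            public Optional<String> current() {
                return versions.isEmpty()
                        ? Optional.empty()
                        : Optional.of(versions.lastEntry().getValue());
            }
        };
    }
}
```

A shape like this keeps a single entry point on the catalog; a range lookup could later become another method on the returned handle instead of another `Catalog` overload.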
>
> Best regards,
> Jing
>
>
> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>
> > OK, after second thought I'm retracting my previous statement about the
> > Catalog changes you proposed.
> > I actually do see a benefit for the Delta connector with this change, and I
> > see why this could be coupled with the Catalog.
> >
> > Delta connector SQL support also ships a Delta Catalog implementation for
> > Flink.
> > For the Delta Catalog, table schema information is fetched from the
> > underlying _delta_log and is not stored in a metastore. For time travel we
> > actually had a problem: if we wanted to time travel back to an old version
> > whose schema was slightly different, we would have a conflict, since the
> > Catalog would return the current schema and not the schema as it was for
> > version X.
> >
> > With your change, our Delta Catalog can actually fetch the schema for
> > version X and send it to DeltaTableFactory. Currently, the Catalog can fetch
> > only the current version. What we would also need, however, is the version
> > (number/timestamp) of this table passed to DynamicTableFactory, so that we
> > can properly configure the Delta Standalone library.
> >
> > Regards,
> > Krzysztof
> >
> > On Wed, 31 May 2023 at 10:37 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> >
> > > Hi,
> > > happy to see such a feature.
> > > Small note from my end regarding the Catalog changes.
> > >
> > > TL;DR
> > > I don't think it is necessary to delegate this feature to the catalog. I
> > > think that since "time travel" is a per-job/per-query property, it should
> > > not be coupled with the Catalog or the table definition. In my opinion,
> > > this is something only the DynamicTableFactory has to know about. I would
> > > rather see this feature as what it is, an SQL syntax enhancement, but
> > > delegated clearly to the DynamicTableFactory.
> > >
> > > I've implemented the time travel feature for the Delta connector [1]
> > > using the current Flink API.
> > > The docs are pending code review, but you can find them here [2], and
> > > examples are available here [3].
> > >
> > > The time travel feature that I've implemented is based on Flink query
> > > hints:
> > > "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
> > >
> > > The "versionAsOf" parameter (we also have 'timestampAsOf') is handled not
> > > by the Catalog but by the DynamicTableFactory implementation of the Delta
> > > connector. The value of this property is passed to the Delta Standalone
> > > library API, which returns the table view for the given version.
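A minimal Java sketch, assuming the factory sees the hint as a plain `Map<String, String>` of table options, of how `versionAsOf` / `timestampAsOf` could be turned into a read target. `ReadTarget` and `TimeTravelHintParser` are hypothetical, the rule that the two keys are mutually exclusive is an assumption, and this is not the Delta connector's actual code.

```java
import java.util.Map;
import java.util.Optional;

// What the (hypothetical) factory should read: a version, a timestamp, or the latest state.
record ReadTarget(Optional<Long> version, Optional<String> timestamp) {
    static ReadTarget latest() {
        return new ReadTarget(Optional.empty(), Optional.empty());
    }
}

final class TimeTravelHintParser {
    static final String VERSION_AS_OF = "versionAsOf";
    static final String TIMESTAMP_AS_OF = "timestampAsOf";

    private TimeTravelHintParser() {}

    /** Parses table options (e.g. from an OPTIONS hint); rejects setting both keys at once. */
    static ReadTarget parse(Map<String, String> options) {
        String version = options.get(VERSION_AS_OF);
        String timestamp = options.get(TIMESTAMP_AS_OF);
        if (version != null && timestamp != null) {
            throw new IllegalArgumentException(
                    "Only one of '" + VERSION_AS_OF + "' and '" + TIMESTAMP_AS_OF + "' may be set");
        }
        if (version != null) {
            // Long.parseLong throws NumberFormatException for non-numeric input.
            return new ReadTarget(Optional.of(Long.parseLong(version.trim())), Optional.empty());
        }
        if (timestamp != null) {
            // The timestamp is kept as text; its time zone semantics are left to the reader.
            return new ReadTarget(Optional.empty(), Optional.of(timestamp));
        }
        return ReadTarget.latest();
    }
}
```

Because everything lives in the table options here, no catalog involvement is needed, which is the property the message argues for; the trade-off is that the schema still comes from whatever the catalog returns for the current version.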
> > >
> > > I'm not sure how/if the proposed change could benefit the Delta connector
> > > implementation of this feature.
> > >
> > > Thanks,
> > > Krzysztof
> > >
> > > [1] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
> > > [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
> > > [3] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
> > >
> > > On Wed, 31 May 2023 at 06:03 liu ron <ron9....@gmail.com> wrote:
> > >
> > >> Hi, Feng
> > >>
> > >> Thanks for driving this FLIP. Time travel is very useful for integrating
> > >> Flink with data lake systems. I have one question: why is the
> > >> implementation of time travel delegated to the Catalog? Assume we use
> > >> Flink to query a Hudi table with the time travel syntax, but instead of
> > >> using the HudiCatalog we register the Hudi table in the InMemoryCatalog.
> > >> Can we support time travel for the Hudi table in this case?
> > >> Instead, I think time travel should be bound to the connector rather than
> > >> the Catalog, so the rejected alternative should be considered.
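For comparison, a minimal sketch of what a connector-side ability (the rejected `SupportsTimeTravel` direction) might look like. It works regardless of which catalog the table is registered in, and the planner can fail fast when a source cannot honour the request. `SupportsTimeTravelSketch`, `ToyLakeSource`, `ToyPlainSource`, and `PlannerSketch` are hypothetical names and do not exist in Flink.

```java
import java.util.Optional;

// Hypothetical connector ability, loosely modelled on the rejected "SupportsTimeTravel" idea;
// it is NOT an existing Flink interface.
interface SupportsTimeTravelSketch {
    /** Pins the source to the table state at the given epoch millis; returns false if unsupported. */
    boolean applyTimeTravel(long timestampMillis);
}

// A toy source (e.g. a lake table registered in an in-memory catalog) with the ability.
class ToyLakeSource implements SupportsTimeTravelSketch {
    private Optional<Long> pinnedAt = Optional.empty();

    @Override
    public boolean applyTimeTravel(long timestampMillis) {
        pinnedAt = Optional.of(timestampMillis);
        return true;
    }

    Optional<Long> pinnedAt() {
        return pinnedAt;
    }
}

// A toy source without the ability.
class ToyPlainSource {}

final class PlannerSketch {
    private PlannerSketch() {}

    /** Fails fast when the query uses time travel but the source cannot honour it. */
    static void pushTimeTravel(Object source, long timestampMillis) {
        if (!(source instanceof SupportsTimeTravelSketch s) || !s.applyTimeTravel(timestampMillis)) {
            throw new UnsupportedOperationException("Source does not support time travel");
        }
    }
}
```

Note that this only pins the data; as the Delta discussion in this thread points out, the schema as of that time would still have to come from somewhere, which is one argument for involving the catalog.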
> > >>
> > >> Best,
> > >> Ron
> > >>
> > >> On Tue, May 30, 2023 at 09:40 yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > >>
> > >> > Hi, Feng.
> > >> > I notice this FLIP only supports time travel in batch mode. Would it
> > >> > also make sense to support streaming mode, reading a snapshot of the
> > >> > table as a bounded stream?
> > >> >
> > >> > Best regards,
> > >> > Yuxia
> > >> >
> > >> > ----- Original Message -----
> > >> > From: "Benchao Li" <libenc...@apache.org>
> > >> > To: "dev" <dev@flink.apache.org>
> > >> > Sent: Monday, May 29, 2023 6:04:53 PM
> > >> > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> > >> >
> > >> > # Can Calcite support the syntax `VERSION AS OF`?
> > >> >
> > >> > This also depends on whether it is defined in the SQL standard or has
> > >> > been implemented by any well-known database. If not, it would be hard
> > >> > to push it to Calcite.
> > >> >
> > >> > # getTable(ObjectPath object, long timestamp)
> > >> >
> > >> > Then we again run into the problem of "casting between timestamp and
> > >> > numeric", which was disabled in FLINK-21978 [1]. If we are going to use
> > >> > this, we need to clarify that problem first.
> > >> >
> > >> > [1] https://issues.apache.org/jira/browse/FLINK-21978
> > >> >
> > >> >
> > >> > On Mon, May 29, 2023 at 15:57 Feng Jin <jinfeng1...@gmail.com> wrote:
> > >> >
> > >> > > Hi, thanks for your replies.
> > >> > >
> > >> > > @Benchao
> > >> > > > did you consider the pushdown abilities compatible
> > >> > >
> > >> > > In the current design, the implementation of time travel is delegated
> > >> > > to the Catalog. We have added a method getTable(ObjectPath tablePath,
> > >> > > long timestamp) to obtain the corresponding CatalogBaseTable at a
> > >> > > specific time. Therefore, I think it will not have any impact on the
> > >> > > existing pushdown abilities.
> > >> > >
> > >> > >
> > >> > > > I see there is a rejected design for adding SupportsTimeTravel, but I
> > >> > > > didn't see the alternative in the FLIP doc
> > >> > >
> > >> > > Sorry, the document description is not very clear. I have discussed
> > >> > > with yuxia whether to support SupportsTimeTravel. Since we already pass
> > >> > > the corresponding time in the Catalog's getTable(ObjectPath, long
> > >> > > timestamp), SupportsTimeTravel may not be necessary.
> > >> > >
> > >> > > In getTable(ObjectPath object, long timestamp), we can obtain the
> > >> > > schema at the corresponding point in time and put the SNAPSHOT that
> > >> > > needs to be consumed into the options.
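A minimal Java sketch of the mechanism described here, under simplifying assumptions: snapshots are kept in a `TreeMap` keyed by commit time, a schema is a list of column strings, and the chosen snapshot reaches the source through a hypothetical `snapshot-id` option key. `SnapshotCatalog`, `Snapshot`, and `TableAtTime` are illustrative names, not the FLIP's interfaces.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// One committed snapshot of a lake table: id, commit time (epoch millis), and schema.
record Snapshot(long id, long commitMillis, List<String> schema) {}

// What the catalog hands to the planner: schema plus connector options.
record TableAtTime(List<String> schema, Map<String, String> options) {}

// Hypothetical catalog; the "snapshot-id" option key is an assumption, not a Flink constant.
class SnapshotCatalog {
    static final String SNAPSHOT_ID_KEY = "snapshot-id";

    private final Map<String, TreeMap<Long, Snapshot>> snapshotsByTable = new HashMap<>();
    private final Map<String, Map<String, String>> baseOptions = new HashMap<>();

    void register(String path, Map<String, String> options) {
        baseOptions.put(path, Map.copyOf(options));
    }

    void commit(String path, Snapshot s) {
        snapshotsByTable.computeIfAbsent(path, k -> new TreeMap<>()).put(s.commitMillis(), s);
    }

    /** Table as of timestampMillis: schema of the last snapshot committed at or before it. */
    Optional<TableAtTime> getTable(String path, long timestampMillis) {
        TreeMap<Long, Snapshot> history = snapshotsByTable.get(path);
        if (history == null) {
            return Optional.empty();
        }
        Map.Entry<Long, Snapshot> e = history.floorEntry(timestampMillis);
        if (e == null) {
            return Optional.empty(); // no snapshot existed yet at that time
        }
        Snapshot s = e.getValue();
        // Copy the base options and add the snapshot to consume, without mutating the base table.
        Map<String, String> options = new HashMap<>(baseOptions.getOrDefault(path, Map.of()));
        options.put(SNAPSHOT_ID_KEY, Long.toString(s.id()));
        return Optional.of(new TableAtTime(s.schema(), Map.copyOf(options)));
    }
}
```

In this shape the source reads the snapshot straight from the table's options, so a separate getter on the table is not needed. Each call resolves exactly one schema, so a query spanning a schema change would need more than one lookup.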
> > >> > >
> > >> > >
> > >> > > @Shammon
> > >> > > > Could we support this in Flink too?
> > >> > >
> > >> > > I personally think it's possible, but it is limited by Calcite's
> > >> > > syntax restrictions. I believe we should first support this syntax in
> > >> > > Calcite. Currently, I think it may not be easy to support this syntax
> > >> > > in Flink's parser. @Benchao, what do you think? Can Calcite support the
> > >> > > syntax `VERSION AS OF`?
> > >> > >
> > >> > >
> > >> > > Best,
> > >> > > Feng.
> > >> > >
> > >> > >
> > >> > > On Fri, May 26, 2023 at 2:55 PM Shammon FY <zjur...@gmail.com> wrote:
> > >> > >
> > >> > > > Thanks Feng, the feature of time travel sounds great!
> > >> > > >
> > >> > > > In addition to SYSTEM_TIME, lakehouse formats such as Paimon and
> > >> > > > Iceberg support querying by snapshot or version. For example, users
> > >> > > > can query snapshot 1 of a Paimon table with the following statement:
> > >> > > > SELECT * FROM t VERSION AS OF 1
> > >> > > >
> > >> > > > Could we support this in Flink too?
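One way to model both `VERSION AS OF <id>` and `FOR SYSTEM_TIME AS OF <ts>` is a single selector type that resolves to a snapshot. The sketch below is hypothetical (`TimeTravelSpec`, `VersionAsOf`, `SystemTimeAsOf`, and `SnapshotInfo` are not Flink classes) and assumes the snapshot list is sorted by ascending commit time.

```java
import java.util.List;
import java.util.Optional;

// A committed snapshot: id and commit time in epoch millis.
record SnapshotInfo(long id, long commitMillis) {}

// Hypothetical selector covering both "VERSION AS OF <id>" and "FOR SYSTEM_TIME AS OF <ts>".
interface TimeTravelSpec {
    /** Resolves the spec against snapshots sorted by ascending commit time. */
    Optional<SnapshotInfo> resolve(List<SnapshotInfo> snapshots);
}

record VersionAsOf(long snapshotId) implements TimeTravelSpec {
    @Override
    public Optional<SnapshotInfo> resolve(List<SnapshotInfo> snapshots) {
        // Exact match on the snapshot id.
        return snapshots.stream().filter(s -> s.id() == snapshotId).findFirst();
    }
}

record SystemTimeAsOf(long timestampMillis) implements TimeTravelSpec {
    @Override
    public Optional<SnapshotInfo> resolve(List<SnapshotInfo> snapshots) {
        // Latest snapshot committed at or before the requested time.
        SnapshotInfo result = null;
        for (SnapshotInfo s : snapshots) {
            if (s.commitMillis() <= timestampMillis) {
                result = s;
            } else {
                break;
            }
        }
        return Optional.ofNullable(result);
    }
}
```

With this split, the parser only has to produce one of the two specs; how each lake format stores snapshots stays behind `resolve`.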
> > >> > > >
> > >> > > > Best,
> > >> > > > Shammon FY
> > >> > > >
> > >> > > > On Fri, May 26, 2023 at 1:20 PM Benchao Li <libenc...@apache.org> wrote:
> > >> > > >
> > >> > > > > Regarding the implementation, did you consider compatibility with
> > >> > > > > the pushdown abilities, e.g., projection pushdown, filter pushdown,
> > >> > > > > and partition pushdown? Since `Snapshot` is not handled much in the
> > >> > > > > existing rules, I have a concern about this. Of course, it depends on
> > >> > > > > your implementation details; what is important is that we'd better
> > >> > > > > add some cross tests for these.
> > >> > > > >
> > >> > > > > Regarding the interface exposed to connectors, I see there is a
> > >> > > > > rejected design for adding SupportsTimeTravel, but I didn't see the
> > >> > > > > alternative in the FLIP doc. IMO, this is an important thing we need
> > >> > > > > to clarify, because we need to know whether the connector supports
> > >> > > > > this, and which column/metadata corresponds to 'system_time'.
> > >> > > > >
> > >> > > > > On Thu, May 25, 2023 at 22:50 Feng Jin <jinfeng1...@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > Thanks for your reply
> > >> > > > > >
> > >> > > > > > @Timo @Benchao @yuxia
> > >> > > > > >
> > >> > > > > > Sorry for the mistake. Currently, Calcite only supports the
> > >> > > > > > `FOR SYSTEM_TIME AS OF` syntax, so we can only support
> > >> > > > > > `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the FLIP.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > @Timo
> > >> > > > > >
> > >> > > > > > > We will convert it to TIMESTAMP_LTZ?
> > >> > > > > >
> > >> > > > > > Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and then
> > >> > > > > > convert it into a long value.
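A minimal `java.time` sketch of that conversion, assuming the literal uses the `yyyy-MM-dd HH:mm:ss` format and that the session time zone is supplied by the caller (in Flink it would presumably come from the `table.local-time-zone` setting). `AsOfTimestamps` is a hypothetical helper. It shows why the time zone matters: the same literal maps to different epoch values.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class AsOfTimestamps {
    private static final DateTimeFormatter SQL_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private AsOfTimestamps() {}

    /**
     * Interprets a zone-less SQL TIMESTAMP literal in the given session time zone
     * (i.e. treats it as TIMESTAMP_LTZ) and returns epoch milliseconds.
     */
    static long toEpochMillis(String literal, ZoneId sessionZone) {
        LocalDateTime local = LocalDateTime.parse(literal, SQL_TIMESTAMP);
        return local.atZone(sessionZone).toInstant().toEpochMilli();
    }
}
```

For example, `'2023-04-27 00:00:00'` is 1682553600000 ms in UTC, but eight hours (28,800,000 ms) earlier when the session zone is `Asia/Shanghai`.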
> > >> > > > > >
> > >> > > > > > > How do we want to query the most recent version of a table
> > >> > > > > >
> > >> > > > > > I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause an
> > >> > > > > > inconsistency with the real-time concept.
> > >> > > > > > However, from my personal understanding, the scope of
> > >> > > > > > `AS OF CURRENT_TIMESTAMP` is the table itself, not the table
> > >> > > > > > records. So I think using CURRENT_TIMESTAMP should also be
> > >> > > > > > reasonable?
> > >> > > > > > Additionally, if no version is specified, the latest version should
> > >> > > > > > be used by default.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > Best,
> > >> > > > > > Feng
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Thu, May 25, 2023 at 7:47 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > >> > > > > >
> > >> > > > > > > Thanks Feng for bringing this up. It'll be great to introduce time
> > >> > > > > > > travel to Flink for better integration with external data sources.
> > >> > > > > > >
> > >> > > > > > > I also share the same concern about the syntax.
> > >> > > > > > > In the part `Whether to support other syntax implementations` of
> > >> > > > > > > this FLIP, it seems the syntax in Calcite should be
> > >> > > > > > > `FOR SYSTEM_TIME AS OF`, right?
> > >> > > > > > > But in the syntax part of this FLIP, it seems to be
> > >> > > > > > > `AS OF TIMESTAMP` instead of `FOR SYSTEM_TIME AS OF`. Is it just a
> > >> > > > > > > mistake or by design?
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > Best regards,
> > >> > > > > > > Yuxia
> > >> > > > > > >
> > >> > > > > > > ----- Original Message -----
> > >> > > > > > > From: "Benchao Li" <libenc...@apache.org>
> > >> > > > > > > To: "dev" <dev@flink.apache.org>
> > >> > > > > > > Sent: Thursday, May 25, 2023 7:27:17 PM
> > >> > > > > > > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> > >> > > > > > >
> > >> > > > > > > Thanks Feng, it's exciting to have this ability.
> > >> > > > > > >
> > >> > > > > > > Regarding the syntax section, are you proposing `AS OF` instead of
> > >> > > > > > > `FOR SYSTEM_TIME AS OF` to do this? I know `FOR SYSTEM_TIME AS OF`
> > >> > > > > > > is in the SQL standard and has been supported by some database
> > >> > > > > > > vendors such as SQL Server. About `AS OF`: is it in the standard, or
> > >> > > > > > > does any database vendor support it? If yes, I think it's worth
> > >> > > > > > > adding this support to Calcite, and I would be glad to help on the
> > >> > > > > > > Calcite side. Otherwise, I think we'd better use
> > >> > > > > > > `FOR SYSTEM_TIME AS OF`.
> > >> > > > > > >
> > >> > > > > > > On Thu, May 25, 2023 at 19:02 Timo Walther <twal...@apache.org> wrote:
> > >> > > > > > >
> > >> > > > > > > > Also: How do we want to query the most recent version of a table?
> > >> > > > > > > >
> > >> > > > > > > > `AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs,
> > >> > > > > > > > the type is TIMESTAMP_LTZ and, what is even more concerning, it is
> > >> > > > > > > > actually evaluated row-based:
> > >> > > > > > > >
> > >> > > > > > > > > Returns the current SQL timestamp in the local time zone, the
> > >> > > > > > > > > return type is TIMESTAMP_LTZ(3). It is evaluated for each record
> > >> > > > > > > > > in streaming mode. But in batch mode, it is evaluated once as the
> > >> > > > > > > > > query starts and uses the same result for every row.
> > >> > > > > > > >
> > >> > > > > > > > This could make it difficult to explain in a join scenario of
> > >> > > > > > > > multiple snapshotted tables.
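One way to keep a multi-table query consistent is to read the clock once when the query starts and pin every snapshotted table to that same instant. Below is a minimal sketch with a hypothetical `AsOfResolver`; it takes a `java.time.Clock` so the "read once" behavior can be tested, and it is not how Flink currently evaluates `CURRENT_TIMESTAMP`.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical planner step: AS OF CURRENT_TIMESTAMP is resolved once when the query starts,
// and the same instant is used for every table in the query (e.g. both sides of a join).
final class AsOfResolver {
    private final Clock clock;

    AsOfResolver(Clock clock) {
        this.clock = clock;
    }

    Map<String, Instant> resolveCurrentTimestamp(List<String> tables) {
        Instant queryStart = clock.instant(); // read exactly once per query
        Map<String, Instant> asOf = new LinkedHashMap<>();
        for (String table : tables) {
            asOf.put(table, queryStart);
        }
        return asOf;
    }
}
```

If the clock were read per table instead, two tables joined in the same query could be pinned to different points in time, which is exactly the ambiguity raised above.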
> > >> > > > > > > >
> > >> > > > > > > > Regards,
> > >> > > > > > > > Timo
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On 25.05.23 12:29, Timo Walther wrote:
> > >> > > > > > > > > Hi Feng,
> > >> > > > > > > > >
> > >> > > > > > > > > thanks for proposing this FLIP. It makes a lot of sense to finally
> > >> > > > > > > > > support querying tables at a specific point in time, or hopefully
> > >> > > > > > > > > also ranges soon, following time-versioned tables.
> > >> > > > > > > > >
> > >> > > > > > > > > Here is some feedback from my side:
> > >> > > > > > > > >
> > >> > > > > > > > > 1. Syntax
> > >> > > > > > > > >
> > >> > > > > > > > > Can you elaborate a bit on the Calcite restrictions?
> > >> > > > > > > > >
> > >> > > > > > > > > Does Calcite currently support `AS OF` syntax for this but not
> > >> > > > > > > > > `FOR SYSTEM_TIME AS OF`?
> > >> > > > > > > > >
> > >> > > > > > > > > It would be great to support `AS OF` also for time-versioned joins
> > >> > > > > > > > > and have a unified and short syntax.
> > >> > > > > > > > >
> > >> > > > > > > > > Once a fix is merged in Calcite for this, we can make this
> > >> > > > > > > > > available in Flink earlier by copying the corresponding classes
> > >> > > > > > > > > until the next Calcite upgrade is performed.
> > >> > > > > > > > >
> > >> > > > > > > > > 2. Semantics
> > >> > > > > > > > >
> > >> > > > > > > > > How do we interpret the timestamp? In Flink we have 2 timestamp
> > >> > > > > > > > > types (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF
> > >> > > > > > > > > TIMESTAMP '2023-04-27 00:00:00', in which time zone will the
> > >> > > > > > > > > timestamp be? Will we convert it to TIMESTAMP_LTZ?
> > >> > > > > > > > >
> > >> > > > > > > > > We definitely need to clarify this because the past has shown that
> > >> > > > > > > > > daylight saving time makes our lives hard.
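The daylight saving concern can be checked directly with `java.time`: some wall-clock times do not exist (spring-forward gap) and some occur twice (fall-back overlap), so a zone-less AS OF literal is not always a single instant. A minimal sketch, using `Europe/Berlin` as an example zone; `DstCheck` is a hypothetical helper.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

final class DstCheck {
    private DstCheck() {}

    /** Number of valid UTC offsets for a wall-clock time: 0 = gap, 1 = normal, 2 = overlap. */
    static int validOffsetCount(LocalDateTime local, ZoneId zone) {
        return zone.getRules().getValidOffsets(local).size();
    }

    /** Default java.time resolution: a gap time is shifted forward, an overlap takes the earlier offset. */
    static ZonedDateTime resolve(LocalDateTime local, ZoneId zone) {
        return local.atZone(zone);
    }
}
```

In Berlin, 2023-03-26 02:30 does not exist (it resolves to 03:30), and 2023-10-29 02:30 occurs twice (it resolves to the +02:00 occurrence). A spec would need to say whether to accept this default resolution or reject such literals.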
> > >> > > > > > > > >
> > >> > > > > > > > > Thanks,
> > >> > > > > > > > > Timo
> > >> > > > > > > > >
> > >> > > > > > > > > On 25.05.23 10:57, Feng Jin wrote:
> > >> > > > > > > > >> Hi, everyone.
> > >> > > > > > > > >>
> > >> > > > > > > > >> I’d like to start a discussion about FLIP-308: Support Time Travel
> > >> > > > > > > > >> In Batch Mode [1].
> > >> > > > > > > > >>
> > >> > > > > > > > >> Time travel is a SQL syntax used to query historical versions of
> > >> > > > > > > > >> data. It allows users to specify a point in time and retrieve the
> > >> > > > > > > > >> data and schema of a table as it appeared at that time. With time
> > >> > > > > > > > >> travel, users can easily analyze and compare historical versions
> > >> > > > > > > > >> of data.
> > >> > > > > > > > >>
> > >> > > > > > > > >> With the widespread use of data lake systems such as Paimon,
> > >> > > > > > > > >> Iceberg, and Hudi, time travel can make data analysis more
> > >> > > > > > > > >> convenient for users.
> > >> > > > > > > > >>
> > >> > > > > > > > >> Looking forward to your opinions; any suggestions are welcome.
> > >> > > > > > > > >>
> > >> > > > > > > > >>
> > >> > > > > > > > >>
> > >> > > > > > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode
> > >> > > > > > > > >>
> > >> > > > > > > > >>
> > >> > > > > > > > >>
> > >> > > > > > > > >> Best,
> > >> > > > > > > > >>
> > >> > > > > > > > >> Feng
> > >> > > > > > > > >>
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > >
> > >> > > > > > > Best,
> > >> > > > > > > Benchao Li
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > >
> > >> > > > > Best,
> > >> > > > > Benchao Li
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> >
> > >> > Best,
> > >> > Benchao Li
> > >> >
> > >>
> > >
> >
>
