Hi Feng, I think this FLIP would provide an important feature for unifying streaming SQL and batch SQL when we backfill historical data in batch mode.
For the "Syntax" section, I think you could add a description of how backfill time travel aligns with querying the latest data. I think you should also update the "Discussion thread" link in the original FLIP.

Moreover, I have a question about getting the table schema from the catalog. I'm not sure whether Catalog#getTable(tablePath, timestamp) will be called only once. If we backfill the past week, from 2023-05-29 to 2023-06-04, and the table schema changed on 2023-06-01, will the query below detect the schema change while backfilling the whole week?

    SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN '2023-05-29 00:00:00' AND '2023-06-05 00:00:00'

Best,
Yun Tang

________________________________
From: Shammon FY <zjur...@gmail.com>
Sent: Thursday, June 1, 2023 17:57
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

Hi Feng,

I have one minor comment about the public interface `Optional<Long> getSnapshot()` in `CatalogTable`. Since we can get tables from the new method `Catalog.getTable(ObjectPath tablePath, long timestamp)`, I think the returned `CatalogBaseTable` will already carry the timestamp information. Flink, or connectors such as Iceberg/Paimon, can create sources from the `CatalogBaseTable` directly, without needing to get the snapshot ID from `CatalogTable.getSnapshot()`. What do you think?

Best,
Shammon FY

On Thu, Jun 1, 2023 at 7:22 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Feng,

Thanks for the proposal! Very interesting feature. Would you like to add the reasoning from your previous email about why SupportsTimeTravel was rejected to the FLIP? This will help readers understand the context in the future.

We keep adding overloaded methods directly to Catalog for each new requirement, which bloats the interface. Just out of curiosity, does it make sense to introduce some DSL design?
For example: Catalog.getTable(tablePath).on(timeStamp), Catalog.getTable(tablePath).current() for the most current version, with room for further extensions such as timestamp ranges. I haven't read all the source code yet and I'm not sure whether it is possible, but a design like this would keep the Catalog API lean, and the API/DSL would be self-describing and easier to use.

Best regards,
Jing

On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

OK, after second thought I'm retracting my previous statement about the Catalog changes you proposed. I do actually see a benefit for the Delta connector with this change, and I see why this could be coupled with Catalog.

The Delta connector's SQL support also ships a Delta Catalog implementation for Flink. For Delta Catalog, table schema information is fetched from the underlying _delta_log and not stored in a metastore. For time travel we actually had a problem: if we wanted to time travel back to an old version where the schema was slightly different, there would be a conflict, since the Catalog would return the current schema and not the schema as of version X.

With your change, our Delta Catalog can actually fetch the schema for version X and send it to DeltaTableFactory. Currently, the Catalog can fetch only the current version. However, we would also need the version (number/timestamp) for this table to be passed to DynamicTableFactory, so that we could properly set up the Delta standalone library.

Regards,
Krzysztof

On Wed, May 31, 2023 at 10:37 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

Hi,
happy to see such a feature. A small note from my end regarding the Catalog changes.

TL;DR
I don't think it is necessary to delegate this feature to the catalog.
I think that since "time travel" is a per-job/query property, it should not be coupled with the Catalog or the table definition. In my opinion, this is something only DynamicTableFactory has to know about. I would rather see this feature as it is, an SQL syntax enhancement, but delegated clearly to DynamicTableFactory.

I've implemented a time travel feature for the Delta connector [1] using the current Flink API. The docs are pending code review, but you can find them here [2], and examples are available here [3].

The time travel feature that I've implemented is based on Flink query hints:

    SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */

The 'versionAsOf' parameter (we also have 'timestampAsOf') is handled not by the Catalog but by the DynamicTableFactory implementation for the Delta connector. The value of this property is passed to the Delta standalone lib API, which returns the table view for the given version.

I'm not sure how, or whether, the proposed change could benefit the Delta connector implementation of this feature.

Thanks,
Krzysztof

[1] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
[2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
[3] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql

On Wed, May 31, 2023 at 06:03 liu ron <ron9....@gmail.com> wrote:

Hi, Feng

Thanks for driving this FLIP. Time travel is very useful for integrating Flink with data lake systems. I have one question: why is the implementation of TimeTravel delegated to Catalog?
Assume that we use Flink to query a Hudi table with the time travel syntax, but instead of the HudiCatalog we register the Hudi table in InMemoryCatalog. Can we support time travel for the Hudi table in this case? I think time travel should be bound to the connector rather than the Catalog, so the rejected alternative should be reconsidered.

Best,
Ron

On Tue, May 30, 2023 at 09:40 yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Hi, Feng.
I notice this FLIP only supports time travel in batch mode. Would it also make sense to support streaming mode, reading a snapshot of the table as a bounded stream?

Best regards,
Yuxia

----- Original Message -----
From: "Benchao Li" <libenc...@apache.org>
To: "dev" <dev@flink.apache.org>
Sent: Monday, May 29, 2023 6:04:53 PM
Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

# Can Calcite support this syntax `VERSION AS OF`?

This also depends on whether it is defined in the standard or by any known database that has implemented it. If not, it would be hard to push it to Calcite.

# getTable(ObjectPath object, long timestamp)

Then we again come to the problem of "casting between timestamp and numeric", which has been disabled in FLINK-21978 [1]. If you're going to use this, we need to clarify that problem first.

[1] https://issues.apache.org/jira/browse/FLINK-21978

On Mon, May 29, 2023 at 15:57 Feng Jin <jinfeng1...@gmail.com> wrote:

Hi, thanks for your reply.
@Benchao
> did you consider the pushdown abilities compatible

In the current design, the implementation of TimeTravel is delegated to Catalog. We have added a method getTable(ObjectPath tablePath, long timestamp) to obtain the corresponding CatalogBaseTable at a specific time. Therefore, I think it will not have any impact on the original pushdown abilities.

> I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc

Sorry, the document description is not very clear. I have discussed with yuxia whether to support SupportsTimeTravel. Since we already pass the corresponding time to Catalog's getTable(ObjectPath, long timestamp), SupportsTimeTravel may not be necessary.

In getTable(ObjectPath object, long timestamp), we can obtain the schema at the corresponding point in time and put the SNAPSHOT that needs to be consumed into the options.

@Shammon
> Could we support this in Flink too?

I personally think it's possible, but we are limited by Calcite's syntax restrictions. I believe we should first support this syntax in Calcite; currently, I think it may not be easy to support it in Flink's parser. @Benchao, what do you think? Can Calcite support the `VERSION AS OF` syntax?

Best,
Feng.

On Fri, May 26, 2023 at 2:55 PM Shammon FY <zjur...@gmail.com> wrote:

Thanks Feng, the feature of time travel sounds great!
In addition to SYSTEM_TIME, lakehouses such as Paimon and Iceberg support snapshots or versions. For example, users can query snapshot 1 of a Paimon table with the following statement:

    SELECT * FROM t VERSION AS OF 1

Could we support this in Flink too?

Best,
Shammon FY

On Fri, May 26, 2023 at 1:20 PM Benchao Li <libenc...@apache.org> wrote:

Regarding the implementation, did you consider compatibility with the pushdown abilities, e.g., projection pushdown, filter pushdown, and partition pushdown? Since `Snapshot` is not handled much in existing rules, I have a concern about this. Of course, it depends on your implementation details; what is important is that we add some cross tests for these.

Regarding the interface exposed to connectors, I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc. IMO, this is an important thing to clarify, because we need to know whether the connector supports this and which column/metadata corresponds to 'system_time'.

On Thu, May 25, 2023 at 22:50 Feng Jin <jinfeng1...@gmail.com> wrote:

Thanks for your reply.

@Timo @Benchao @yuxia

Sorry for the mistake. Currently, Calcite only supports the `FOR SYSTEM_TIME AS OF` syntax, so we can only support `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the FLIP.
@Timo

> We will convert it to TIMESTAMP_LTZ?

Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and then convert it into a long value.

> How do we want to query the most recent version of a table

I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause inconsistency with the real-time concept. However, in my understanding, the scope of `AS OF CURRENT_TIMESTAMP` is the table itself, not the table record, so I think using CURRENT_TIMESTAMP could also be reasonable. Additionally, if no version is specified, the latest version should be used by default.

Best,
Feng

On Thu, May 25, 2023 at 7:47 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Thanks Feng for bringing this up. It'll be great to introduce time travel to Flink for better integration with external data sources.

I also share the same concern about the syntax. In the section `Whether to support other syntax implementations` of this FLIP, the syntax in Calcite seems to be `FOR SYSTEM_TIME AS OF`, right? But in the syntax part of this FLIP, it seems to be `AS OF TIMESTAMP` instead of `FOR SYSTEM_TIME AS OF`.
Is it just a mistake or by design?

Best regards,
Yuxia

----- Original Message -----
From: "Benchao Li" <libenc...@apache.org>
To: "dev" <dev@flink.apache.org>
Sent: Thursday, May 25, 2023 7:27:17 PM
Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

Thanks Feng, it's exciting to have this ability.

Regarding the syntax section, are you proposing `AS OF` instead of `FOR SYSTEM_TIME AS OF` to do this? I know `FOR SYSTEM_TIME AS OF` is in the SQL standard and is supported by some database vendors such as SQL Server. As for `AS OF`, is it in the standard, or does any database vendor support it? If so, I think it's worth adding this support to Calcite, and I would give a hand on the Calcite side. Otherwise, I think we'd better use `FOR SYSTEM_TIME AS OF`.

On Thu, May 25, 2023 at 19:02 Timo Walther <twal...@apache.org> wrote:

Also: How do we want to query the most recent version of a table?
`AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs the type is TIMESTAMP_LTZ, and, even more concerning, it is actually evaluated row-based:

> Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.

This could make it difficult to explain in a join scenario of multiple snapshotted tables.

Regards,
Timo

On 25.05.23 12:29, Timo Walther wrote:

Hi Feng,

thanks for proposing this FLIP. It makes a lot of sense to finally support querying tables at a specific point in time, and hopefully also ranges soon, following time-versioned tables.

Here is some feedback from my side:

1. Syntax

Can you elaborate a bit on the Calcite restrictions?

Does Calcite currently support `AS OF` syntax for this but not `FOR SYSTEM_TIME AS OF`?
It would be great to support `AS OF` also for time-versioned joins and have a unified and short syntax.

Once a fix for this is merged in Calcite, we can make it available in Flink earlier by copying the corresponding classes until the next Calcite upgrade is performed.

2. Semantics

How do we interpret the timestamp? In Flink we have 2 timestamp types (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP '2023-04-27 00:00:00', in which time zone will the timestamp be? We will convert it to TIMESTAMP_LTZ?

We definitely need to clarify this, because the past has shown that daylight saving time makes our lives hard.

Thanks,
Timo

On 25.05.23 10:57, Feng Jin wrote:

Hi, everyone.

I'd like to start a discussion about FLIP-308: Support Time Travel In Batch Mode [1].

Time travel is a SQL syntax used to query historical versions of data.
It allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time. With time travel, users can easily analyze and compare historical versions of data.

With the widespread use of data lake systems such as Paimon, Iceberg, and Hudi, time travel can make data analysis more convenient for users.

Looking forward to your opinions; any suggestions are welcome.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode

Best,
Feng
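A minimal Java sketch can make two of the open points in this thread concrete. The first is Timo's question about which time zone a literal such as TIMESTAMP '2023-04-27 00:00:00' is interpreted in, together with Feng's answer to convert TIMESTAMP to TIMESTAMP_LTZ and then to a long. The second is Yun Tang's question about a schema change inside a backfill range. Everything here is hypothetical. `TimeTravelSketch`, `VersionedTableHistory` and `TableVersion` are made-up names, not Flink or FLIP-308 APIs. The session time zone is assumed to be Asia/Shanghai, and the lookup assumes "latest version committed at or before the requested instant" semantics.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch only: none of these types are Flink or FLIP-308 APIs.
public class TimeTravelSketch {

    // One committed version of a table: an assumed snapshot id and the columns visible in it.
    record TableVersion(long snapshotId, List<String> columns) {}

    // Assumed per-table history, keyed by commit time in epoch milliseconds.
    static final class VersionedTableHistory {
        private final NavigableMap<Long, TableVersion> byCommitMillis = new TreeMap<>();

        void commit(long commitMillis, TableVersion version) {
            byCommitMillis.put(commitMillis, version);
        }

        // Returns the latest version committed at or before the instant, or null if none exists yet.
        TableVersion asOf(long epochMillis) {
            Map.Entry<Long, TableVersion> entry = byCommitMillis.floorEntry(epochMillis);
            return entry == null ? null : entry.getValue();
        }
    }

    // Interprets a zone-less TIMESTAMP literal in the session time zone (TIMESTAMP -> TIMESTAMP_LTZ)
    // and returns epoch milliseconds (TIMESTAMP_LTZ -> long).
    static long toEpochMillis(String timestampLiteral, ZoneId sessionZone) {
        LocalDateTime local = LocalDateTime.parse(timestampLiteral.replace(' ', 'T'));
        // In a DST gap the time is moved forward by the gap length; in an overlap the earlier offset wins.
        return ZonedDateTime.of(local, sessionZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumed session time zone
        VersionedTableHistory history = new VersionedTableHistory();
        history.commit(toEpochMillis("2023-05-20 00:00:00", zone),
                new TableVersion(1, List.of("id", "name")));
        // Schema change on 2023-06-01, as in the backfill example.
        history.commit(toEpochMillis("2023-06-01 00:00:00", zone),
                new TableVersion(2, List.of("id", "name", "price")));

        // Each timestamp is resolved independently, so the two ends of the range see different schemas.
        TableVersion atStart = history.asOf(toEpochMillis("2023-05-29 00:00:00", zone));
        TableVersion atEnd = history.asOf(toEpochMillis("2023-06-05 00:00:00", zone));
        System.out.println(atStart.snapshotId() + " " + atStart.columns());
        System.out.println(atEnd.snapshotId() + " " + atEnd.columns());
    }
}
```

Running `main` prints `1 [id, name]` followed by `2 [id, name, price]`. Keying versions by commit time and using `floorEntry` is only one plausible reading of "as it appeared at that time." The sketch does not decide whether a real catalog should resolve the schema once per query or once per timestamp, which is the open question raised above. In a zone with daylight saving time, such as America/Los_Angeles, `ZonedDateTime.of` silently shifts non-existent local times and picks one of two ambiguous ones. That is the kind of behavior Timo's DST concern suggests should be specified explicitly.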