As you can see, you must use `UNIX_TIMESTAMP` to do this conversion, and that is where the time zone comes in.

What I'm talking about is casting timestamp/timestamp_ltz to long directly; that's why the semantics are tricky when you cast a timestamp to long using a time zone. Other systems, such as SQL Server [1], actually use a string instead of a timestamp literal: `FOR SYSTEM_TIME AS OF '2021-01-01 00:00:00.0000000'`. I'm not sure whether they implicitly convert the string to TIMESTAMP_LTZ or simply define the syntax differently. But we are definitely using a timestamp/timestamp_ltz literal here; that's why it is special, and we must highlight that we are converting a TIMESTAMP WITHOUT TIME ZONE literal to long using the session time zone.

[1] https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16

On Thu, Jun 8, 2023 at 11:35 Feng Jin <jinfeng1...@gmail.com> wrote:
> Hi all,
>
> Thanks for your input.
>
> @Benchao
>
>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP WITHOUT TIME ZONE"; converting it to a unix timestamp would use the UTC time zone, which is not usually what users expect.
>
> This was indeed the case before Flink 1.13, but my understanding is that the definition of TIMESTAMP has changed slightly since then.
>
> TIMESTAMP is currently used to specify the year, month, day, hour, minute and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col AS STRING))* to convert *TIMESTAMP values* to *long values*. The *UNIX_TIMESTAMP* function uses the *LOCAL TIME ZONE*. Therefore, converting either TIMESTAMP or TIMESTAMP_LTZ to long values involves the *LOCAL TIME ZONE*.
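As a rough illustration of the `UNIX_TIMESTAMP(CAST(timestamp_col AS STRING))` behaviour described here, the following `java.time` sketch reads a zone-less `yyyy-MM-dd HH:mm:ss` string as wall-clock time in an explicitly passed session zone and returns epoch seconds. It is not Flink's implementation: the class and method names are hypothetical, and the `sessionZone` argument stands in for `table.local-time-zone`. With `UTC` and `Asia/Shanghai` it yields 0 and -28800, the same values as Feng's SQL client test in this message.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical model of UNIX_TIMESTAMP(CAST(ts AS STRING)); not Flink's implementation.
final class UnixTimestampModel {
    // Assumed textual form of a TIMESTAMP(0) value after CAST(... AS STRING).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private UnixTimestampModel() {}

    /** Reads the zone-less wall-clock time in the session zone and returns seconds since the epoch. */
    static long unixTimestamp(String timestamp, String sessionZone) {
        LocalDateTime wallClock = LocalDateTime.parse(timestamp, FORMAT);
        return wallClock.atZone(ZoneId.of(sessionZone)).toEpochSecond();
    }

    public static void main(String[] args) {
        System.out.println(unixTimestamp("1970-01-01 00:00:00", "UTC"));           // 0
        System.out.println(unixTimestamp("1970-01-01 00:00:00", "Asia/Shanghai")); // -28800
    }
}
```

Only the zone differs between the two calls, which is the crux of Benchao's point: a zone-less timestamp has no numeric value until a zone is chosen.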
>
> Here is a test:
>
> Flink SQL> SET 'table.local-time-zone' = 'UTC';
> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' AS STRING)) AS `timestamp`;
> ---------------
> timestamp
> ---------------
> 0
>
> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' AS STRING)) AS `timestamp`;
> ---------------
> timestamp
> ---------------
> -28800
>
> Therefore, the conversion method currently exposed to users also uses the LOCAL TIME ZONE.
>
> @yuxia
>
> Thank you very much for providing the list of TIMESTAMP behaviors in other systems.
>
>> I think we can align with them to avoid inconsistency with other engines and make it convenient for external connectors to integrate with Flink's time travel API.
>
> +1 for this.
>
>> Regarding the inconsistency, I think we can consider time travel a special case, and we do need to highlight this in this FLIP.
>> As for "violate the restriction outlined in FLINK-21978[1]", since we cast timestamp to epochMillis only for internal use and won't expose it to users, I don't think it will violate the restriction.
>> Btw, please add a brief description of the parameter `timestamp` in the method `CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)`. Maybe something like "timestamp of the table snapshot, in milliseconds since 1970-01-01 00:00:00 UTC".
>
> Thank you for the suggestions regarding the document. I will add them to the FLIP.
>
> Best,
> Feng
>
> On Wed, Jun 7, 2023 at 12:18 PM Benchao Li <libenc...@apache.org> wrote:
>
>> I also share the concern about the time zone problem.
>>
>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP WITHOUT TIME ZONE"; converting it to a unix timestamp would use the UTC time zone, which is not usually what users expect.
>>
>> If we want to stay consistent with the standard, we should probably use "TIMESTAMP WITH LOCAL TIME ZONE '2023-04-27 00:00:00'", whose type is "TIMESTAMP WITH LOCAL TIME ZONE"; converting it to a unix timestamp takes the session time zone into account, which gives the expected result. But it's inconvenient for users.
>>
>> Treating this as a special case and converting "TIMESTAMP '2023-04-27 00:00:00'" to a unix timestamp with the session time zone would be convenient for users, but would break the standard. I'm +0.5 on this choice.
>>
>> On Wed, Jun 7, 2023 at 12:06 yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>
>>> Hi, Feng Jin.
>>> I think Leonard's concern may be the inconsistent behavior of TIMESTAMP '2023-04-27 00:00:00' between time travel and other SQL statements.
>>>
>>> For normal SQL:
>>> `SELECT TIMESTAMP '2023-04-27 00:00:00'`, we don't consider the time zone.
>>> But for time travel SQL:
>>> `SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00'`, we consider the time zone and convert the value to a UTC timestamp.
>>>
>>> The concern is valid. But for time travel, most engines, such as Spark [1], Hive [2] and Trino [3], also do the time conversion taking the session time zone into account. I think we can align with them to avoid inconsistency with other engines and make it convenient for external connectors to integrate with Flink's time travel API.
>>>
>>> Regarding the inconsistency, I think we can consider time travel a special case, and we do need to highlight this in this FLIP.
>>> As for "violate the restriction outlined in FLINK-21978[1]", since we cast timestamp to epochMillis only for internal use and won't expose it to users, I don't think it will violate the restriction.
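The two readings of `FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00'` being weighed here can be compared with a small `java.time` sketch: treating the zone-less literal as UTC wall-clock time (what a plain TIMESTAMP-to-epoch conversion would do, per Benchao) versus treating it as wall-clock time in the session zone (the special case proposed for time travel). The helper names are hypothetical and not part of Flink; the sketch covers only the zone-less literal, not Benchao's TIMESTAMP WITH LOCAL TIME ZONE alternative.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helpers comparing two ways of turning a zone-less time-travel literal into epoch millis.
final class TimeTravelMillis {
    private TimeTravelMillis() {}

    /** Reading 1: the literal is UTC wall-clock time. */
    static long asUtc(LocalDateTime literal) {
        return literal.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Reading 2: the literal is wall-clock time in the session time zone. */
    static long asSessionTime(LocalDateTime literal, ZoneId sessionZone) {
        return literal.atZone(sessionZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime literal = LocalDateTime.of(2023, 4, 27, 0, 0, 0);
        ZoneId session = ZoneId.of("Asia/Shanghai");
        long utc = asUtc(literal);
        long local = asSessionTime(literal, session);
        // The gap is the session zone's UTC offset at that time (+08:00, i.e. 8 hours).
        System.out.println(utc + " " + local + " " + (utc - local) / 3_600_000L); // 1682553600000 1682524800000 8
    }
}
```

With a UTC session zone both readings coincide, which is why the difference only shows up for users in other zones.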
>>> Btw, please add a brief description of the parameter `timestamp` in the method `CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)`. Maybe something like "timestamp of the table snapshot, in milliseconds since 1970-01-01 00:00:00 UTC".
>>>
>>> [1] https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
>>> [2] https://github.com/apache/hive/blob/f5e69dc38d7ff26c70be19adc9d1a3ae90dc4cf2/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L989
>>> [3] https://github.com/trinodb/trino/blob/2433d9e60f1abb0d85c32374c1758525560e1a86/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L443
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ----- Original Message -----
>>> From: "Feng Jin" <jinfeng1...@gmail.com>
>>> To: "dev" <dev@flink.apache.org>
>>> Sent: Tuesday, June 6, 2023 10:15:47 PM
>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>>>
>>> Hi everyone,
>>>
>>> Thanks everyone for your input.
>>>
>>> @Yun
>>>
>>>> I think you could add descriptions of how to align backfill time travel with querying the latest data. And I think you should also update the "Discussion thread" in the original FLIP.
>>>
>>> Thank you for the suggestion, I will update it in the document.
>>>
>>>> I have a question about getting the table schema from the catalog. I'm not sure whether Catalog#getTable(tablePath, timestamp) will be called only once.
>>>
>>> My understanding is that the schema of the table is determined before the query executes. The schema used will be the latest schema within the time travel period.
>>>
>>> In addition, due to current syntax limitations, we are unable to support BETWEEN ... AND.
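To make the proposed contract concrete, here is a minimal in-memory sketch under simplifying assumptions: the table path and schema are plain strings, and `TimeTravelCatalog` / `InMemoryTimeTravelCatalog` are hypothetical names, not Flink's `Catalog` API. It documents `timestamp` along the lines yuxia suggests and returns the latest schema in effect at that time, which is the behaviour Feng describes (the catalog picks the schema based on the requested time).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory implementation; declared first so the file can be run directly.
final class InMemoryTimeTravelCatalog implements TimeTravelCatalog {
    // table path -> (effective-from epoch millis -> schema), sorted by time
    private final Map<String, TreeMap<Long, String>> history = new HashMap<>();

    void registerSchema(String tablePath, long effectiveFromMillis, String schema) {
        history.computeIfAbsent(tablePath, path -> new TreeMap<>()).put(effectiveFromMillis, schema);
    }

    @Override
    public String getTable(String tablePath, long timestamp) {
        TreeMap<Long, String> versions = history.get(tablePath);
        // Latest schema that became effective at or before the requested time.
        Map.Entry<Long, String> entry = versions == null ? null : versions.floorEntry(timestamp);
        if (entry == null) {
            throw new IllegalArgumentException("No version of " + tablePath + " at " + timestamp);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        InMemoryTimeTravelCatalog catalog = new InMemoryTimeTravelCatalog();
        catalog.registerSchema("db.t", 1_000L, "id INT");
        catalog.registerSchema("db.t", 2_000L, "id INT, name STRING");
        System.out.println(catalog.getTable("db.t", 1_500L)); // id INT
        System.out.println(catalog.getTable("db.t", 2_000L)); // id INT, name STRING
    }
}

/** Hypothetical, simplified stand-in for the proposed Catalog#getTable(ObjectPath, long). */
interface TimeTravelCatalog {
    /**
     * Returns the table schema as of the given point in time.
     *
     * @param tablePath fully qualified table name (a plain String in this sketch)
     * @param timestamp timestamp of the table snapshot, in milliseconds since 1970-01-01 00:00:00 UTC
     */
    String getTable(String tablePath, long timestamp);
}
```

Resolving the schema once, before execution, matches Feng's answer to Yun: a query sees a single schema rather than one per snapshot.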
>>>
>>> @Jing
>>>
>>>> Would you like to add the thoughts from your previous email about why SupportsTimeTravel was rejected to the FLIP?
>>>
>>> Sure, I updated the doc.
>>>
>>>> Since we always add overload methods directly to Catalog according to new requirements, the interface becomes bloated.
>>>
>>> Your concern is valid. If we need to support the long-type version in the future, we may have to add another method, "getTable(ObjectPath, long version)". However, my understanding is that "Catalog.getTable(tablePath).on(timeStamp)" may not meet the requirements: the timestamp is for the Catalog's use, and the Catalog obtains the corresponding schema based on this time.
>>>
>>> @liu @Regards
>>>
>>> I am very sorry for the unclear description in the document. I have updated the relevant descriptions of why it needs to be implemented in the Catalog.
>>>
>>> Time travel requires not only the data at the corresponding point in time, but also the schema at that point in time.
>>>
>>> @Shammon
>>>
>>>> Flink or connectors such as iceberg/paimon can create sources from the `CatalogBaseTable` directly without needing to get the snapshot ID from `CatalogTable.getSnapshot()`. What do you think?
>>>
>>> You are right, we don't need the getSnapshot interface for PaimonCatalog or IcebergCatalog tables, but we may need it for temporary tables.
>>>
>>> Best,
>>> Feng
>>>
>>> On Tue, Jun 6, 2023 at 9:32 PM Feng Jin <jinfeng1...@gmail.com> wrote:
>>>
>>>> Sorry, I replied to the wrong mail. Please ignore the last email.
>>>>
>>>> Hi Leonard
>>>>
>>>>> 1. Unification SQL
>>>>
>>>> I agree that it is crucial for us to support both batch and streaming processing.
>>>> The current design allows for the support of both batch and streaming processing. I'll update the FLIP later.
>>>>
>>>>> 2. Semantics
>>>>
>>>> In my opinion, it would be feasible to perform the conversion based on the current session time zone, regardless of whether it is TIMESTAMP or TIMESTAMP_LTZ.
>>>>
>>>> However, as Benchao mentioned, this may indeed violate the restriction outlined in FLINK-21978 [1], and I am not sure whether that is reasonable.
>>>>
>>>>> 3. Some external systems may use a timestamp value to mark a version, but others may use a version number, file position or log offset.
>>>>
>>>> It is true that most systems support time-related operations, and I believe the current design is compatible with most systems. However, if we want to support the long data type, it may require Calcite to support the VERSION AS OF syntax. This is something we may need to consider in the future.
>>>>
>>>> Best,
>>>> Feng
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978
>>>>
>>>> On Tue, Jun 6, 2023 at 8:28 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>
>>>>> Hi, Feng
>>>>>
>>>>> Thanks for driving this FLIP; it's a very impressive feature that users want. I have some quick questions.
>>>>>
>>>>> 1. Unification SQL:
>>>>> The snapshot concept exists in both batch mode and streaming mode; could we consider a unified proposal? I think users won't want another SQL syntax named time travel for streaming mode.
>>>>>
>>>>> 2. Semantics:
>>>>> Flink supports the TIMESTAMP and TIMESTAMP_LTZ types. To get a long timestamp value (getTable(ObjectPath tablePath, long timestamp)) we need two pieces of information, i.e.
>>>>> a TIMESTAMP value and the current session time zone. How do we deal with the value in the currently proposed SQL syntax?
>>>>>
>>>>> 3. Is a single timestamp enough to track a snapshot (version) of an external table? Some external systems may use a timestamp value to mark a version, but others may use a version number, file position or log offset.
>>>>>
>>>>> Best,
>>>>> Leonard
>>>>>
>>>>> On Jun 5, 2023, at 3:28 PM, Yun Tang <myas...@live.com> wrote:
>>>>>
>>>>>> Hi Feng,
>>>>>>
>>>>>> I think this FLIP would provide an important feature to unify stream SQL and batch SQL when we backfill historical data in batch mode.
>>>>>>
>>>>>> For the "Syntax" section, I think you could add descriptions of how to align backfill time travel with querying the latest data. And I think you should also update the "Discussion thread" in the original FLIP.
>>>>>>
>>>>>> Moreover, I have a question about getting the table schema from the catalog. I'm not sure whether Catalog#getTable(tablePath, timestamp) will be called only once. If we have a backfill query covering the past week, between 2023-05-29 and 2023-06-04, and the table schema changed on 2023-06-01, will the query below detect the schema changes while backfilling the whole week?
>>>>>>
>>>>>> SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN '2023-05-29 00:00:00' AND '2023-06-05 00:00:00'
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>>
>>>>>> ________________________________
>>>>>> From: Shammon FY <zjur...@gmail.com>
>>>>>> Sent: Thursday, June 1, 2023 17:57
>>>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>>>>>>
>>>>>> Hi Feng,
>>>>>>
>>>>>> I have one minor comment about the public interface `Optional<Long> getSnapshot()` in `CatalogTable`.
>>>>>>
>>>>>> As we can get tables from the new method `Catalog.getTable(ObjectPath tablePath, long timestamp)`, I think the returned `CatalogBaseTable` will carry the timestamp information. Flink or connectors such as iceberg/paimon can create sources from the `CatalogBaseTable` directly without needing to get the snapshot ID from `CatalogTable.getSnapshot()`. What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Shammon FY
>>>>>>
>>>>>> On Thu, Jun 1, 2023 at 7:22 AM Jing Ge <j...@ververica.com.invalid> wrote:
>>>>>>
>>>>>>> Hi Feng,
>>>>>>>
>>>>>>> Thanks for the proposal! Very interesting feature. Would you like to add the thoughts from your previous email about why SupportsTimeTravel was rejected to the FLIP? This will help readers understand the context (in the future).
>>>>>>>
>>>>>>> Since we always add overload methods directly to Catalog according to new requirements, the interface becomes bloated. Just out of curiosity, does it make sense to introduce some DSL design?
>>>>>>> Like Catalog.getTable(tablePath).on(timeStamp), Catalog.getTable(tablePath).current() for the most current version, and more room for further extension such as timestamp ranges, etc. I haven't read all the source code yet and I'm not sure if it is possible. But a design like this would keep the Catalog API lean, and the API/DSL would be self-describing and easier to use.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Jing
>>>>>>>
>>>>>>> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>>>>>>
>>>>>>>> OK, after second thought I'm retracting my previous statement about the Catalog changes you proposed.
>>>>>>>> I do actually see a benefit for the Delta connector with this change, and I see why this could be coupled with the Catalog.
>>>>>>>>
>>>>>>>> The Delta connector's SQL support also ships a Delta Catalog implementation for Flink.
>>>>>>>> For the Delta Catalog, table schema information is fetched from the underlying _delta_log and not stored in a metastore. For time travel we actually had a problem: if we wanted to time travel back to some old version where the schema was slightly different, we would have a conflict, since the Catalog would return the current schema and not the schema as it was for version X.
>>>>>>>>
>>>>>>>> With your change, our Delta Catalog can actually fetch the schema for version X and send it to DeltaTableFactory. Currently, the Catalog can fetch only the current version.
>>>>>>>> What we would also need, however, is the version (number/timestamp) for this table passed to DynamicTableFactory so we could properly set up the Delta standalone library.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Krzysztof
>>>>>>>>
>>>>>>>> On Wed, May 31, 2023 at 10:37 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> happy to see such a feature.
>>>>>>>>> A small note from my end regarding the Catalog changes.
>>>>>>>>>
>>>>>>>>> TL;DR
>>>>>>>>> I don't think it is necessary to delegate this feature to the catalog. I think that since "time travel" is a per-job/query property, it should not be coupled with the Catalog or table definition. In my opinion this is something that only DynamicTableFactory has to know about. I would rather see this feature as it is, a SQL syntax enhancement, but delegated clearly to DynamicTableFactory.
>>>>>>>>>
>>>>>>>>> I've implemented a time travel feature for the Delta connector [1] using the current Flink API.
>>>>>>>>> The docs are pending code review, but you can find them here [2], and examples are available here [3].
>>>>>>>>>
>>>>>>>>> The time travel feature that I've implemented is based on Flink query hints:
>>>>>>>>> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
>>>>>>>>>
>>>>>>>>> The "versionAsOf" parameter (we also have 'timestampAsOf') is handled not by the Catalog but by the DynamicTableFactory implementation of the Delta connector. The value of this property is passed to the Delta standalone lib API, which returns a table view for the given version.
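For comparison, the hint-based approach Krzysztof describes keeps time travel inside the table factory. The sketch below shows how a factory might read such options; only the option names `versionAsOf` and `timestampAsOf` come from this thread, while the class, the rule that at most one may be set, and keeping the timestamp as an unparsed string are assumptions, not the Delta connector's actual code.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical reader for time-travel options passed as query hints,
// e.g. SELECT * FROM t /*+ OPTIONS('versionAsOf' = '1') */. Not the Delta connector's code.
final class TimeTravelOptions {
    static final String VERSION_AS_OF = "versionAsOf";
    static final String TIMESTAMP_AS_OF = "timestampAsOf";

    private final Long version;     // null when not requested
    private final String timestamp; // raw value; its format is left to the connector (assumption)

    private TimeTravelOptions(Long version, String timestamp) {
        this.version = version;
        this.timestamp = timestamp;
    }

    static TimeTravelOptions fromOptions(Map<String, String> options) {
        String version = options.get(VERSION_AS_OF);
        String timestamp = options.get(TIMESTAMP_AS_OF);
        if (version != null && timestamp != null) {
            throw new IllegalArgumentException(
                    "Specify at most one of '" + VERSION_AS_OF + "' and '" + TIMESTAMP_AS_OF + "'");
        }
        return new TimeTravelOptions(version == null ? null : Long.parseLong(version.trim()), timestamp);
    }

    Optional<Long> version() { return Optional.ofNullable(version); }

    Optional<String> timestamp() { return Optional.ofNullable(timestamp); }

    /** No hint given: read the latest version. */
    boolean isLatest() { return version == null && timestamp == null; }

    public static void main(String[] args) {
        System.out.println(fromOptions(Map.of(VERSION_AS_OF, "1")).version().orElse(-1L)); // 1
        System.out.println(fromOptions(Map.of()).isLatest());                              // true
    }
}
```

The trade-off discussed in the replies is visible here: the factory learns which version was requested, but not the schema that version had, which is what the catalog-based design adds.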
>>>>>>>>>
>>>>>>>>> I'm not sure how/if the proposed change could benefit the Delta connector implementation of this feature.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Krzysztof
>>>>>>>>>
>>>>>>>>> [1] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
>>>>>>>>> [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
>>>>>>>>> [3] https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
>>>>>>>>>
>>>>>>>>> On Wed, May 31, 2023 at 06:03 liu ron <ron9....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, Feng
>>>>>>>>>>
>>>>>>>>>> Thanks for driving this FLIP. Time travel is very useful for integrating Flink with data lake systems. I have one question: why is the implementation of time travel delegated to the Catalog? Assuming we use Flink to query a Hudi table with the time travel syntax, but instead of using HudiCatalog we register the Hudi table in InMemoryCatalog, can we support time travel for the Hudi table in this case?
>>>>>>>>>> In contrast, I think time travel should be bound to the connector instead of the Catalog, so the rejected alternative should be considered.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Ron
>>>>>>>>>>
>>>>>>>>>> On Tue, May 30, 2023 at 09:40 yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi, Feng.
>>>>>>>>>>> I notice this FLIP only supports batch mode for time travel.
>>>>>>>>>>> Would it also make sense to support streaming mode, to read a snapshot of the table as a bounded stream?
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Yuxia
>>>>>>>>>>>
>>>>>>>>>>> ----- Original Message -----
>>>>>>>>>>> From: "Benchao Li" <libenc...@apache.org>
>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
>>>>>>>>>>> Sent: Monday, May 29, 2023 6:04:53 PM
>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>>>>>>>>>>>
>>>>>>>>>>> # Can Calcite support the `VERSION AS OF` syntax?
>>>>>>>>>>>
>>>>>>>>>>> This also depends on whether it is defined in the standard or implemented by any well-known database. If not, it would be hard to push it to Calcite.
>>>>>>>>>>>
>>>>>>>>>>> # getTable(ObjectPath object, long timestamp)
>>>>>>>>>>>
>>>>>>>>>>> Then we again come to the problem of "casting between timestamp and numeric", which was disabled in FLINK-21978 [1]. If you're going to use this, we need to clarify that problem first.
>>>>>>>>>>>
>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 29, 2023 at 15:57 Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, thanks for your reply.
>>>>>>>>>>>>
>>>>>>>>>>>> @Benchao
>>>>>>>>>>>>> did you consider the pushdown abilities compatible
>>>>>>>>>>>>
>>>>>>>>>>>> In the current design, the implementation of time travel is delegated to the Catalog. We have added a method, getTable(ObjectPath tablePath, long timestamp), to obtain the corresponding CatalogBaseTable at a specific time.
>>>>>>>>>>>> Therefore, I think it will not have any impact on the original pushdown abilities.
>>>>>>>>>>>>
>>>>>>>>>>>>> I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc
>>>>>>>>>>>>
>>>>>>>>>>>> Sorry, the document description is not very clear. Regarding whether to support SupportsTimeTravel, I have discussed it with yuxia. Since we already pass the corresponding time in the Catalog's getTable(ObjectPath, long timestamp), SupportsTimeTravel may not be necessary.
>>>>>>>>>>>>
>>>>>>>>>>>> In getTable(ObjectPath object, long timestamp), we can obtain the schema at the corresponding point in time and put the SNAPSHOT that needs to be consumed into the options.
>>>>>>>>>>>>
>>>>>>>>>>>> @Shammon
>>>>>>>>>>>>> Could we support this in Flink too?
>>>>>>>>>>>>
>>>>>>>>>>>> I personally think it's possible, but it's limited by Calcite's syntax restrictions. I believe we should first support this syntax in Calcite. Currently, I think it may not be easy to support this syntax in Flink's parser. @Benchao, what do you think? Can Calcite support the `VERSION AS OF` syntax?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Feng.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 26, 2023 at 2:55 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Feng, the time travel feature sounds great!
>>>>>>>>>>>>>
>>>>>>>>>>>>> In addition to SYSTEM_TIME, lakehouses such as Paimon and Iceberg support snapshots or versions. For example, users can query snapshot 1 of a Paimon table with the following statement:
>>>>>>>>>>>>> SELECT * FROM t VERSION AS OF 1
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could we support this in Flink too?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Shammon FY
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 26, 2023 at 1:20 PM Benchao Li <libenc...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding the implementation, did you consider whether the pushdown abilities stay compatible, e.g., projection pushdown, filter pushdown and partition pushdown? Since `Snapshot` is not handled much in the existing rules, I have a concern about this. Of course, it depends on your implementation details; what is important is that we'd better add some cross tests for these.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding the interface exposed to connectors, I see there is a rejected design for adding SupportsTimeTravel, but I didn't see the alternative in the FLIP doc. IMO, this is an important thing we need to clarify, because we need to know whether the connector supports this, and which column/metadata corresponds to 'system_time'.
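Feng's reply in this thread proposes that the catalog resolve the requested time and hand the snapshot to the source through table options, while Shammon asks for `VERSION AS OF <snapshot id>`. A minimal sketch of both paths, assuming snapshots are identified by a long ID with a commit time in epoch millis; the class name and the option key `snapshot-id` are invented for this example, and real connectors define their own keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: resolve a time-travel request to a snapshot ID and pin it in the table
// options handed to the source. The "snapshot-id" key is made up for illustration.
final class SnapshotResolver {
    static final String SNAPSHOT_ID_KEY = "snapshot-id";

    // commit time (epoch millis) -> snapshot ID
    private final NavigableMap<Long, Long> snapshotsByCommitTime = new TreeMap<>();

    void addSnapshot(long commitMillis, long snapshotId) {
        snapshotsByCommitTime.put(commitMillis, snapshotId);
    }

    /** FOR SYSTEM_TIME AS OF: the latest snapshot committed at or before the given time. */
    long snapshotAsOf(long timestampMillis) {
        Map.Entry<Long, Long> entry = snapshotsByCommitTime.floorEntry(timestampMillis);
        if (entry == null) {
            throw new IllegalArgumentException("No snapshot committed at or before " + timestampMillis);
        }
        return entry.getValue();
    }

    /** Copies the options and pins the snapshot; the caller's map is not modified. */
    static Map<String, String> pin(Map<String, String> tableOptions, long snapshotId) {
        Map<String, String> pinned = new HashMap<>(tableOptions);
        pinned.put(SNAPSHOT_ID_KEY, Long.toString(snapshotId));
        return pinned;
    }

    public static void main(String[] args) {
        SnapshotResolver resolver = new SnapshotResolver();
        resolver.addSnapshot(1_000L, 1L);
        resolver.addSnapshot(2_000L, 2L);
        Map<String, String> base = Map.of("connector", "example");
        // Timestamp path (FOR SYSTEM_TIME AS OF ...) and version path (VERSION AS OF 1).
        System.out.println(pin(base, resolver.snapshotAsOf(2_500L)).get(SNAPSHOT_ID_KEY)); // 2
        System.out.println(pin(base, 1L).get(SNAPSHOT_ID_KEY));                            // 1
    }
}
```

Because the source receives only an extra option, this mirrors Feng's argument that pushdown abilities are unaffected; Benchao's suggested cross tests would still be needed to confirm that for a real connector.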
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, May 25, 2023 at 22:50 Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Timo @BenChao @yuxia
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sorry for the mistake. Currently, Calcite only supports the `FOR SYSTEM_TIME AS OF` syntax, so we can only support `FOR SYSTEM_TIME AS OF`. I've updated the syntax part of the FLIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Timo
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We will convert it to TIMESTAMP_LTZ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, I think we need to convert TIMESTAMP to TIMESTAMP_LTZ and then convert it into a long value.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> How do we want to query the most recent version of a table
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think we can use `AS OF CURRENT_TIMESTAMP`, but it does cause inconsistency with the real-time concept. However, in my personal understanding, the scope of `AS OF CURRENT_TIMESTAMP` is the table itself, not the table record, so I think using CURRENT_TIMESTAMP should also be reasonable. Additionally, if no version is specified, the latest version should be used by default.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Feng
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, May 25, 2023 at 7:47 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Feng for bringing this up. It'll be great to introduce time travel to Flink for better integration with external data sources.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also share the same concern about the syntax.
>>>>>>>>>>>>>>>> I see in the `Whether to support other syntax implementations` part of this FLIP that the syntax in Calcite should be `FOR SYSTEM_TIME AS OF`, right?
>>>>>>>>>>>>>>>> But in the syntax part of this FLIP, it seems to be `AS OF TIMESTAMP` instead of `FOR SYSTEM_TIME AS OF`. Is it just a mistake or by design?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>> Yuxia
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ----- Original Message -----
>>>>>>>>>>>>>>>> From: "Benchao Li" <libenc...@apache.org>
>>>>>>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
>>>>>>>>>>>>>>>> Sent: Thursday, May 25, 2023 7:27:17 PM
>>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Feng, it's exciting to have this ability.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding the syntax section, are you proposing `AS OF` instead of `FOR SYSTEM_TIME AS OF` to do this?
>>>>>>>>>>>>>>>> I know `FOR SYSTEM_TIME AS OF` is in the SQL standard and has been supported by some database vendors such as SQL Server. About `AS OF`: is it in the standard, or does any database vendor support it? If so, I think it's worth adding this support to Calcite, and I would give a hand on the Calcite side. Otherwise, I think we'd better use `FOR SYSTEM_TIME AS OF`.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, May 25, 2023 at 19:02 Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also: How do we want to query the most recent version of a table?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> `AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs the type is TIMESTAMP_LTZ, and what is even more concerning is that it is actually evaluated row-based:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
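One way to sidestep the per-record evaluation Timo points out is to fix the AS OF instant once per query, so every time-travelled table in a join resolves against the same moment. The sketch below is an assumption about how that could look, not how Flink's planner works; `QueryTimeContext` is a hypothetical name, and `java.time.Clock` is injected so the behaviour can be tested with a fixed clock.

```java
import java.time.Clock;
import java.time.Instant;

// Hypothetical sketch: capture "now" once at query start and reuse it for every table reference,
// mirroring the batch-mode CURRENT_TIMESTAMP behaviour described in the Flink docs quoted here.
final class QueryTimeContext {
    private final Instant queryStart;

    QueryTimeContext(Clock clock) {
        this.queryStart = clock.instant(); // read exactly once
    }

    /** Epoch millis used for AS OF CURRENT_TIMESTAMP on any table in this query. */
    long asOfMillis() {
        return queryStart.toEpochMilli();
    }

    public static void main(String[] args) throws InterruptedException {
        QueryTimeContext ctx = new QueryTimeContext(Clock.systemUTC());
        long orders = ctx.asOfMillis();
        Thread.sleep(5); // time passes before the next table reference is resolved
        long customers = ctx.asOfMillis();
        System.out.println(orders == customers); // true: both sides of the join use one snapshot time
    }
}
```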
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This could make it difficult to explain in a join scenario with multiple snapshotted tables.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 25.05.23 12:29, Timo Walther wrote:
>>>>>>>>>>>>>>>>>> Hi Feng,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> thanks for proposing this FLIP. It makes a lot of sense to finally support querying tables at a specific point in time, and hopefully also ranges soon, following time-versioned tables.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Here is some feedback from my side:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. Syntax
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Can you elaborate a bit on the Calcite restrictions?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Does Calcite currently support `AS OF` syntax for this but not `FOR SYSTEM_TIME AS OF`?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It would be great to support `AS OF` also for time-versioned joins and have a unified and short syntax.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Once a fix is merged in Calcite for this, we can make it available in Flink earlier by copying the corresponding classes until the next Calcite upgrade is performed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2. Semantics
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> How do we interpret the timestamp?
In Flink we have two timestamp types (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP '2023-04-27 00:00:00', in which time zone will the timestamp be interpreted? Will we convert it to TIMESTAMP_LTZ?

We definitely need to clarify this, because the past has shown that daylight saving time makes our lives hard.

Thanks,
Timo

On 25.05.23 10:57, Feng Jin wrote:

Hi, everyone.

I’d like to start a discussion about FLIP-308: Support Time Travel In Batch Mode [1]

Time travel is a SQL syntax used to query historical versions of data. It allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time. With time travel, users can easily analyze and compare historical versions of data.
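Retrieving "a table as it appeared at that time" boils down to a floor lookup over the table's snapshot history. The sketch below is a minimal illustration under stated assumptions: each table keeps snapshot commit times as epoch milliseconds in UTC (the meaning suggested elsewhere in this thread for the `timestamp` parameter of `getTable(ObjectPath tablePath, long timestamp)`), and `SnapshotResolver`, `commit`, and `snapshotAsOf` are hypothetical names, not a Flink, Paimon, Iceberg, or Hudi API.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch; not an API of Flink or any data lake format.
public class SnapshotResolver {
    // commit time (ms since 1970-01-01T00:00:00Z) -> snapshot id
    private final NavigableMap<Long, Long> history = new TreeMap<>();

    public void commit(long commitMillis, long snapshotId) {
        history.put(commitMillis, snapshotId);
    }

    /** Latest snapshot committed at or before asOfMillis; empty if the table had no snapshot yet. */
    public Optional<Long> snapshotAsOf(long asOfMillis) {
        Map.Entry<Long, Long> entry = history.floorEntry(asOfMillis);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        SnapshotResolver resolver = new SnapshotResolver();
        resolver.commit(1_000L, 1L);
        resolver.commit(5_000L, 2L);
        resolver.commit(9_000L, 3L);
        System.out.println(resolver.snapshotAsOf(4_999L)); // Optional[1]
        System.out.println(resolver.snapshotAsOf(5_000L)); // Optional[2]
        System.out.println(resolver.snapshotAsOf(500L));   // Optional.empty
    }
}
```

Using an inclusive floor means a query exactly at a commit time sees that commit; whether the boundary should be inclusive is a design decision for each connector.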
With the widespread use of data lake systems such as Paimon, Iceberg, and Hudi, time travel can make data analysis more convenient for users.

Looking forward to your opinions; any suggestions are welcome.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode

Best,
Feng

--

Best,
Benchao Li
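Timo's question of which time zone applies to `AS OF TIMESTAMP '2023-04-27 00:00:00'` can be illustrated with `java.time`. The sketch assumes the zone-less literal is interpreted in the session time zone (`table.local-time-zone`), as discussed in this thread; `AsOfTimestampConversion` and `toEpochMillis` are hypothetical names, not Flink's conversion code. It shows that the same literal maps to different instants per zone, and that DST gaps and overlaps force a resolution policy (here, the default behavior of `LocalDateTime.atZone`).

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical sketch; assumes the literal is read in the session time zone.
public class AsOfTimestampConversion {

    /** Interprets a zone-less literal in the session zone; returns ms since 1970-01-01T00:00:00Z. */
    static long toEpochMillis(LocalDateTime literal, ZoneId sessionZone) {
        // atZone moves a time inside a DST gap forward by the gap length,
        // and picks the earlier offset for a time inside a DST overlap.
        return literal.atZone(sessionZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime literal = LocalDateTime.of(2023, 4, 27, 0, 0, 0);
        // Same literal, different session zones: different points in time.
        System.out.println(toEpochMillis(literal, ZoneId.of("UTC")));           // 1682553600000
        System.out.println(toEpochMillis(literal, ZoneId.of("Asia/Shanghai"))); // 1682524800000

        ZoneId berlin = ZoneId.of("Europe/Berlin");
        // 2023-03-26 02:30 does not exist in Berlin (clocks jump from 02:00 to 03:00).
        ZonedDateTime gap = LocalDateTime.of(2023, 3, 26, 2, 30).atZone(berlin);
        System.out.println(gap); // 2023-03-26T03:30+02:00[Europe/Berlin]

        // 2023-10-29 02:30 occurs twice in Berlin (clocks fall back from 03:00 to 02:00).
        ZonedDateTime earlier = LocalDateTime.of(2023, 10, 29, 2, 30).atZone(berlin);
        ZonedDateTime later = earlier.withLaterOffsetAtOverlap();
        System.out.println(later.toInstant().toEpochMilli() - earlier.toInstant().toEpochMilli()); // 3600000
    }
}
```

The Asia/Shanghai result is eight hours earlier than UTC, which matches the `-28800` seconds reported by the `UNIX_TIMESTAMP` test earlier in this thread.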