As you can see, you must use `UNIX_TIMESTAMP` to do this, and that is where
the time zone comes into play.

What I'm talking about is casting timestamp/timestamp_ltz to long directly;
that is why the semantics are tricky when you cast a timestamp to long using
a time zone.

Other systems, such as SQL Server[1], actually use a string rather than a
timestamp literal, e.g. `FOR SYSTEM_TIME AS OF '2021-01-01
00:00:00.0000000'`. I'm not sure whether they implicitly convert the string
to TIMESTAMP_LTZ, or whether they simply define the syntax differently.

But for us, we are definitely using a timestamp/timestamp_ltz literal here;
that is what makes it special, and we must highlight the behavior that we
are converting a timestamp-without-time-zone literal to long using the
session time zone.
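
To make the semantics concrete, here is a sketch (the session time zone and
the resulting epoch value are only illustrative of what interpreting the
literal in that zone would produce):

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM paimon_tb
           FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00';
-- the TIMESTAMP literal carries no time zone, so it would be interpreted in
-- Asia/Shanghai, i.e. 2023-04-26 16:00:00 UTC, and the catalog would be
-- asked for getTable(tablePath, 1682524800000)

Changing the session time zone changes the long value handed to the catalog,
which is exactly the behavior we need to call out in the FLIP.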

[1]
https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16

On Thu, Jun 8, 2023 at 11:35 AM Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi all,
>
> thanks for your input
>
>
> @Benchao
>
> >  The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC timezone,
> which is not usually expected by users.
>
> It was indeed the case before Flink 1.13, but now my understanding is that
> there have been some slight changes in the definition of TIMESTAMP.
>
> TIMESTAMP is currently used to specify the year, month, day, hour, minute,
> and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col
> AS STRING))* to convert between *TIMESTAMP values* and *long values*. The
> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore,
> converting either TIMESTAMP or TIMESTAMP_LTZ to long values will involve
> using the *LOCAL TIME ZONE*.
>
>
> Here is a test:
>
> Flink SQL> SET 'table.local-time-zone' = 'UTC';
> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
> STRING)) as `timestamp`;
> ---------------
>  timestamp
>  --------------
>  0
>
> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
> STRING)) as `timestamp`;
> ---------------
>  timestamp
>  --------------
>  -28800
>
> Therefore, the current conversion method exposed to users also uses the
> LOCAL TIME ZONE.
>
>
> @yuxia
>
> Thank you very much for providing the list of behaviors of TIMESTAMP in
> other systems.
>
> > I think we can align with them to avoid the inconsistency with other engines and
> provide convenience for the external connectors while integrating Flink's
> time travel API.
>
> +1 for this.
>
> > Regarding the inconsistency, I think we can consider time-travel as a
> special case, and we do need to highlight this in this FLIP.
> As for "violate the restriction outlined in FLINK-21978[1]", since we cast
> timestamp to epochMillis only for the internal use, and won't expose it to
> users, I don't think it will violate the restriction.
> Btw, please add a brief desc to explain the meaning of the parameter
> `timestamp` in method `CatalogBaseTable getTable(ObjectPath tablePath, long
> timestamp)`. Maybe something like "timestamp of the table snapshot, which is
> milliseconds since 1970-01-01 00:00:00 UTC".
>
> Thank you for the suggestions regarding the document. I will add them to
> FLIP.
>
>
> Best,
> Feng
>
>
> On Wed, Jun 7, 2023 at 12:18 PM Benchao Li <libenc...@apache.org> wrote:
>
> > I also share the concern about the timezone problem.
> >
> > The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
> WITHOUT
> > TIME ZONE", converting it to unix timestamp would use UTC timezone, which
> > is not usually expected by users.
> >
> > If we want to keep consistent with the standard, we probably should use
> > "TIMESTAMP WITH LOCAL ZONE '2023-04-27 00:00:00'", which type is
> "TIMESTAMP
> > WITH LOCAL TIME ZONE", and converting it to unix timestamp will consider
> > the session timezone, which is the expected result. But it's inconvenient
> > for users.
> >
> > Taking this as a special case, and converting "TIMESTAMP '2023-04-27
> > 00:00:00'" to a unix timestamp with session timezone, will be convenient
> > for users, but will break the standard. I will +0.5 for this choice.
> >
> On Wed, Jun 7, 2023 at 12:06 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >
> > > Hi, Feng Jin.
> > > I think the concern of Leonard may be the inconsistency of the behavior
> > of
> > > TIMESTAMP '2023-04-27 00:00:00' between time travel and other SQL
> > statements.
> > >
> > > For the normal sql:
> > > `SELECT TIMESTAMP '2023-04-27 00:00:00'`, we won't consider timezone.
> > > But for the time travel SQL:
> > > `SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27
> > > 00:00:00'`, we will consider the timezone and convert to UTC timestamp.
> > >
> > > The concern is valid. But for time travel, most engines, e.g.
> > > Spark[1], Hive[2], and Trino[3], also do the time conversion
> considering
> > > the session time zone. I think we can align with them to avoid the
> > inconsistency
> > > with other engines and provide convenience for the external connectors
> > while
> > > integrating Flink's time travel API.
> > >
> > > Regarding the inconsistency, I think we can consider time-travel as a
> > > special case, and we do need to highlight this in this FLIP.
> > > As for "violate the restriction outlined in FLINK-21978[1]", since we
> > cast
> > > timestamp to epochMillis only for the internal use, and won't expose it
> > to
> > > users, I don't think it will violate the restriction.
> > > Btw, please add a brief desc to explain the meaning of the parameter
> > > `timestamp` in method `CatalogBaseTable getTable(ObjectPath tablePath,
> > long
> > > timestamp)`. Maybe something like "timestamp of the table snapshot, which
> is
> > > milliseconds since 1970-01-01 00:00:00 UTC".
> > >
> > > [1]
> > >
> >
> https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
> > > [2]
> > >
> >
> https://github.com/apache/hive/blob/f5e69dc38d7ff26c70be19adc9d1a3ae90dc4cf2/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L989
> > > [3]
> > >
> >
> https://github.com/trinodb/trino/blob/2433d9e60f1abb0d85c32374c1758525560e1a86/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L443
> > >
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Feng Jin" <jinfeng1...@gmail.com>
> > > To: "dev" <dev@flink.apache.org>
> > > Sent: Tuesday, June 6, 2023 10:15:47 PM
> > > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> > >
> > > Hi everyone
> > >
> > > Thanks everyone for your input.
> > >
> > >
> > > @Yun
> > >
> > > >  I think you could add descriptions of how to align backfill time
> > travel
> > > with querying the latest data. And I think you should also update the
> > > "Discussion thread" in the original FLIP.
> > >
> > > Thank you for the suggestion, I will update it in the document.
> > >
> > > >  I have a question about getting the table schema from the catalog.
> I'm
> > > not sure whether the Catalog#getTable(tablePath, timestamp) will be
> > called
> > > only once.
> > >
> > > I understand that in a query, the schema of the table is determined
> > before
> > > execution. The schema used will be based on the latest schema within
> the
> > > TimeTravel period.
> > >
> > > In addition, due to current syntax limitations, we are unable to
> support
> > > the use of BETWEEN AND.
> > >
> > >
> > > @Jing
> > >
> > > >  Would you like to update your thoughts described in your previous
> > email
> > > about why SupportsTimeTravel has been rejected into the FLIP?
> > >
> > > Sure,  I updated the doc.
> > >
> > >
> > > >    Since we always directly add overload methods into Catalog
> according
> > > to new requirements, which makes the interface bloated
> > >
> > > Your concern is valid. If we need to support the long type version in
> the
> > > future, we may have to add another method "getTable(ObjectPath, long
> > > version)". However, I understand that
> > > "Catalog.getTable(tablePath).on(timeStamp)" may not meet the
> > requirements.
> > > The timestamp is for Catalog's use, and Catalog obtains the
> corresponding
> > > schema based on this time.
> > >
> > >
> > > @liu @Regards
> > >
> > > I am very sorry for the unclear description in the document. I have
> > updated
> > > relevant descriptions regarding why it needs to be implemented in
> > Catalog.
> > >
> > > Time travel not only requires obtaining the data at the corresponding time
> point,
> > > but also requires the corresponding schema at that time point.
> > >
> > >
> > > @Shammon
> > >
> > > > Flink or connector such as  iceberg/paimon can create sources from
> the
> > > `CatalogBaseTable` directly without the need to get the snapshot ID
> from
> > > `CatalogTable.getSnapshot()`.  What do you think of it?
> > >
> > > You are right, we don't need the getSnapshot interface for
> PaimonCatalog
> > or
> > > IcebergCatalog tables, but we may need it for temporary tables.
> > >
> > >
> > >
> > > Best,
> > > Feng
> > >
> > >
> > > On Tue, Jun 6, 2023 at 9:32 PM Feng Jin <jinfeng1...@gmail.com> wrote:
> > >
> > > > Sorry I replied to the wrong mail. Please ignore the last email.
> > > >
> > > >
> > > > Hi Leonard
> > > >
> > > > > 1. Unification SQL
> > > >
> > > > I agree that it is crucial for us to support both batch and streaming
> > > > processing.  The current design allows for the support of both batch
> > and
> > > > streaming processing. I'll update the FLIP later.
> > > >
> > > >
> > > > >  2.Semantics
> > > >
> > > > In my opinion, it would be feasible to perform the conversion based
> on
> > > the
> > > > current session time, regardless of whether it is TIMESTAMP or
> > > > TIMESTAMP_LTZ.
> > > >
> > > > However, this may indeed violate the restriction outlined in
> > > > FLINK-21978[1]  as Benchao mentioned, and I am uncertain as to
> whether
> > it
> > > > is reasonable.
> > > >
> > > >
> > > > >   3.  Some external systems may use timestamp value to mark a
> > version,
> > > > but others may use version number、file position、log offset.
> > > >
> > > > It is true that most systems support time-related operations, and I
> > > > believe that the current design is compatible with most systems.
> > However,
> > > > if we want to support long data type, it may require Calcite to
> support
> > > the
> > > > VERSION AS OF syntax. I understand that this is something that we may
> > > need
> > > > to consider in the future.
> > > >
> > > >
> > > > Best,
> > > > Feng
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-21978
> > > >
> > > > On Tue, Jun 6, 2023 at 8:28 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > >
> > > >> Hi, Feng
> > > >>
> > > >> Thanks for driving this FLIP, very impressive feature that users
> want,
> > > >> I’ve some quick questions here.
> > > >>
> > > >> 1.Unification SQL:
> > > >>         The snapshot  concept exists both in Batch mode and
> Streaming
> > > >> mode, could we consider a unified proposal? I think users won't want
> > another
> > > >> SQL syntax named
> > > >> time travel for streaming mode.
> > > >>
> > > >> 2.Semantics:
> > > >>         Flink supports TIMESTAMP and TIMESTAMP_LTZ types, to get a
> > long
> > > >> timestamp value (getTable(ObjectPath tablePath, long timestamp)) we
> > need
> > > >> two pieces of information, i.e. a TIMESTAMP value and the current session
> > > time zone; how
> > > >> do we deal with the value under the currently proposed SQL syntax?
> > > >>
> > > >> 3. Is it enough to use a single timestamp to track a snapshot (version)
> of
> > > >> an external table?   Some external systems may use a timestamp value to
> > mark
> > > a
> > > >> version, but others may use a version number, file position, or log offset.
> > > >>
> > > >> Best,
> > > >> Leonard
> > > >>
> > > >>
> > > >>
> > > >> > On Jun 5, 2023, at 3:28 PM, Yun Tang <myas...@live.com> wrote:
> > > >> >
> > > >> > Hi Feng,
> > > >> >
> > > >> > I think this FLIP would provide one important feature to unify the
> > > >> stream-SQL and batch-SQL when we backfill the historical data in
> batch
> > > mode.
> > > >> >
> > > >> > For the "Syntax" session, I think you could add descriptions of
> how
> > to
> > > >> align backfill time travel with querying the latest data. And I
> think
> > > you
> > > >> should also update the "Discussion thread" in the original FLIP.
> > > >> >
> > > >> > Moreover, I have a question about getting the table schema from
> the
> > > >> catalog. I'm not sure whether the Catalog#getTable(tablePath,
> > timestamp)
> > > >> will be called only once. If we have a backfill query between
> > 2023-05-29
> > > >> and 2023-06-04 in the past week, and the table schema changed on
> > > >> 2023-06-01, will the query below detect the schema changes during
> > > backfill
> > > >> the whole week?
> > > >> >
> > > >> > SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN
> > > >> '2023-05-29 00:00:00' AND '2023-06-05 00:00:00'
> > > >> >
> > > >> > Best
> > > >> > Yun Tang
> > > >> >
> > > >> >
> > > >> > ________________________________
> > > >> > From: Shammon FY <zjur...@gmail.com>
> > > >> > Sent: Thursday, June 1, 2023 17:57
> > > >> > To: dev@flink.apache.org <dev@flink.apache.org>
> > > >> > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> > > >> >
> > > >> > Hi Feng,
> > > >> >
> > > >> > I have one minor comment about the public interface
> `Optional<Long>
> > > >> > getSnapshot()` in the `CatalogTable`.
> > > >> >
> > > >> > As we can get tables from the new method
> > `Catalog.getTable(ObjectPath
> > > >> > tablePath, long timestamp)`, I think the returned
> `CatalogBaseTable`
> > > >> will
> > > >> > have the information of timestamp. Flink or connector such as
> > > >> > iceberg/paimon can create sources from the `CatalogBaseTable`
> > directly
> > > >> > without the need to get the snapshot ID from
> > > >> `CatalogTable.getSnapshot()`.
> > > >> > What do you think of it?
> > > >> >
> > > >> > Best,
> > > >> > Shammon FY
> > > >> >
> > > >> >
> > > >> > On Thu, Jun 1, 2023 at 7:22 AM Jing Ge <j...@ververica.com.invalid
> >
> > > >> wrote:
> > > >> >
> > > >> >> Hi Feng,
> > > >> >>
> > > >> >> Thanks for the proposal! Very interesting feature. Would you like
> > to
> > > >> update
> > > >> >> your thoughts described in your previous email about why
> > > >> SupportsTimeTravel
> > > >> >> has been rejected into the FLIP? This will help readers
> understand
> > > the
> > > >> >> context (in the future).
> > > >> >>
> > > >> >> Since we always directly add overloaded methods into Catalog
> > according
> > > >> to new
> > > >> >> requirements, the interface becomes bloated. Just out of
> > > curiosity,
> > > >> >> does it make sense to introduce some DSL design? Like
> > > >> >> Catalog.getTable(tablePath).on(timeStamp),
> > > >> >> Catalog.getTable(tablePath).current() for the most current
> version,
> > > and
> > > >> >> more room for further extension like timestamp range, etc. I
> > haven't
> > > >> read
> > > >> >> all the source code yet and I'm not sure if it is possible. But a
> > > >> >> design like this will keep the Catalog API lean and the API/DSL
> > will
> > > be
> > > >> >> self described and easier to use.
> > > >> >>
> > > >> >> Best regards,
> > > >> >> Jing
> > > >> >>
> > > >> >>
> > > >> >> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <
> > > >> >> krzysiek.chmielew...@gmail.com> wrote:
> > > >> >>
> > > >> >>> Ok, after second thought I'm retracting my previous statement
> about
> > > >> Catalog
> > > >> >>> changes you proposed.
> > > >> >>> I do see a benefit for Delta connector actually with this change
> > and
> > > >> see
> > > >> >>> why this could be coupled with Catalog.
> > > >> >>>
> > > >> >>> Delta Connector SQL support also ships a Delta Catalog
> > > implementation
> > > >> >> for
> > > >> >>> Flink.
> > > >> >>> For the Delta Catalog, table schema information is fetched from the
> > > underlying
> > > >> >>> _delta_log and not stored in the metastore. For time travel we
> > actually
> > > >> had a
> > > >> >>> problem: if we wanted to time-travel back to some old
> > > version
> > > >> >>> where the schema was slightly different, we would have a
> conflict
> > > >> since
> > > >> >>> the Catalog would return the current schema and not how it was for
> version
> > > >> >>> X.
> > > >> >>>
> > > >> >>> With your change, our Delta Catalog can actually fetch schema
> for
> > > >> >> version X
> > > >> >>> and send it to DeltaTableFactory. Currently, the Catalog can fetch
> only the
> > > >> >> current
> > > >> >>> version. What we would also need however is version
> > > (number/timestamp)
> > > >> >> for
> > > >> >>> this table passed to DynamicTableFactory so we could properly
> set up the
> > > >> Delta
> > > >> >>> standalone library.
> > > >> >>>
> > > >> >>> Regards,
> > > >> >>> Krzysztof
> > > >> >>>
> > > >> >>> On Wed, May 31, 2023 at 10:37 AM Krzysztof Chmielewski <
> > > >> >>> krzysiek.chmielew...@gmail.com> wrote:
> > > >> >>>
> > > >> >>>> Hi,
> > > >> >>>> happy to see such a feature.
> > > >> >>>> Small note from my end regarding Catalog changes.
> > > >> >>>>
> > > >> >>>> TL;DR
> > > >> >>>> I don't think it is necessary to delegate this feature to the
> > > >> catalog.
> > > >> >> I
> > > >> >>>> think that since "timetravel" is per job/query property, its
> > should
> > > >> not
> > > >> >>> be
> > > >> >>>> coupled with the Catalog or table definition. In my opinion
> this
> > is
> > > >> >>>> something that DynamicTableFactory only has to know about. I
> > would
> > > >> >> rather
> > > >> >>>> see this feature as it is - SQL syntax enhancement but delegate
> > > >> clearly
> > > >> >>> to
> > > >> >>>> DynamicTableFactory.
> > > >> >>>>
> > > >> >>>> I've implemented timetravel feature for Delta Connector  [1]
> > using
> > > >> >>>> current Flink API.
> > > >> >>>> Docs are pending code review, but you can find them here [2]
> and
> > > >> >> examples
> > > >> >>>> are available here [3]
> > > >> >>>>
> > > >> >>>> The timetravel feature that I've implemented is based on Flink
> > > Query
> > > >> >>>> hints.
> > > >> >>>> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
> > > >> >>>>
> > > >> >>>> The " versionAsOf" (we also have 'timestampAsOf') parameter is
> > > >> handled
> > > >> >>> not
> > > >> >>>> by Catalog but by the DynamicTableFactory implementation for the Delta
> > > >> >>> connector.
> > > >> >>>> The value of this property is passed to Delta standalone lib
> API
> > > that
> > > >> >>>> returns table view for given version.
> > > >> >>>>
> > > >> >>>> I'm not sure how/if proposed change could benefit Delta
> connector
> > > >> >>>> implementation for this feature.
> > > >> >>>>
> > > >> >>>> Thanks,
> > > >> >>>> Krzysztof
> > > >> >>>>
> > > >> >>>> [1]
> > > >> >>>>
> > > >> >>>
> > > >> >>
> > > >>
> > >
> >
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
> > > >> >>>> [2]
> > > >> https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
> > > >> >>>> [3]
> > > >> >>>>
> > > >> >>>
> > > >> >>
> > > >>
> > >
> >
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
> > > >> >>>>
> > > >> >>>> On Wed, May 31, 2023 at 6:03 AM liu ron <ron9....@gmail.com>
> > wrote:
> > > >> >>>>
> > > >> >>>>> Hi, Feng
> > > >> >>>>>
> > > >> >>>>> Thanks for driving this FLIP. Time travel is very useful for
> > Flink
> > > >> >>>>> to integrate with data lake systems. I have one question: why is
> > > >> >>>>> the implementation
> > > >> >>>>> of TimeTravel delegated to the Catalog? Assuming that we use
> > Flink
> > > to
> > > >> >>> query
> > > >> >>>>> a Hudi table with the time travel syntax, but we don't use the
> > > >> >>> HudiCatalog,
> > > >> >>>>> and instead register the Hudi table in the InMemoryCatalog,  can
> we
> > > >> >> support
> > > >> >>>>> time travel for the Hudi table in this case?
> > > >> >>>>> In contrast, I think time travel should be bound to the connector
> > instead
> > > of
> > > >> >>>>> the Catalog, so the rejected alternative should be considered.
> > > >> >>>>>
> > > >> >>>>> Best,
> > > >> >>>>> Ron
> > > >> >>>>>
> > > >> >>>>> On Tue, May 30, 2023 at 9:40 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > >> >>>>>
> > > >> >>>>>> Hi, Feng.
> > > >> >>>>>> Notice this FLIP only supports batch mode for time travel.
> > Would
> > > it
> > > >> >>> also
> > > >> >>>>>> make sense to support stream mode to read a snapshot of the
> > > table
> > > >> >>> as a
> > > >> >>>>>> bounded stream?
> > > >> >>>>>>
> > > >> >>>>>> Best regards,
> > > >> >>>>>> Yuxia
> > > >> >>>>>>
> > > >> >>>>>> ----- Original Message -----
> > > >> >>>>>> From: "Benchao Li" <libenc...@apache.org>
> > > >> >>>>>> To: "dev" <dev@flink.apache.org>
> > > >> >>>>>> Sent: Monday, May 29, 2023 6:04:53 PM
> > > >> >>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> > > >> >>>>>>
> > > >> >>>>>> # Can Calcite support this syntax ` VERSION AS OF`  ?
> > > >> >>>>>>
> > > >> >>>>>> This also depends on whether this is defined in the standard or in
> any
> > > >> >> known
> > > >> >>>>>> databases that have implemented this. If not, it would be
> hard
> > to
> > > >> >> push
> > > >> >>>>> it
> > > >> >>>>>> to Calcite.
> > > >> >>>>>>
> > > >> >>>>>> # getTable(ObjectPath object, long timestamp)
> > > >> >>>>>>
> > > >> >>>>>> Then we again come to the problem of "casting between
> timestamp
> > > and
> > > >> >>>>>> numeric", which has been disabled in FLINK-21978[1]. If
> you're
> > > >> gonna
> > > >> >>> use
> > > >> >>>>>> this, then we need to clarify that problem first.
> > > >> >>>>>>
> > > >> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21978
> > > >> >>>>>>
> > > >> >>>>>>
> > > >> >>>>>> On Mon, May 29, 2023 at 3:57 PM Feng Jin <jinfeng1...@gmail.com> wrote:
> > > >> >>>>>>
> > > >> >>>>>>> hi, thanks for your reply.
> > > >> >>>>>>>
> > > >> >>>>>>> @Benchao
> > > >> >>>>>>>> did you consider the pushdown abilities compatible
> > > >> >>>>>>>
> > > >> >>>>>>> In the current design, the implementation of TimeTravel is
> > > >> >> delegated
> > > >> >>>>> to
> > > >> >>>>>>> Catalog. We have added a function called getTable(ObjectPath
> > > >> >>>>> tablePath,
> > > >> >>>>>>> long timestamp) to obtain the corresponding CatalogBaseTable
> > at
> > > a
> > > >> >>>>>> specific
> > > >> >>>>>>> time.  Therefore, I think it will not have any impact on the
> > > >> >>> original
> > > >> >>>>>>> pushdown abilities.
> > > >> >>>>>>>
> > > >> >>>>>>>
> > > >> >>>>>>>>  I see there is a rejected  design for adding
> > > >> >> SupportsTimeTravel,
> > > >> >>>>> but
> > > >> >>>>>> I
> > > >> >>>>>>> didn't see the alternative in  the FLIP doc
> > > >> >>>>>>>
> > > >> >>>>>>> Sorry, the document description is not very clear.
> Regarding
> > > >> >>> whether
> > > >> >>>>> to
> > > >> >>>>>>> support SupportsTimeTravel, I have discussed it with yuxia.
> > Since
> > > >> >> we
> > > >> >>>>> have
> > > >> >>>>>>> already passed the corresponding time in
> getTable(ObjectPath,
> > > long
> > > >> >>>>>>> timestamp) of Catalog, SupportsTimeTravel may not be
> necessary.
> > > >> >>>>>>>
> > > >> >>>>>>> In getTable(ObjectPath object, long timestamp), we can
> obtain
> > > the
> > > >> >>>>> schema
> > > >> >>>>>> of
> > > >> >>>>>>> the corresponding time point and put the SNAPSHOT that needs
> > to
> > > be
> > > >> >>>>>> consumed
> > > >> >>>>>>> into options.
> > > >> >>>>>>>
> > > >> >>>>>>>
> > > >> >>>>>>> @Shammon
> > > >> >>>>>>>> Could we support this in Flink too?
> > > >> >>>>>>>
> > > >> >>>>>>> I personally think it's possible, but limited by Calcite's
> > > syntax
> > > >> >>>>>>> restrictions. I believe we should first support this syntax
> in
> > > >> >>>>> Calcite.
> > > >> >>>>>>> Currently, I think it may not be easy  to support this
> syntax
> > in
> > > >> >>>>> Flink's
> > > >> >>>>>>> parser. @Benchao, what do you think? Can Calcite support
> this
> > > >> >> syntax
> > > >> >>>>>>> ` VERSION AS OF`  ?
> > > >> >>>>>>>
> > > >> >>>>>>>
> > > >> >>>>>>> Best,
> > > >> >>>>>>> Feng.
> > > >> >>>>>>>
> > > >> >>>>>>>
> > > >> >>>>>>> On Fri, May 26, 2023 at 2:55 PM Shammon FY <
> zjur...@gmail.com
> > >
> > > >> >>> wrote:
> > > >> >>>>>>>
> > > >> >>>>>>>> Thanks Feng, the feature of time travel sounds great!
> > > >> >>>>>>>>
> > > >> >>>>>>>> In addition to SYSTEM_TIME, lake houses such as paimon and
> > > >> >> iceberg
> > > >> >>>>>>> support
> > > >> >>>>>>>> snapshot or version. For example, users can query snapshot
> 1
> > > for
> > > >> >>>>> paimon
> > > >> >>>>>>> by
> > > >> >>>>>>>> the following statement
> > > >> >>>>>>>> SELECT * FROM t VERSION AS OF 1
> > > >> >>>>>>>>
> > > >> >>>>>>>> Could we support this in Flink too?
> > > >> >>>>>>>>
> > > >> >>>>>>>> Best,
> > > >> >>>>>>>> Shammon FY
> > > >> >>>>>>>>
> > > >> >>>>>>>> On Fri, May 26, 2023 at 1:20 PM Benchao Li <
> > > >> >> libenc...@apache.org>
> > > >> >>>>>> wrote:
> > > >> >>>>>>>>
> > > >> >>>>>>>>> Regarding the implementation, did you consider compatibility
> with the pushdown
> > > >> >>>>> abilities,
> > > >> >>>>>>>>> e.g., projection pushdown, filter pushdown, and
> > > >> >>> partition
> > > >> >>>>>>>> pushdown?
> > > >> >>>>>>>>> Since `Snapshot` is not handled much in existing rules, I
> > > >> >> have a
> > > >> >>>>>>> concern
> > > >> >>>>>>>>> about this. Of course, it depends on your implementation
> > > >> >> detail,
> > > >> >>>>> what
> > > >> >>>>>>> is
> > > >> >>>>>>>>> important is that we'd better add some cross tests for
> > these.
> > > >> >>>>>>>>>
> > > >> >>>>>>>>> Regarding the interface exposed to Connector, I see there
> > is a
> > > >> >>>>>> rejected
> > > >> >>>>>>>>> design for adding SupportsTimeTravel, but I didn't see the
> > > >> >>>>>> alternative
> > > >> >>>>>>> in
> > > >> >>>>>>>>> the FLIP doc. IMO, this is an important thing we need to
> > > >> >> clarify
> > > >> >>>>>>> because
> > > >> >>>>>>>> we
> > > >> >>>>>>>>> need to know whether the Connector supports this, and what
> > > >> >>>>>>>> column/metadata
> > > >> >>>>>>>>> corresponds to 'system_time'.
> > > >> >>>>>>>>>
> > > >> >>>>>>>>> On Thu, May 25, 2023 at 10:50 PM Feng Jin <jinfeng1...@gmail.com> wrote:
> > > >> >>>>>>>>>
> > > >> >>>>>>>>>> Thanks for your reply
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>> @Timo @BenChao @yuxia
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>> Sorry for the mistake. Currently, Calcite only supports
> > > >> >>> `FOR
> > > >> >>>>>>>>> SYSTEM_TIME
> > > >> >>>>>>>>>> AS OF `  syntax.  We can only support `FOR SYSTEM_TIME AS
> > > >> >> OF`
> > > >> >>> .
> > > >> >>>>>> I've
> > > >> >>>>>>>>>> updated the syntax part of the FLIP.
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>> @Timo
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>>> We will convert it to TIMESTAMP_LTZ?
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>> Yes, I think we need to convert TIMESTAMP to
> TIMESTAMP_LTZ
> > > >> >> and
> > > >> >>>>> then
> > > >> >>>>>>>>> convert
> > > >> >>>>>>>>>> it into a long value.
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>>> How do we want to query the most recent version of a
> table
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>> I think we can use `AS OF CURRENT_TIMESTAMP`, but it does
> > > >> >>> cause
> > > >> >>>>>>>>>> inconsistency with the real-time concept.
> > > >> >>>>>>>>>> However, from my personal understanding, the scope of
> `AS
> > > >> >> OF
> > > >> >>>>>>>>>> CURRENT_TIMESTAMP` is the table itself, not the table
> > > >> >> record.
> > > >> >>>>> So,
> > > >> >>>>>> I
> > > >> >>>>>>>>> think
> > > >> >>>>>>>>>> using CURRENT_TIMESTAMP should also be reasonable.
> > > >> >>>>>>>>>> Additionally, if no version is specified, the latest
> > version
> > > >> >>>>> should
> > > >> >>>>>>> be
> > > >> >>>>>>>>> used
> > > >> >>>>>>>>>> by default.
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>> Best,
> > > >> >>>>>>>>>> Feng
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>> On Thu, May 25, 2023 at 7:47 PM yuxia <
> > > >> >>>>> luoyu...@alumni.sjtu.edu.cn
> > > >> >>>>>>>
> > > >> >>>>>>>>> wrote:
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>>> Thanks Feng for bringing this up. It'll be great to
> > > >> >>> introduce
> > > >> >>>>>> time
> > > >> >>>>>>>>> travel
> > > >> >>>>>>>>>>> to Flink to have a better integration with external data
> > > >> >>>>> sources.
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> I also share the same concern about the syntax.
> > > >> >>>>>>>>>>> I see in the part of `Whether to support other syntax
> > > >> >>>>>>>> implementations`
> > > >> >>>>>>>>> in
> > > >> >>>>>>>>>>> this FLIP, it seems the syntax in Calcite should be `FOR
> > > >> >>>>> SYSTEM_TIME
> > > >> >>>>>>> AS
> > > >> >>>>>>>>> OF`,
> > > >> >>>>>>>>>>> right?
> > > >> >>>>>>>>>>> But in the syntax part of this FLIP, it seems to be `AS
> > > >> >> OF
> > > >> >>>>>>>> TIMESTAMP`
> > > >> >>>>>>>>>>> instead of  `FOR SYSTEM_TIME AS OF`. Is it just a
> mistake
> > > >> >> or
> > > >> >>>>> by
> > > >> >>>>>>>> design?
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> Best regards,
> > > >> >>>>>>>>>>> Yuxia
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> ----- Original Message -----
> > > >> >>>>>>>>>>> From: "Benchao Li" <libenc...@apache.org>
> > > >> >>>>>>>>>>> To: "dev" <dev@flink.apache.org>
> > > >> >>>>>>>>>>> Sent: Thursday, May 25, 2023 7:27:17 PM
> > > >> >>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch
> > > >> >>> Mode
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> Thanks Feng, it's exciting to have this ability.
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> Regarding the syntax section, are you proposing `AS OF`
> > > >> >>>>> instead
> > > >> >>>>>> of
> > > >> >>>>>>>> `FOR
> > > >> >>>>>>>>>>> SYSTEM AS OF` to do this? I know `FOR SYSTEM AS OF` is
> in
> > > >> >>> the
> > > >> >>>>> SQL
> > > >> >>>>>>>>>> standard
> > > >> >>>>>>>>>>> and has been supported in some database vendors such as
> > > >> >> SQL
> > > >> >>>>>> Server.
> > > >> >>>>>>>>> About
> > > >> >>>>>>>>>>> `AS OF`, is it in the standard or any database vendor
> > > >> >>> supports
> > > >> >>>>>>> this,
> > > >> >>>>>>>> if
> > > >> >>>>>>>>>>> yes, I think it's worth to add this support to Calcite,
> > > >> >> and
> > > >> >>> I
> > > >> >>>>>> would
> > > >> >>>>>>>>> give
> > > >> >>>>>>>>>> a
> > > >> >>>>>>>>>>> hand in Calcite side. Otherwise, I think we'd better to
> > > >> >> use
> > > >> >>>>> `FOR
> > > >> >>>>>>>> SYSTEM
> > > >> >>>>>>>>>> AS
> > > >> >>>>>>>>>>> OF`.
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> On Thu, May 25, 2023 at 7:02 PM Timo Walther <twal...@apache.org> wrote:
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>>> Also: How do we want to query the most recent version
> > > >> >> of a
> > > >> >>>>>> table?
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>> `AS OF CURRENT_TIMESTAMP` would be ideal, but according
> > > >> >> to
> > > >> >>>>> the
> > > >> >>>>>>> docs
> > > >> >>>>>>>>>> both
> > > >> >>>>>>>>>>>> the type is TIMESTAMP_LTZ and, what is even more
> > > >> >> concerning,
> > > >> >>>>>>>>>>>> it actually is evaluated row-based:
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> Returns the current SQL timestamp in the local time
> > > >> >>> zone,
> > > >> >>>>>> the
> > > >> >>>>>>>>> return
> > > >> >>>>>>>>>>>> type is TIMESTAMP_LTZ(3). It is evaluated for each
> > > >> >> record
> > > >> >>> in
> > > >> >>>>>>>>> streaming
> > > >> >>>>>>>>>>>> mode. But in batch mode, it is evaluated once as the
> > > >> >> query
> > > >> >>>>>> starts
> > > >> >>>>>>>> and
> > > >> >>>>>>>>>>>> uses the same result for every row.
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>> This could make it difficult to explain in a join
> > > >> >> scenario
> > > >> >>>>> of
> > > >> >>>>>>>>> multiple
> > > >> >>>>>>>>>>>> snapshotted tables.
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>> Regards,
> > > >> >>>>>>>>>>>> Timo
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>> On 25.05.23 12:29, Timo Walther wrote:
> > > >> >>>>>>>>>>>>> Hi Feng,
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> thanks for proposing this FLIP. It makes a lot of
> > > >> >> sense
> > > >> >>> to
> > > >> >>>>>>>> finally
> > > >> >>>>>>>>>>>>> support querying tables at a specific point in time or
> > > >> >>>>>>> hopefully
> > > >> >>>>>>>>> also
> > > >> >>>>>>>>>>>>> ranges soon. Following time-versioned tables.
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> Here is some feedback from my side:
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> 1. Syntax
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> Can you elaborate a bit on the Calcite restrictions?
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> Does Calcite currently support `AS OF` syntax for this
> > > >> >>> but
> > > >> >>>>>> not
> > > >> >>>>>>>> `FOR
> > > >> >>>>>>>>>>>>> SYSTEM_TIME AS OF`?
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> It would be great to support `AS OF` also for
> > > >> >>>>> time-versioned
> > > >> >>>>>>>> joins
> > > >> >>>>>>>>>> and
> > > >> >>>>>>>>>>>>> have a unified and short syntax.
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> Once a fix is merged in Calcite for this, we can make
> > > >> >>> this
> > > >> >>>>>>>>> available
> > > >> >>>>>>>>>> in
> > > >> >>>>>>>>>>>>> Flink earlier by copying the corresponding classes
> > > >> >> until
> > > >> >>>>> the
> > > >> >>>>>>> next
> > > >> >>>>>>>>>>>>> Calcite upgrade is performed.
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> 2. Semantics
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> How do we interpret the timestamp? In Flink we have 2
> > > >> >>>>>> timestamp
> > > >> >>>>>>>>> types
> > > >> >>>>>>>>>>>>> (TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF
> > > >> >>>>>> TIMESTAMP
> > > >> >>>>>>>>>>>>> '2023-04-27 00:00:00', in which timezone will the
> > > >> >>>>> timestamp
> > > >> >>>>>> be?
> > > >> >>>>>>>> We
> > > >> >>>>>>>>>> will
> > > >> >>>>>>>>>>>>> convert it to TIMESTAMP_LTZ?
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> We definitely need to clarify this because the past has
> > > >> >>>>> shown
> > > >> >>>>>>> that
> > > >> >>>>>>>>>>>>> daylight saving times make our lives hard.
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> Thanks,
> > > >> >>>>>>>>>>>>> Timo
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>> On 25.05.23 10:57, Feng Jin wrote:
> > > >> >>>>>>>>>>>>>> Hi, everyone.
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>> I’d like to start a discussion about FLIP-308:
> > > >> >> Support
> > > >> >>>>> Time
> > > >> >>>>>>>> Travel
> > > >> >>>>>>>>>> In
> > > >> >>>>>>>>>>>>>> Batch
> > > >> >>>>>>>>>>>>>> Mode [1]
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>> Time travel is a SQL syntax used to query historical
> > > >> >>>>>> versions
> > > >> >>>>>>> of
> > > >> >>>>>>>>>> data.
> > > >> >>>>>>>>>>>> It
> > > >> >>>>>>>>>>>>>> allows users to specify a point in time and retrieve
> > > >> >>> the
> > > >> >>>>>> data
> > > >> >>>>>>>> and
> > > >> >>>>>>>>>>>>>> schema of
> > > >> >>>>>>>>>>>>>> a table as it appeared at that time. With time
> > > >> >> travel,
> > > >> >>>>> users
> > > >> >>>>>>> can
> > > >> >>>>>>>>>>> easily
> > > >> >>>>>>>>>>>>>> analyze and compare historical versions of data.
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>> With the widespread use of data lake systems such as
> > > >> >>>>> Paimon,
> > > >> >>>>>>>>>> Iceberg,
> > > >> >>>>>>>>>>>> and
> > > >> >>>>>>>>>>>>>> Hudi, time travel can provide more convenience for
> > > >> >>> users'
> > > >> >>>>>> data
> > > >> >>>>>>>>>>> analysis.
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>> Looking forward to your opinions, any suggestions are
> > > >> >>>>>>> welcomed.
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>> 1.
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>
> > > >> >>>>>>>>
> > > >> >>>>>>>
> > > >> >>>>>>
> > > >> >>>>>
> > > >> >>>
> > > >> >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>> Best.
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>> Feng
> > > >> >>>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>>
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>>
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> --
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>> Best,
> > > >> >>>>>>>>>>> Benchao Li
> > > >> >>>>>>>>>>>
> > > >> >>>>>>>>>>
> > > >> >>>>>>>>>
> > > >> >>>>>>>>>
> > > >> >>>>>>>>> --
> > > >> >>>>>>>>>
> > > >> >>>>>>>>> Best,
> > > >> >>>>>>>>> Benchao Li
> > > >> >>>>>>>>>
> > > >> >>>>>>>>
> > > >> >>>>>>>
> > > >> >>>>>>
> > > >> >>>>>>
> > > >> >>>>>> --
> > > >> >>>>>>
> > > >> >>>>>> Best,
> > > >> >>>>>> Benchao Li
> > > >> >>>>>>
> > > >> >>>>>
> > > >> >>>>
> > > >> >>>
> > > >> >>
> > > >>
> > > >>
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li
