How about we simply go for your first approach by having [query-start, row, auto] as configuration parameters where [auto] is the default?

This sounds like a good consensus where everyone is happy, no?

This also allows users to restore the old per-row behavior for all functions that we had before Flink 1.13.
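
To make the three values concrete, here is a minimal Python sketch of how such a setting could resolve. It is not Flink code: the names `Evaluation`, `resolve`, and `make_current_timestamp` are made up for illustration, and the injectable `clock` is an assumption added so the behavior can be checked deterministically.

```python
from datetime import datetime, timezone
from enum import Enum


class Evaluation(Enum):
    """Hypothetical values of the proposed configuration parameter."""
    QUERY_START = "query-start"
    ROW = "row"
    AUTO = "auto"


def resolve(evaluation: Evaluation, is_batch: bool) -> Evaluation:
    """Map 'auto' to a concrete mode; explicit settings are kept as-is."""
    if evaluation is Evaluation.AUTO:
        return Evaluation.QUERY_START if is_batch else Evaluation.ROW
    return evaluation


def make_current_timestamp(evaluation, is_batch,
                           clock=lambda: datetime.now(timezone.utc)):
    """Return a zero-argument function standing in for CURRENT_TIMESTAMP."""
    if resolve(evaluation, is_batch) is Evaluation.QUERY_START:
        frozen = clock()       # read the clock once, at query start
        return lambda: frozen  # every row sees the same value
    return clock               # row level: read the clock on every call
```

With [auto], a batch job gets one frozen value for the whole query and a streaming job reads the clock per row, while setting [row] explicitly restores the per-row behavior in both modes.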

Regards,
Timo


On 26.02.21 11:10, Leonard Xu wrote:
Thanks Joe for the great investigation.


        • Generally urging for semantics (batch > time of first query issued, streaming > row level).
I discussed the thing now with Timo & Stephan:
        • It seems to go towards a config parameter, either [query-start, row] or [query-start, row, auto], and what is the default?
        • The main question seems to be: are we pushing the default towards streaming? (Probably related to the INSERT INTO behaviour in the SQL client.)


It looks like the opinions in this thread and the user input agree that batch should use the time of the first query and streaming should use row level.
Based on this, we should keep row level for streaming and query start for batch, just like the config parameter value [auto].

Currently Flink evaluates time functions at row level in both batch and streaming jobs, thus we only need to update the behavior in batch.

I tend not to expose an obscure configuration to users, especially when it is semantics-related.

1. We can make [auto] the default agreement. Current Flink streaming users will feel that nothing has changed, and current Flink batch users will feel that Flink batch has been corrected to match other good batch engines as well as the SQL standard. We can also provide a function CURRENT_ROW_TIMESTAMP[1] for Flink batch users who want a row-level time function.

2. CURRENT_ROW_TIMESTAMP can also be used in Flink streaming. It has clear semantics, and we can encourage users to use it.

In this way, we don’t have to introduce an obscure configuration prematurely while making all users happy.

What do you think?

Best,
Leonard
[1] https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-current-row-timestamp.html



Hope this helps,

Thanks,
Joe

On 19.02.2021, at 10:25, Leonard Xu <xbjt...@gmail.com> wrote:

Hi, Joe

Thanks for volunteering to investigate the user data on this topic. Do you
have any progress here?

Thanks,
Leonard

On Thu, Feb 4, 2021 at 3:08 PM Johannes Moser <j...@data-artisans.com> wrote:

Hello,

I will work with some users to get data on that.

Thanks, Joe

On 03.02.2021, at 14:58, Stephan Ewen <se...@apache.org> wrote:

Hi all!

A quick thought on this thread: We see a typical stalemate here, as in so
many discussions recently.
One developer prefers it this way, another one another way. Both have
pro/con arguments, it takes a lot of time from everyone, still there is
little progress in the discussion.

Ultimately, this can only be decided by talking to the users. And it
would also be the best way to ensure that what we build is the intuitive
and expected way for users.
The less the users are into the deep aspects of Flink SQL, the better they can mirror what a common user would expect (a power user will anyways figure it out).
Let's find a person to drive that, spell it out in the FLIP as "semantics
TBD", and focus on the implementation of the parts that are agreed upon.

For interviewing the users, here are some ideas for questions to look at:
- How do they view the trade-off between stable semantics vs. out-of-the-box magic (faster getting started)?
- How comfortable are they realizing the different meaning of "now()" in a streaming versus batch context?
- What would be their expectation when moving a query with the time functions ("now()") from an unbounded stream (Kafka source without end offset) to a bounded stream (Kafka source with end offsets), which may switch execution to batch?

Best,
Stephan


On Tue, Feb 2, 2021 at 3:19 PM Jark Wu <imj...@gmail.com> wrote:

Hi Fabian,

I think we have an agreement that the functions should be evaluated at query start in batch mode, because all the other batch systems and traditional databases behave this way, which is standard SQL compliant.

*1. The different point of view is what's the behavior in streaming mode?*

From my point of view, I don't see any potential meaning in evaluating at query start for a 365-day long-running streaming job.
And from my observation, CURRENT_TIMESTAMP is heavily used by Flink streaming users and they expect the current behavior.
The SQL standard only provides a guideline for traditional batch systems. However, Flink is a leading stream processing system, which is out of the scope of the SQL standard, and Flink should define the streaming standard. I think a standard should follow users' intuition.
Therefore, I think we don't need to be standard SQL compliant at this point because users don't expect it.
Changing the behavior of the functions to evaluate at query start for streaming mode will hurt most Flink SQL users and we have nothing to gain, so we should avoid this.

*2. Does it break the unified streaming-batch semantics?*

I don't think so. First of all, what are the unified streaming-batch semantics?
I think it means the *eventual result* instead of the *behavior*.
It's hard to say we have provided unified behavior for streaming and batch jobs, because, for example, an unbounded aggregate behaves very differently.
In batch mode, it only evaluates once for the bounded data and emits the aggregate result once.
But in streaming mode, it evaluates for each row and emits the updated result.
What we have always emphasized as "unified streaming-batch semantics" is [1]:

a query produces exactly the same result regardless whether its input is static batch data or streaming data.

From my understanding, the "semantics" means the "eventual result".
And time functions are non-deterministic, so it's reasonable to get different results for batch and streaming mode.
Therefore, I think it doesn't break the unified streaming-batch semantics to evaluate per-record for streaming and at query start for batch, as the semantics do not refer to the behavior.

Best,
Jark

[1]: https://flink.apache.org/news/2017/04/04/dynamic-tables.html

On Tue, 2 Feb 2021 at 18:34, Fabian Hueske <fhue...@gmail.com> wrote:

Hi everyone,

Sorry for joining this discussion late.
Let me give some thought to two of the arguments raised in this thread.

Time functions are inherently non-deterministic:
--
This is of course true, but IMO it doesn't mean that the semantics of time functions do not matter.
It makes a difference whether a function is evaluated once and its result is reused or whether it is invoked for every record.
Would you use the same logic to justify different behavior of RAND() in batch and streaming queries?

Provide the semantics that most users expect:
--
I don't think it is clear what most users expect, esp. if we also include future users (which we certainly want to gain) into this assessment.
Our current users got used to the semantics that we introduced. So I wouldn't be surprised if they would say stick with the current semantics.
However, we are also claiming standard SQL compliance and stress the goal of batch-stream unification.
So I would assume that new SQL users expect standard compliant behavior for batch and streaming queries.


IMO, we should try hard to stick to our goals of 1) unified batch-streaming semantics and 2) SQL standard compliance.
For me this means that the semantics of the functions should be adjusted to be evaluated at query start by default for batch and streaming queries.
Obviously this would affect *many* current users of streaming SQL.
For those we should provide two solutions:

1) Add alternative methods that provide the current behavior of the time functions.
I like Timo's proposal to add a prefix like SYS_ (or PROC_) but don't care too much about the names.
The important point is that users need alternative functions to provide the desired semantics.

2) Add a configuration option to reestablish the current behavior of the time functions.
IMO, the configuration option should not be considered as a permanent option but rather as a migration path towards the "right" (standard compliant) behavior.

Best, Fabian

On Tue, 2 Feb 2021 at 09:51, Kurt Young <ykt...@gmail.com> wrote:

BTW I also don't like to introduce an option for this case at the first step.

If we can find a default behavior which can make 90% of users happy, we should do it. If the remaining 10% of users start to complain about the fixed behavior (it's also possible that they don't ever complain), we could offer an option to make them happy. If it turns out that we had a wrong estimation of the users' expectations, we should change the default behavior.

Best,
Kurt


On Tue, Feb 2, 2021 at 4:46 PM Kurt Young <ykt...@gmail.com> wrote:

Hi Timo,

I don't think batch-stream unification can deal with all the cases, especially if the query involves some non-deterministic functions.

No matter which option we choose, these queries will have different results.
For example, if we run the same query in batch mode multiple times, it's also highly possible that we get different results. Does that mean all the database vendors can't deliver batch-batch unification? I don't think so.

What's really important here is the user's intuition. What do users expect if they don't read any documentation about these functions? For batch users, I think it's already clear enough that all other systems and databases will evaluate these functions during query start. And for streaming users, I have already seen some users expecting these functions to be calculated per record.

Thus I think we can make the behavior determined together with the execution mode.
One exception would be PROCTIME(), I think all users would expect this function to be calculated for each record. I think SYS_CURRENT_TIMESTAMP is similar to PROCTIME(), so we don't have to introduce it.

Best,
Kurt


On Tue, Feb 2, 2021 at 4:20 PM Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I'm not sure if we should introduce the `auto` mode. Taking all the previous discussions around batch-stream unification into account, batch mode and streaming mode should only influence the runtime efficiency and incremental computation. The final query result should be the same in both modes. Also looking into the long-term future, we might drop the mode property and either derive the mode or use different modes for parts of the pipeline.

"I think we may need to think more from the users' perspective."

I agree here and that's why I actually would like to let the user decide which semantics are needed. The config option proposal was my least favored alternative. We should stick to the standard and behavior of other systems. For both batch and streaming. And use a simple prefix to let users decide whether the semantics are per-record or per-query:

CURRENT_TIMESTAMP       -- semantics as all other vendors


_CURRENT_TIMESTAMP      -- semantics per record

OR

SYS_CURRENT_TIMESTAMP      -- semantics per record


Please check how other vendors are handling this:

SYSDATE          MySQL, Oracle
SYSDATETIME      SQL Server


Regards,
Timo


On 02.02.21 07:02, Jingsong Li wrote:
+1 for the default "auto" for "table.exec.time-function-evaluation".

From the definition of these functions, in my opinion:
- Batch is the instant execution of all records, which is the meaning of the word "BATCH", so there is only one time at query-start.
- Stream only executes a single record in a moment, so time is generated by each record.

On the other hand, we should be more careful about consistency with other systems.

Best,
Jingsong

On Tue, Feb 2, 2021 at 11:24 AM Jark Wu <imj...@gmail.com> wrote:

Hi Leonard, Timo,

I just did some investigation and found all the other batch processing systems evaluate the time functions at query-start, including Snowflake, Hive, Spark, Trino.
I'm wondering whether the default 'per-record' mode will still be weird for batch users.
I know we proposed the option for batch users to change the behavior.
However if 90% of users need to set this config before submitting batch jobs, why not use this mode for batch by default? The other 10% of special users can still set the config to per-record before submitting batch jobs. I believe this can greatly improve the usability for batch cases.

Therefore, what do you think about using "auto" as the default option value?

It evaluates time functions per-record in streaming mode and evaluates at query start in batch mode.
I think this can make both streaming users and batch users happy. IIUC, the reason why we proposed the default "per-record" mode is batch-streaming consistency.
However, I think time functions are special cases because they are naturally non-deterministic.
Even if streaming jobs and batch jobs all use "per-record" mode, they still can't provide consistent results. Thus, I think we may need to think more from the users' perspective.

Best,
Jark


On Mon, 1 Feb 2021 at 23:06, Timo Walther <twal...@apache.org> wrote:

Hi Leonard,

thanks for considering this issue as well. +1 for the proposed config option. Let's start a voting thread once the FLIP document has been updated if there are no other concerns?

Thanks,
Timo


On 01.02.21 15:07, Leonard Xu wrote:
Hi, all

I’ve discussed the time function evaluation further with @Timo and @Jark. We reached a consensus that we’d better address the time function evaluation (function value materialization) in this FLIP as well.

We’re fine with introducing an option table.exec.time-function-evaluation to control the materialization time point of the time function value. The time functions include:
LOCALTIME
LOCALTIMESTAMP
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
NOW()
The default value of table.exec.time-function-evaluation is 'per-record', which means Flink evaluates the function value per record. We recommend that users configure this option value for their streaming pipelines.
Another valid option value is 'query-start', which means Flink evaluates the function value at the query start. We recommend that users configure this option value for their batch pipelines.
In the future, more valid evaluation option values like 'auto' may be supported if there are new requirements, e.g. an 'auto' option which evaluates the time function value per record in streaming mode and at query start in batch mode.

Alternative 1:
     Introduce functions like CURRENT_TIMESTAMP2/CURRENT_TIMESTAMP_NOW which evaluate the function value at query start. This may confuse users a bit, as we would provide two similar functions with different return values.

Alternative 2:
     Do not introduce any configuration/function, and control the function evaluation by the pipeline execution mode. This may produce different results when users run their streaming pipeline SQL as a batch pipeline (e.g. backfilling), and users also cannot control the behavior of these functions.


What do you think?

Thanks,
Leonard


On 1 Feb 2021, at 18:23, Timo Walther <twal...@apache.org> wrote:

Parts of the FLIP can already be implemented without a completed voting, e.g. there is no doubt that we should support TIME(9).

However, I don't see a benefit of reworking the time functions to rework them again later. If we lock the time on query-start the implementation of the previously mentioned functions will be completely different.

Regards,
Timo


On 01.02.21 02:37, Kurt Young wrote:
I also prefer not to expand this FLIP further, but we could open a discussion thread right after this FLIP is accepted and start coding & reviewing. Making technical discussion and coding more pipelined will improve efficiency.
Best,
Kurt
On Sat, Jan 30, 2021 at 3:47 PM Leonard Xu <xbjt...@gmail.com> wrote:
Hi, Timo

I do think that this topic must be part of the FLIP as well. Esp. if the FLIP has the title "time function behavior" and this is clearly a behavioral aspect. We are performing a heavy refactoring of the SQL query semantics in Flink here which will affect a lot of users. We cannot rework the time functions a third time after this.
I checked a couple of other vendors. It seems that they all lock the timestamp when the query is started. And as you said, in this case both mature (Oracle) and less mature systems (Hive, MySQL) have the same behavior.

FLIP-162> “These problems come from the fact that lots of time-related functions like PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are returning time values based on UTC+0 time zone.”
The motivation of FLIP-162 is to correct the wrong time-related function values caused by the timezone. After our earlier discussion, we found that this is related to how the function return types compare to the SQL standard and other vendors, and thus we proposed to make the function return types consistent as well. This is the exact meaning of the FLIP title and what the FLIP plans to do.

But we didn't yet consider the function materialization mechanism as a part of our plan, because we need to fix the timezone and function type issues no matter whether we modify the function materialization mechanism in the future or not.
So I think it does not belong to the scope of this FLIP.

It will already be great work if we can fix the current FLIP's 7 proposals well. We don't want to expand the scope again, esp. as it's not part of our plan.

What do you think? @Timo

And what’s others' thoughts?  @Jark @Kurt

Best,
Leonard




Flink should not differ. I fear that we have to adopt this behavior as well to call us standard compliant. Otherwise it will also not be possible to have Hive compatibility with proper semantics. It could lead to unintended behavior.

I see two options for this topic:

1) Clearly distinguish between query-start and processing time

MySQL offers NOW() and SYSDATE() to distinguish the two semantics. We could run all the previously discussed functions that have a meaning in other systems in query-start time and use a different name for processing time. `SYS_TIMESTAMP`, `SYS_DATE`, `SYS_TIME`, `SYS_LOCALTIMESTAMP`, `SYS_LOCALDATE`, `SYS_LOCALTIME`?

2) Introduce a config option

We are non-compliant by default and allow typical batch behavior if needed via a config option. But batch/stream unification should not mean that we disable certain unification aspects by default.

What do you think?

Regards,
Timo

On 28.01.21 16:51, Leonard Xu wrote:
Hi, Timo
I'm sorry that I need to open another discussion thread before voting but I think we should also discuss this in this FLIP before it pops up at a later stage.

How do we want our time functions to behave in long running queries?
It’s okay to open this thread. Although I don’t want to consider the function value materialization in this FLIP scope, I could try to explain something.
See also:
https://stackoverflow.com/questions/5522656/sql-now-in-long-running-query

I think this was never discussed thoroughly. Actually CURRENT_TIMESTAMP/NOW/LOCALTIMESTAMP should have slightly different semantics than PROCTIME(). What is our current behavior? Are we materializing those time values during planning?
Currently CURRENT_TIMESTAMP/NOW/LOCALTIMESTAMP keep the same behavior in both the Batch and Stream worlds: the function value is materialized per record, not at the query start (plan phase).
PROCTIME() also keeps the same behavior in both the Batch and Stream worlds; in fact we just added support for PROCTIME() in Batch last week[1].
In a word, we keep the same semantics/behavior for Batch and Stream.
Esp. long running batch queries might suffer from inconsistencies here. When a timestamp is produced by one operator using CURRENT_TIMESTAMP and a different one might filter relating to CURRENT_TIMESTAMP.
It’s a good question, and I've found that some users have asked similar questions on the user/user-zh mailing lists. Many Batch systems like Hive/Presto use the value at query start, but that is not suitable for a Stream engine; for example, users will use CURRENT_TIMESTAMP to define event time.
As a unified Batch/Stream SQL engine, keeping the same semantics/behavior is important, and I agree the Batch use case should also be considered. But I think this should be discussed in another topic like 'the unification of Batch/Stream', which is beyond the scope of this FLIP.
This FLIP aims to correct the wrong return type/return value of the current time functions.
Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-17868
Regards,
Timo


On 28.01.21 13:46, Leonard Xu wrote:
Hi, Jark
I have a minor suggestion:
I think we will still suggest users use TIMESTAMP even if we have TIMESTAMP_NTZ. Then it seems introducing TIMESTAMP_NTZ doesn't help much for users, but introduces more learning costs.
I think your suggestion makes sense, we should suggest users use TIMESTAMP for TIMESTAMP WITHOUT TIME ZONE as we do now. The updated mapping is as follows:
   original type name                        <=>  shortcut type name
   TIMESTAMP / TIMESTAMP WITHOUT TIME ZONE   <=>  TIMESTAMP
   TIMESTAMP WITH LOCAL TIME ZONE            <=>  TIMESTAMP_LTZ
   TIMESTAMP WITH TIME ZONE                  <=>  TIMESTAMP_TZ     (supports them in the future)
Best,
Leonard





On Thu, 28 Jan 2021 at 18:52, Leonard Xu <xbjt...@gmail.com> wrote:

Thanks all for sharing your opinions.

Looks like  we’ve reached a consensus about the topic.

@Timo:
1) Are we on the same page that LOCALTIMESTAMP returns TIMESTAMP and not TIMESTAMP_LTZ? Maybe we should quickly list also LOCALTIME/LOCALDATE and LOCALTIMESTAMP for completeness.
Yes, LOCALTIMESTAMP returns TIMESTAMP and LOCALTIME returns TIME. Their behavior is clear, so I just listed them in the Excel sheet[1] of this FLIP's references.

2) Shall we add aliases for the timestamp types as part of this FLIP? I see Snowflake supports TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ [1]. I think the discussion was quite cumbersome with the full string of `TIMESTAMP WITH LOCAL TIME ZONE`. With this FLIP we are making this type even more prominent. And important concepts should have a short name because they are used frequently. According to the FLIP, we are introducing the abbreviation already in function names like `TO_TIMESTAMP_LTZ`. `TIMESTAMP_LTZ` could be treated similar to `STRING` for `VARCHAR(MAX_INT)`, the serializable string representation would not change.

@Timo @Jark
Nice idea, I also suffered from the long names during the discussions. The abbreviations will not only help us, but also make it more convenient for users. I list the abbreviation name mappings to support:
TIMESTAMP WITHOUT TIME ZONE       <=> TIMESTAMP_NTZ (a synonym of TIMESTAMP)
TIMESTAMP WITH LOCAL TIME ZONE    <=> TIMESTAMP_LTZ
TIMESTAMP WITH TIME ZONE          <=> TIMESTAMP_TZ (supports them in the future)
3) I'm fine with supporting all conversion classes like java.time.LocalDateTime, java.sql.Timestamp that TimestampType supported for LocalZonedTimestampType. But we agree that Instant stays the default conversion class right? The default extraction defined in [2] will not change, correct?
Yes, Instant stays the default conversion class. The default

4) I would remove the comment "Flink supports TIME-related types with precision well", because unfortunately this is still not correct. We still have issues with TIME(9), it would be great if someone can finally fix that though. Maybe the implementation of this FLIP would be a good time to fix this issue.
You’re right, TIME(9) is not supported yet. I'll add TIME(9) to the scope of this FLIP.


I’ve updated this FLIP[2] according to your suggestions @Jark @Timo
I’ll start the vote soon if there’re no objections.

Best,
Leonard

[1] https://docs.google.com/spreadsheets/d/1T178krh9xG-WbVpN7mRVJ8bzFnaSJx3l-eg1EWZe_X4/edit?usp=sharing
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior




On 28.01.21 03:18, Jark Wu wrote:
Thanks Leonard for the further investigation.
I think we all agree we should correct the return value of CURRENT_TIMESTAMP.
Regarding the return type of CURRENT_TIMESTAMP, I also agree TIMESTAMP_LTZ would be more useful worldwide. This may need more effort, but if this is the right direction, we should do it.
Regarding CURRENT_TIME, if CURRENT_TIMESTAMP returns TIMESTAMP_LTZ, then I think CURRENT_TIME shouldn't return TIME_TZ. Otherwise, CURRENT_TIME will be quite special and strange.
Thus I think it has to return the TIME type. Given that we already have CURRENT_DATE, which returns DATE WITHOUT TIME ZONE, I think it's fine to return TIME WITHOUT TIME ZONE for CURRENT_TIME.
In a word, the updated FLIP looks good to me. I especially like the proposed new function TO_TIMESTAMP_LTZ(numeric [, scale]).
This will be very convenient for defining rowtime on a long value, which is a very common case and has been complained about a lot on the mailing list.
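
As a rough illustration of what a function like TO_TIMESTAMP_LTZ(numeric [, scale]) is meant to do, here is a small Python sketch. It is not the Flink implementation: the helper name `to_timestamp_ltz` is hypothetical, and it assumes that scale 0 means epoch seconds and scale 3 means epoch milliseconds.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_ltz(value: int, scale: int = 0) -> datetime:
    """Turn an epoch-based long into an absolute point in time.

    Assumption for this sketch: scale 0 = seconds, scale 3 = milliseconds.
    """
    if scale == 0:
        return EPOCH + timedelta(seconds=value)
    if scale == 3:
        return EPOCH + timedelta(milliseconds=value)
    raise ValueError("this sketch only handles scale 0 and scale 3")


# The stored instant is the same everywhere; only the rendering
# depends on the (hypothetical) session time zone.
instant = to_timestamp_ltz(44_000, 3)
session_zone = timezone(timedelta(hours=8))
print(instant.astimezone(session_zone).strftime("%Y-%m-%d %H:%M:%S"))
```

The point of the sketch is that the long value itself never changes with the session time zone, which is what makes it a good fit for defining rowtime.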
Best,
Jark
On Mon, 25 Jan 2021 at 21:12, Kurt Young <ykt...@gmail.com> wrote:
Thanks Leonard for the detailed response and also the bad case about option 1, these all make sense to me.

Also nice catch about conversion support of LocalZonedTimestampType, I think it actually makes sense to support java.sql.Timestamp as well as java.time.LocalDateTime. It also has a slight benefit that we might have a chance to run the UDFs which took them as input parameters after we change the return type.

Regarding the return type of CURRENT_TIME, I also think timezone information is not useful. To not expand this FLIP further, I lean towards keeping it as it is.

Best,
Kurt


On Mon, Jan 25, 2021 at 8:50 PM Leonard Xu <xbjt...@gmail.com> wrote:

Hi, All

Thanks for your comments. I think everyone in the thread has agreed that:
(1) The return values of CURRENT_TIME/CURRENT_TIMESTAMP/NOW()/PROCTIME() are wrong.
(2) LOCALTIME/LOCALTIMESTAMP and CURRENT_TIME/CURRENT_TIMESTAMP should be different, whether from the SQL standard’s perspective or that of mature systems.
(3) The semantics of the three TIMESTAMP types in Flink SQL follow the SQL standard and are also consistent with other 'good' vendors.
   TIMESTAMP                       =>  A literal in ‘yyyy-MM-dd HH:mm:ss’ format to describe a time; does not contain timezone info and cannot represent an absolute time point.
   TIMESTAMP WITH LOCAL TIME ZONE  =>  Records the elapsed time from the absolute time point origin; can represent an absolute time point and requires the local time zone when expressed in ‘yyyy-MM-dd HH:mm:ss’ format.
   TIMESTAMP WITH TIME ZONE        =>  Consists of time zone info and a literal in ‘yyyy-MM-dd HH:mm:ss’ format to describe a time; can represent an absolute time point.
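
The distinction between the three types can be mimicked with Python's standard `datetime` module. This is only an analogy under my own assumptions, not how Flink stores these types; the variable names and the `render_ltz` helper are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

FORMAT = "%Y-%m-%d %H:%M:%S"

# TIMESTAMP: a wall-clock literal with no zone attached.
# On its own it does not identify an absolute point in time.
ts = datetime(2021, 1, 25, 20, 50, 0)

# TIMESTAMP WITH LOCAL TIME ZONE: elapsed time since the epoch.
# It is an absolute point in time and needs a zone only when rendered.
ts_ltz_epoch_seconds = int(
    datetime(2021, 1, 25, 12, 50, 0, tzinfo=timezone.utc).timestamp()
)


def render_ltz(epoch_seconds: int, session_zone: timezone) -> str:
    """Render an epoch-based value in the given (hypothetical) session zone."""
    return datetime.fromtimestamp(epoch_seconds, tz=session_zone).strftime(FORMAT)


# TIMESTAMP WITH TIME ZONE: the literal together with its own zone info.
ts_tz = datetime(2021, 1, 25, 20, 50, 0, tzinfo=timezone(timedelta(hours=8)))
```

Rendering `ts_ltz_epoch_seconds` with a UTC+8 session zone and with UTC produces different strings for the same stored value, while `ts_tz` carries its offset with it and `ts` carries none.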


Currently we have two ways to correct CURRENT_TIME/CURRENT_TIMESTAMP/NOW()/PROCTIME().

option (1): As the FLIP proposed, change the return value from UTC timezone to local timezone.
       Pros:  (1) The change looks smaller to users and developers
              (2) Many SQL engines have adopted this approach
       Cons:  (1) Connector devs may be confused by the underlying value of TimestampData, which needs to change according to the data type
              (2) I thought about this over the weekend. Unfortunately I found a bad case:

The proposal is fine if we only use it in the Flink SQL world, but we need to consider the conversion between Table and DataStream. Assume a record produced in the UTC+0 timezone with TIMESTAMP '1970-01-01 08:00:44', and Flink SQL processes the data with session time zone 'UTC+8'. If the SQL program needs to convert the Table to a DataStream, then we need to calculate the timestamp in StreamRecord with the session time zone (UTC+8), and we will get 44 in the DataStream program. But that is wrong, because the expected value should be (8 * 60 * 60 + 44). This corner case tells us that ROWTIME/PROCTIME in Flink are based on UTC+0. When correcting the PROCTIME() function, the better way is to use TIMESTAMP WITH LOCAL TIME ZONE, which keeps the same long value as the time based on UTC+0 and can be expressed in the local timezone.
option (2): As we considered in the FLIP, and as @Timo suggested, change the return type to TIMESTAMP WITH LOCAL TIME ZONE; the expressed value depends on the local time zone.
       Pros:  (1) Makes Flink SQL closer to the SQL standard
              (2) Can deal with the conversion between Table/DataStream well
       Cons:  (1) We need to discuss the return value/type of the CURRENT_TIME function
              (2) The change is bigger for users; we need to support TIMESTAMP WITH LOCAL TIME ZONE in connectors/formats as well as custom connectors
              (3) The TIMESTAMP WITH LOCAL TIME ZONE support is weak in Flink, thus we need some improvement, but the workload does not matter as long as we are doing the right thing ^_^

Due to the above bad case for option (1), I think option (2) should be adopted. But we also need to consider some problems:
(1) More conversion classes like LocalDateTime and sql.Timestamp should be supported for LocalZonedTimestampType to resolve the UDF compatibility issue
(2) The timezone offset for a window size of one day should still be considered
(3) All connectors/formats should support TIMESTAMP WITH LOCAL TIME ZONE well, and we should also record this in the documentation
I’ll update these sections of FLIP-162.



We also need to discuss the CURRENT_TIME function. I know the standard way is to use TIME WITH TIME ZONE (there's no TIME WITH LOCAL TIME ZONE), but we don't support this type yet and I don't see a strong motivation to support it so far.
Compared to CURRENT_TIMESTAMP, CURRENT_TIME cannot represent an absolute time point; it should be considered as a string consisting of a time in 'HH:mm:ss' format and time zone info. We have several options for this:
(1) We can forbid CURRENT_TIME as @Timo proposed to make all Flink SQL functions follow the standard well. In this way, we need to offer some guidance for users upgrading Flink versions.
(2) We can also support it from the perspective of a user who has used CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP. By the way, Snowflake also returns the TIME type.
(3) Return TIMESTAMP WITH LOCAL TIME ZONE to make it equal to CURRENT_TIMESTAMP, as Calcite did.

I can imagine (1), since we don't want to leave a bad smell in Flink SQL, and I also accept (2) because I think users do not consider time zone issues when they use CURRENT_DATE/CURRENT_TIME, and the timezone info in a time is not very useful.

I don’t have a strong opinion on them. What do others think?


I hope I've addressed your concerns. @Timo @Kurt

Best,
Leonard



Most of the mature systems have a clear difference between CURRENT_TIMESTAMP and LOCALTIMESTAMP. I wouldn't take Spark or Hive as a good example. Snowflake decided for TIMESTAMP WITH LOCAL TIME ZONE. As I mentioned in the last comment, I could also imagine this behavior for Flink. But in any case, there should be some time zone information considered in order to cast to all other types.

The functions CURRENT_DATE/CURRENT_TIME are supported in the SQL standard, but LOCALDATE is not. I don’t think it’s a good idea to drop functions which the SQL standard supports and introduce a replacement which the SQL standard does not mention.

We can still add those functions in the future. But since we don't offer a TIME WITH TIME ZONE, it is better to not support this function at all for now. And by the way, this is exactly the behavior that Microsoft SQL Server also has: it also just supports CURRENT_TIMESTAMP (but it returns TIMESTAMP without a zone which completes the confusion).

I also agree that returning TIMESTAMP WITH LOCAL TIME ZONE for PROCTIME
has clearer semantics, but I realized that users don't care about the
type; they care more about the displayed value they see. Changing the type
from TIMESTAMP to TIMESTAMP WITH LOCAL TIME ZONE brings a huge refactoring,
because we need to consider all places where the TIMESTAMP type is used.

From a UDF perspective, I think nothing will change. The new type system
and type inference were designed to support all these cases. There is a
reason why Java has adopted Joda time: it is hard to come up with a good
time library. That's also why we and the other Hadoop ecosystem folks have
decided for 3 different kinds: LocalDateTime, ZonedDateTime, and Instant.
It makes the library more complex, but time is a complex topic.

I also doubt that many users work with only one time zone. Take the US as
an example, a country with several different time zones. Somebody working
with US data cannot properly see the data points with just LOCAL TIME
ZONE. But on the other hand, a lot of event data is stored using a UTC
timestamp.


Before jumping into technical details, let's take a step back to discuss
user experience.

The first important question is what kind of date and time Flink will
display when users call CURRENT_TIMESTAMP and maybe also PROCTIME (if we
think they are similar).

Should it always display the date and time in UTC or in the user's time
zone?

@Kurt: I think we all agree that the current behavior of just showing UTC
is wrong. Also, we all agree that when calling CURRENT_TIMESTAMP or
PROCTIME a user would like to see the time in their current time zone.

As you said, "my wall clock time".

However, the question is what the data type of what you "see" is. If you
pass this record on to a different system, operator, or different cluster,
should the "my" get lost or be materialized into the record?

TIMESTAMP -> completely lost and could cause confusion in a different
system

TIMESTAMP WITH LOCAL TIME ZONE -> at least the UTC is correct, so you can
provide a new local time zone

TIMESTAMP WITH TIME ZONE -> also "your" location is persisted
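
The three cases above can be sketched in Python (used only for
illustration; this is not Flink code). The mapping of a naive datetime to
TIMESTAMP, a stored UTC instant to TIMESTAMP WITH LOCAL TIME ZONE, and an
offset-aware datetime to TIMESTAMP WITH TIME ZONE is an assumption made
for this sketch, as are the fixed Beijing and Berlin offsets.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical session zones, modelled as fixed offsets (no DST handling).
BEIJING = timezone(timedelta(hours=8))
BERLIN = timezone(timedelta(hours=1))

# One absolute point in time: 2021-01-21 04:03:35 UTC.
instant = datetime(2021, 1, 21, 4, 3, 35, tzinfo=timezone.utc)

# TIMESTAMP-like: wall clock only; the zone (the "my") is lost.
ts = instant.astimezone(BEIJING).replace(tzinfo=None)

# TIMESTAMP WITH LOCAL TIME ZONE-like: keep the UTC instant and render it
# in whatever session zone the reader provides.
ts_ltz = instant
seen_in_berlin = ts_ltz.astimezone(BERLIN)

# TIMESTAMP WITH TIME ZONE-like: the instant plus the writer's own offset.
ts_tz = instant.astimezone(BEIJING)

print(ts)              # naive wall-clock value, zone unknown downstream
print(seen_in_berlin)  # same instant, shown in a new local zone
print(ts_tz)           # instant with the writer's offset persisted
```

Only the naive value cannot be mapped back to an absolute instant once it
leaves the session that produced it.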

Regards,
Timo




On 22.01.21 09:38, Kurt Young wrote:
Forgot one more thing. To continue on displaying in UTC: as a user, if
Flink wants to display the timestamp in UTC, why don't we offer something
like UTC_TIMESTAMP?

Best,
Kurt

On Fri, Jan 22, 2021 at 4:33 PM Kurt Young <ykt...@gmail.com> wrote:
Before jumping into technical details, let's take a step back to discuss
user experience.

The first important question is what kind of date and time Flink will
display when users call CURRENT_TIMESTAMP and maybe also PROCTIME (if we
think they are similar).

Should it always display the date and time in UTC or in the user's time
zone? I think this part is what surprised lots of users. If we forget
about the type and internal representation of these two methods, as a
user, my instinct tells me that these two methods should display my wall
clock time.

Display time in UTC? I'm not sure why I should care about UTC time. I want
to get my current timestamp. Users who have never gone abroad might not
even realize that this is affected by the time zone.

Best,
Kurt


On Fri, Jan 22, 2021 at 12:25 PM Leonard Xu <xbjt...@gmail.com> wrote:

Thanks @Timo for the detailed reply. Let's continue this topic in this
discussion; I've merged all mails into this thread.

LOCALDATE / LOCALTIME / LOCALTIMESTAMP

--> uses session time zone, returns DATE/TIME/TIMESTAMP


CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP

--> uses session time zone, returns DATE/TIME/TIMESTAMP

I'm very sceptical about this behavior. Almost all mature systems (Oracle,
Postgres) and new high quality systems (Presto, Snowflake) use a data type
with some degree of time zone information encoded. In a globalized world
with businesses spanning different regions, I think we should do this as
well. There should be a difference between CURRENT_TIMESTAMP and
LOCALTIMESTAMP. And users should be able to choose which behavior they
prefer for their pipeline.


I know that the two series should be different at first glance, but
different SQL engines can have their own interpretations. For example,
CURRENT_TIMESTAMP and LOCALTIMESTAMP are synonyms in Snowflake[1] and have
no difference, and Spark only supports the former and doesn't support
LOCALTIME/LOCALTIMESTAMP[2].


If we would design this from scratch, I would suggest the following:

- drop CURRENT_DATE / CURRENT_TIME and let users pick LOCALDATE /
LOCALTIME for materialized timestamp parts

The functions CURRENT_DATE/CURRENT_TIME are supported by the SQL standard,
but LOCALDATE is not. I don't think it's a good idea to drop functions
that the SQL standard supports and introduce a replacement that the SQL
standard does not mention.


- CURRENT_TIMESTAMP should return a TIMESTAMP WITH TIME ZONE to
materialize all session time information into every record. It is the most
generic data type and allows casting to all other timestamp data types.
This generic ability can be used for filter predicates as well, either
through implicit or explicit casting.

TIMESTAMP WITH TIME ZONE indeed contains more information to describe a
point in time, but the TIMESTAMP type can also be cast to all other
timestamp data types when combined with the session time zone, and it can
also be used for filter predicates. For type casting between BIGINT and
TIMESTAMP, I think the function approach using
TO_TIMESTAMP()/FROM_UNIXTIMESTAMP() is clearer.

PROCTIME/ROWTIME should be time functions based on a long value. Both
System.currentTimeMillis() and our watermark system work on long values.
Those should return TIMESTAMP WITH LOCAL TIME ZONE because the main
calculation should always happen based on UTC.
We discussed it in a different thread, but we should allow PROCTIME
globally. People need a way to create instances of TIMESTAMP WITH LOCAL
TIME ZONE. This is not considered in the current design doc.
Many pipelines contain UTC timestamps and thus it should be easy to create
one.
Also, both CURRENT_TIMESTAMP and LOCALTIMESTAMP can work with this type
because we should remember that TIMESTAMP WITH LOCAL TIME ZONE accepts all
timestamp data types as casting target [1]. We could allow TIMESTAMP WITH
TIME ZONE in the future for ROWTIME.
In any case, windows should simply adapt their behavior to the passed
timestamp type. And with TIMESTAMP WITH LOCAL TIME ZONE a day is defined
by considering the current session time zone.

I also agree that returning TIMESTAMP WITH LOCAL TIME ZONE for PROCTIME
has clearer semantics, but I realized that users don't care about the
type; they care more about the displayed value they see. Changing the type
from TIMESTAMP to TIMESTAMP WITH LOCAL TIME ZONE brings a huge refactoring:
we need to consider all places where the TIMESTAMP type is used, and many
built-in functions and UDFs do not support the TIMESTAMP WITH LOCAL TIME
ZONE type.
That means both users and Flink devs need to refactor their code (UDFs,
built-in functions, SQL pipelines). To be honest, I don't see a strong
motivation for such a big refactoring, from either the user's or the
developer's perspective.

In short, both your suggestion and my proposal can resolve almost all user
problems. The divergence is whether we need to spend a lot of energy just
to get slightly more accurate semantics. I think we need a tradeoff.


Best,
Leonard
[1] https://trino.io/docs/current/functions/datetime.html#current_timestamp
[2] https://issues.apache.org/jira/browse/SPARK-30374


2021-01-22,00:53,Timo Walther <twal...@apache.org>

Hi Leonard,

thanks for working on this topic. I agree that time handling is not easy
in Flink at the moment. We added new time data types (and some are still
not supported, which complicates things even further, like TIME(9)). We
should definitely improve this situation for users.

This is a pretty opinionated topic and it seems that the SQL standard is
not really deciding this but is at least supporting. So let me express my
opinion for the most important functions:

LOCALDATE / LOCALTIME / LOCALTIMESTAMP

--> uses session time zone, returns DATE/TIME/TIMESTAMP

I think those are the most obvious ones because the LOCAL indicates that
the locality should be materialized into the result and any time zone
information (coming from session config or data) is not important
afterwards.

CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP

--> uses session time zone, returns DATE/TIME/TIMESTAMP

I'm very sceptical about this behavior. Almost all mature systems (Oracle,
Postgres) and new high quality systems (Presto, Snowflake) use a data type
with some degree of time zone information encoded. In a globalized world
with businesses spanning different regions, I think we should do this as
well. There should be a difference between CURRENT_TIMESTAMP and
LOCALTIMESTAMP. And users should be able to choose which behavior they
prefer for their pipeline.

If we would design this from scratch, I would suggest the following:

- drop CURRENT_DATE / CURRENT_TIME and let users pick LOCALDATE /
LOCALTIME for materialized timestamp parts

- CURRENT_TIMESTAMP should return a TIMESTAMP WITH TIME ZONE to
materialize all session time information into every record. It is the most
generic data type and allows casting to all other timestamp data types.
This generic ability can be used for filter predicates as well, either
through implicit or explicit casting.

PROCTIME/ROWTIME should be time functions based on a long value. Both
System.currentTimeMillis() and our watermark system work on long values.
Those should return TIMESTAMP WITH LOCAL TIME ZONE because the main
calculation should always happen based on UTC. We discussed it in a
different thread, but we should allow PROCTIME globally. People need a way
to create instances of TIMESTAMP WITH LOCAL TIME ZONE. This is not
considered in the current design doc. Many pipelines contain UTC
timestamps and thus it should be easy to create one. Also, both
CURRENT_TIMESTAMP and LOCALTIMESTAMP can work with this type because we
should remember that TIMESTAMP WITH LOCAL TIME ZONE accepts all timestamp
data types as casting target [1]. We could allow TIMESTAMP WITH TIME ZONE
in the future for ROWTIME.

In any case, windows should simply adapt their behavior to the passed
timestamp type. And with TIMESTAMP WITH LOCAL TIME ZONE a day is defined
by considering the current session time zone.

If we would like to design this with less effort required, we could think
about returning TIMESTAMP WITH LOCAL TIME ZONE also for CURRENT_TIMESTAMP.


I will try to involve more people in this discussion.

Thanks,
Timo

[1] https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-E7CA339A-2093-4FE4-A36E-1D09593591D3


2021-01-21,22:32,Leonard Xu <xbjt...@gmail.com>

Before the changes, as I am writing this reply, the local time here is
2021-01-21 12:03:35 (Beijing time, UTC+8).
And I tried these 5 functions in sql client, and got:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

After the changes, the expected behavior will change to:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

The return type of now(), proctime() and CURRENT_TIMESTAMP will still be
TIMESTAMP.

To Kurt, thanks for the intuitive case, it is really clear. You're right
that I want to propose changing the return value of these functions. It's
the most important part of the topic from the user's perspective.

I think this definitely deserves a FLIP.
To Jark, nice suggestion. I prepared a FLIP for this topic and will start
the FLIP discussion soon.

If we use the default Flink SQL, the window time range of the statistics
is incorrect, so the statistical results will naturally be incorrect.
To zhisheng, sorry to hear that this problem affected your production
jobs. Could you share your SQL pattern? Then we can have more input and
try to resolve them.


Best,
Leonard



2021-01-21,14:19,Jark Wu <imj...@gmail.com>

Great examples to understand the problem and the proposed changes, @Kurt!

Thanks Leonard for investigating this problem.
The time-zone problems around time functions and windows have bothered a
lot of users. It's time to fix them!

The return value changes sound reasonable to me, and keeping the return
type unchanged will minimize the surprise to the users.
Besides that, I think it would be better to mention how this affects the
window behaviors, and the interoperability with DataStream.

I think this definitely deserves a FLIP.


====================================================

Hi zhisheng,

Do you have examples to illustrate which case will get the wrong window
boundaries?
That will help to verify whether the proposed changes can solve your
problem.

Best,
Jark




2021-01-21,12:54,zhisheng <173855...@qq.com>

Thanks to Leonard Xu for discussing this tricky topic. At present, there
are many Flink jobs in our production environment that are used to compute
day-level reports (e.g. counting PV/UV).

If we use the default Flink SQL, the window time range of the statistics
is incorrect, so the statistical results will naturally be incorrect.

The user needs to deal with the time zone manually in order to solve the
problem.
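
A minimal Python sketch of why such day-level windows come out wrong,
assuming one-day tumbling windows aligned to UTC midnight and a user in
UTC+8. The function names and the offset convention are hypothetical and
only illustrate the arithmetic; this is not Flink's window assigner.

```python
from datetime import datetime, timedelta, timezone

DAY_MS = 24 * 60 * 60 * 1000


def day_window_start(epoch_ms, offset_ms=0):
    """Start (epoch ms) of the one-day tumbling window containing epoch_ms.

    offset_ms is the local zone's offset from UTC; 0 aligns days to UTC.
    """
    return epoch_ms - (epoch_ms + offset_ms) % DAY_MS


def to_ms(dt):
    """Convert an aware datetime to epoch milliseconds."""
    return int(dt.timestamp() * 1000)


utc8 = timezone(timedelta(hours=8))
# An event at 2021-01-21 02:00 Beijing time (2021-01-20 18:00 UTC).
event = to_ms(datetime(2021, 1, 21, 2, 0, tzinfo=utc8))

utc_start = day_window_start(event)                         # UTC-aligned day
local_start = day_window_start(event, 8 * 60 * 60 * 1000)   # UTC+8-aligned day

print(datetime.fromtimestamp(utc_start / 1000, utc8))    # 2021-01-20 08:00:00+08:00
print(datetime.fromtimestamp(local_start / 1000, utc8))  # 2021-01-21 00:00:00+08:00
```

With UTC alignment, an event from the early morning of January 21 local
time lands in a window that started at 08:00 on January 20 local time, so
a "daily" PV/UV report mixes two local calendar days.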

If Flink itself can solve these time zone issues, then I think it will be
much more user-friendly.

Thank you

Best!
zhisheng




2021-01-21,12:11,Kurt Young <ykt...@gmail.com>

cc this to the user & user-zh mailing lists because this will affect lots
of users, and also quite a lot of users were asking questions around this
topic.

Let me try to understand this from the user's perspective.

Your proposal will affect five functions, which are:
PROCTIME()
NOW()
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
Before the changes, as I am writing this reply, the local time here is
2021-01-21 12:03:35 (Beijing time, UTC+8).
And I tried these 5 functions in sql client, and got:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

After the changes, the expected behavior will change to:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

The return type of now(), proctime() and CURRENT_TIMESTAMP will still be
TIMESTAMP.
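
The difference between the two outputs is just the session offset applied
to the same instant. A minimal Python sketch (not Flink code), assuming a
fixed UTC+8 offset for Beijing time and the timestamp from the example;
the helper name render is hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Assumed session time zone for the example: Beijing time, fixed UTC+8.
SESSION_ZONE = timezone(timedelta(hours=8))

# The instant at which the query ran, as shown in the "before" output (UTC).
instant = datetime(2021, 1, 21, 4, 3, 35, 228000, tzinfo=timezone.utc)


def render(dt, zone):
    """Format an instant as wall-clock time in the given zone (ms precision)."""
    # isoformat appends the UTC offset; keep only the first 23 characters,
    # i.e. the 'YYYY-MM-DDTHH:MM:SS.mmm' part.
    return dt.astimezone(zone).isoformat(timespec="milliseconds")[:23]


before = render(instant, timezone.utc)   # current behavior: always UTC
after = render(instant, SESSION_ZONE)    # proposed: session time zone

print(before)  # 2021-01-21T04:03:35.228
print(after)   # 2021-01-21T12:03:35.228
```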

Best,
Kurt