> I was wondering why you, exceptionally, suggested 'scan.idle-timeout' instead of 'scan.watermark.idle-timeout'. I must miss something here.

@Jing: You are right. That was just a copy paste mistake. It should be `scan.watermark.idle-timeout`.

@Kui: Can you fix that in the FLIP? Sorry for this typo.

> users might feel confused after using those OPTIONs, since they might not be aware of what happens underneath

I agree that those options will not make life easier for most SQL users. I would consider those options intended for power users. Usually, the data platform team should come up with well-defined watermark semantics before exposing tables to SQL users.

Regards,
Timo


On 07.03.23 13:15, Kui Yuan wrote:
Hi Jing,

Thanks for your advice. In the upcoming PR, I will explain all of these
points (including FLIP-182, FLIP-217, etc.) clearly in the Flink docs to
make sure users can understand the behavior behind them.

Best,
Kui

On Tue, Mar 7, 2023 at 19:42, Jing Ge <j...@ververica.com.invalid> wrote:

Hi Kui,

Thanks for adding it to the FLIP. There is one more thing on this topic,
slightly off-topic, that you might want to pay attention to: Flink SQL
users might not be familiar with the use cases of the low-level DataStream
API. IMHO, it is highly recommended (mandatory) to document the dependency
you described in your last email, and all related information, in the
Flink docs as part of the upcoming PR for this FLIP. Without those
guidelines in the docs, users might feel confused after using those
OPTIONs, since they might not be aware of what happens underneath, and
therefore would not know why it does not work even if they did everything
right at the Flink SQL level.

Best regards,
Jing


On Tue, Mar 7, 2023 at 6:36 AM Kui Yuan <catye...@gmail.com> wrote:

Hi Jing,


Thanks for the reminder. The aim of this FLIP is to let SQL users use
those features of the DataStream API; we don't intend to extend FLIP-217.
In my opinion, watermark alignment with only one source can be configured
by the options given in the FLIP. If the source connector does not
implement FLIP-217, the task will fail with an error reminding the user to
set `pipeline.watermark-alignment.allow-unaligned-source-splits`. Since
these behaviors may not be understood by everyone, I will add some tips
about FLIP-217 to the FLIP so that users understand the behavior in the
case of source splits.
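
As a rough illustration of the single-source case described above, the
sketch below assumes the `scan.watermark.alignment.*` option keys proposed
elsewhere in this thread, a hypothetical table named `user_actions`, and
that the pipeline setting is applied through the SQL client's `SET`
statement. It is not taken from the FLIP itself, and the final key names
may differ.

```sql
-- Assumption: the source connector does not implement FLIP-217, so the job
-- is explicitly allowed to run with unaligned source splits instead of failing.
SET 'pipeline.watermark-alignment.allow-unaligned-source-splits' = 'true';

-- Hypothetical table; the alignment keys follow the naming proposed in this
-- thread and may differ from what is finally released.
SELECT *
FROM user_actions /*+ OPTIONS(
    'scan.watermark.alignment.group' = 'alignment-group-1',
    'scan.watermark.alignment.max-drift' = '1min',
    'scan.watermark.alignment.update-interval' = '1s') */;
```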


Best,

Kui Yuan

On Tue, Mar 7, 2023 at 04:23, Jing Ge <j...@ververica.com.invalid> wrote:

Hi Kui,

Thanks for pointing that out. I know FLIP-217, which was done by an
engineer working in my team. As far as I am concerned, your FLIP should
answer the following questions:

1. How to enable watermark alignment of source splits with Flink SQL?
e.g. which options should be used if only one source is used?

2. Default behaviour, i.e. Flink SQL users should be aware that watermark
alignment of source splits will only work for sources that implement
FLIP-217 properly. Should users take care of
`pipeline.watermark-alignment.allow-unaligned-source-splits`
while using Flink SQL?

Best regards,
Jing


On Fri, Mar 3, 2023 at 8:46 AM Kui Yuan <catye...@gmail.com> wrote:

Hi all,

Thanks to all. There are more questions, and I will answer them one by
one.

@Jark Thanks for your tips. For the first question, I will add more
details to the FLIP and provide a POC[1] so that people can see how I'm
currently implementing these features.

> IIRC, this is the first time we introduce the framework-level connector
> options that the option is not recognized and handled by connectors.
> The FLIP should cover how framework filters the watermark related
> options to avoid discover connector factory failed, and what happens if
> the connector already supported the conflict options

For the second question: we know that the default strategy in the SQL
layer is 'on-periodic', and the default interval is 200ms. The reason for
emitting watermarks periodically is that the time advancement between
consecutive events may be very small, so we don't need to calculate a
watermark for each event. The same applies to the 'on-event' strategy, so
my idea is that we can set a fixed gap for the 'on-event' strategy.
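
To make the two emit strategies concrete, here is a hedged sketch.
`pipeline.auto-watermark-interval` is the existing Flink setting behind the
200 ms periodic default. The table name `user_actions` is hypothetical, and
the `scan.watermark.emit.*` keys and the gap value follow the naming
suggested elsewhere in this thread. Whether the gap option ends up in the
FLIP is still under discussion here.

```sql
-- Periodic emission: watermarks are generated at a fixed interval
-- (200 ms is the default mentioned above).
SET 'pipeline.auto-watermark-interval' = '200ms';

SELECT *
FROM user_actions /*+ OPTIONS('scan.watermark.emit.strategy' = 'on-periodic') */;

-- On-event emission with a fixed gap, as proposed in this thread
-- (assumed keys; not a released API at the time of the discussion).
SELECT *
FROM user_actions /*+ OPTIONS(
    'scan.watermark.emit.strategy' = 'on-event',
    'scan.watermark.emit.on-event.gap' = '10000') */;
```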

> I'm not sure about the usage scenarios of event gap emit strategy. Do
> you have any specific use case of this strategy? I'm confused why no one
> requested this strategy before no matter in DataStream or SQL, but maybe
> I missed something. I'm not against to add this option, but just want to
> be careful when adding new API because it's hard to remove in the
> future.

As @Timo said, there is no built-in feature like 'on-event-gap' in the
DataStream API, but users can achieve it by implementing the
`WatermarkGenerator` interface, just like the implementation in my
POC[1]. However, if we don't provide it in the SQL layer, there is no way
for SQL users to use a similar feature.

> Jark raised a very good point. I thought we only expose what is
> contained in DataStream API already. If this strategy is not part of
> DataStream API, would like to exclude it from the FLIP. We need to be
> careful which strategies we offer by default.

@Jark @Timo I'm sorry, perhaps I don't understand your concerns about the
CompiledPlan. Maybe I missed something; could you look at my POC first to
see whether there is anything to worry about?

> Sorry, I forgot to remind you that Timo's concern about the changes to
> the CompiledPlan looks like is still not covered in the FLIP.

@Jing We could have more discussion about naming, but I prefer that the
naming be consistent with the DataStream API.
About aligning splits/partitions/shards, maybe you missed FLIP-217[2],
which aims to support watermark alignment of source splits.

> After reading the most up-to-date Flip, I didn't find any information if
> this solution will support aligning splits/partitions/shards [1]. Did I
> miss anything?

Best
Kui Yuan

[1] the POC:
https://github.com/yuchengxin/flink/tree/yuankui/watermark_params
[2] FLIP-217:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits


On Fri, Mar 3, 2023 at 08:03, Jing Ge <j...@ververica.com.invalid> wrote:

Hi,

Thanks Kui for driving this Flip and thanks all for the informative
discussion.

@Timo

Your suggestion about the naming convention is excellent. Thanks! I was
wondering why you, exceptionally, suggested 'scan.idle-timeout' instead of
'scan.watermark.idle-timeout'. I must miss something here.

There is one more NIT. I just became aware that "drift" is used for the
watermark alignment. It seems to be fine while using the DataStream API,
because we will not really see it. But with the OPTIONS in SQL, a much
bigger group of users (including SRE, tech support, etc.) will see the
word "drift". Given that "drift" isn't widely used yet, and that in all
training materials and the Flink docs [1][2][3] (search for "lag") "lag"
has been used to describe the timestamp difference between a watermark and
its corresponding event, do we really need to introduce another term for
the same thing? How about using 'scan.watermark.alignment.max-lag'='1min'
and changing the parameter name from maxAllowedWatermarkDrift to
maxAllowedWatermarkLag [4] for naming consistency? Just my two cents.

@Kui

After reading the most up-to-date Flip, I didn't find any information if
this solution will support aligning splits/partitions/shards [1]. Did I
miss anything?

+1 for the concern about the Table API. We'd better keep the Table API
and SQL in sync for new features.

Best regards,
Jing


[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/event-time/built_in/#fixed-amount-of-lateness
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/
[4]
https://github.com/apache/flink/blob/4aacff572a9e3996c5dee9273638831e4040c767/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L169



On Wed, Mar 1, 2023 at 3:54 PM Timo Walther <twal...@apache.org> wrote:

Reg. 2:
  > event gap emit strategy [...] no matter in DataStream or SQL

Jark raised a very good point. I thought we only expose what is
contained in DataStream API already. If this strategy is not part of
DataStream API, would like to exclude it from the FLIP. We need to be
careful which strategies we offer by default.

Reg 1:
This already has a JIRA ticket with additional thoughts on this
topic:
https://issues.apache.org/jira/browse/FLINK-25221

Regards,
Timo



On 01.03.23 12:31, Jark Wu wrote:
Sorry, I forgot to remind you that Timo's concern about the changes to
the CompiledPlan looks like is still not covered in the FLIP.

Best,
Jark

On Wed, 1 Mar 2023 at 19:28, Jark Wu <imj...@gmail.com> wrote:

Hi Kui,

Thank you for the great proposal, I think this is already in a good
shape.

Just a kind reminder, according to the community guidelines[1],
if there are unresponsive reviewers, a typical reasonable time
to wait for responses is one week, but be pragmatic about it.

Regarding the FLIP, I have some comments below:

1. IIRC, this is the first time we introduce the framework-level connector
options that the option is not recognized and handled by connectors.
The FLIP should cover how framework filters the watermark related options
to avoid discover connector factory failed, and what happens if the
connector already supported the conflict options.

2. I'm not sure about the usage scenarios of event gap emit strategy. Do
you have any specific use case of this strategy? I'm confused why no one
requested this strategy before no matter in DataStream or SQL, but maybe
I missed something. I'm not against to add this option, but just want to
be careful when adding new API because it's hard to remove in the future.


3. Adding a "Public Interface"[2] section to summarize the
proposed APIs and options would be better for developers to
know the impact. Currently, the APIs are scattered in the long
design sections.

Best,
Jark


[1]:
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template

On Wed, 1 Mar 2023 at 16:56, Kui Yuan <catye...@gmail.com> wrote:

Hi all,

Thanks for all discussions!

Does anyone else have questions or suggestions? If not, I will start a
vote thread later.

Best
Kui Yuan

On Mon, Feb 27, 2023 at 20:21, kui yuan <catye...@gmail.com> wrote:

Hi Timo,

Thanks for your advice. I totally agree with your suggestion on the
naming convention. I will rename these options and update the FLIP later,
thanks very much.

In our internal implementation we have put these options inside
`FactoryUtil`, just as you expect. We have also taken into account the
changes to the CompiledPlan, and we have packaged these options
appropriately to minimize intrusiveness and ensure compatibility with
`WatermarkPushDownSpec`.

> A hint to the implementation: I would suggest that we add those options
> to `FactoryUtil`. All cross-connector options should end up there.
>
> Please also consider the changes to the CompiledPlan in your FLIP. This
> change has implications on the JSON format as watermark strategy of
> ExecNode becomes more complex, see WatermarkPushDownSpec

Best
Kui Yuan

On Mon, Feb 27, 2023 at 18:05, Timo Walther <twal...@apache.org> wrote:

Hi Kui Yuan,

thanks for working on this FLIP. Let me also give some comments about
the proposed changes.

I support the direction of this FLIP about handling these
watermark-specific properties through options and /*+OPTIONS(...) */
hints.

Regarding naming, I would like to keep the options in sync with existing
options:

   > 'watermark.emit.strategy'='ON_EVENT'

Let's use lower case (e.g. `on-event`) that matches with properties like
sink.partitioner [1] or sink.delivery-guarantee [2].

   > 'source.idle-timeout'='1min'

According to FLIP-122 [3], we want to prefix all scan-source related
properties with `scan.*`. This clearly includes idle-timeout and
actually also watermark strategies which don't apply for lookup sources.

Summarizing the comments above, we should use the following options:

'scan.watermark.emit.strategy'='on-event',
'scan.watermark.emit.on-event.gap'='10000',
'scan.idle-timeout'='1min',
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s'

I know that this makes the keys even longer, but given that those
options are for power users this should be acceptable. It also clearly
indicates which options are for sinks, scans, and lookups. This
potentially also helps in allow lists.
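
For illustration, the option keys summarized above might be used in a
table definition as sketched below. The table, its columns, and the
`kafka` connector are placeholders rather than part of the proposal, and
connector-specific options are left out. The idle-timeout key uses the
corrected name `scan.watermark.idle-timeout` given at the top of this
thread.

```sql
-- Hypothetical table; only the watermark-related keys come from this thread.
CREATE TABLE user_actions (
    user_id BIGINT,
    user_action_time TIMESTAMP(3),
    WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',  -- placeholder; other connector options omitted
    'scan.watermark.emit.strategy' = 'on-event',
    'scan.watermark.emit.on-event.gap' = '10000',
    'scan.watermark.idle-timeout' = '1min',
    'scan.watermark.alignment.group' = 'alignment-group-1',
    'scan.watermark.alignment.max-drift' = '1min',
    'scan.watermark.alignment.update-interval' = '1s'
);
```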

A hint to the implementation: I would suggest that we add those options
to `FactoryUtil`. All cross-connector options should end up there.

Please also consider the changes to the CompiledPlan in your FLIP. This
change has implications on the JSON format as watermark strategy of
ExecNode becomes more complex, see WatermarkPushDownSpec [4].

Regards,
Timo


[1]
https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#sink-partitioner
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/#sink-delivery-guarantee
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[4]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java


On 24.02.23 04:55, kui yuan wrote:
Hi all,

I have updated the FLIP according to the discussion, and we will extend
the watermark-related features with both table options and the 'OPTIONS'
hint, like this:

```
-- configure in table options
CREATE TABLE user_actions (
     ...
     user_action_time TIMESTAMP(3),
     WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
     'watermark.emit.strategy'='ON_PERIODIC',
     ...
);

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('watermark.emit.strategy'='ON_PERIODIC') */
```

Does anybody have any other questions?

Best
Kui Yuan

On Thu, Feb 23, 2023 at 20:05, kui yuan <catye...@gmail.com> wrote:

Hi all,

Thanks for all the suggestions.

We will extend the watermark-related features in the SQL layer with
dynamic table options and the 'OPTIONS' hint, just as everyone expects. I
will modify FLIP-296 as discussed.

@Martijn As far as I know, there is no hint interface in the Table API,
so we can't use hints in the Table API directly. If we need to extend the
hint interface in the Table API, we may need another FLIP. However, if we
extend the watermark-related features through the dynamic table options,
we may be able to use them indirectly in the Table API like this[1]:

```
// register a table named "Orders"
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH ('watermark.emit.strategy'='ON_EVENT'...)");
```

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/

Best
Kui Yuan

On Thu, Feb 23, 2023 at 17:46, Yun Tang <myas...@live.com> wrote:

Thanks for the warm discussions!

I had an offline discussion with Kui about the replies. I think I can
give some explanation of the original intention behind introducing a
separate WATERMARK_PARAMS hint. If we take a look at the current
DataStream API, the watermark strategy does not belong to any specific
connector, and we thought the dynamic table options were more like
configurations within a specific connector.

From the review comments, I think most people feel good about making it
part of the dynamic table options. I think this is fine if we give a
clearer definition of the scope of the dynamic table options here. I also
agree with Jingsong's concern about adding SQL syntax, which was our
biggest concern before launching this discussion.

Regarding Martijn's concern, if we agree to make the watermark-related
options part of the dynamic table options, the problem becomes another
topic: how to support the dynamic table options in the Table API, which
deserves another FLIP.

Best
Yun Tang
________________________________
From: Martijn Visser <martijnvis...@apache.org>
Sent: Thursday, February 23, 2023 17:14
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-296: Watermark options for table API & SQL

Hi,

While I can understand that there's a desire to first focus on solving
this problem for SQL, I do wonder if we should ignore the Table API at
this point. If we could include the syntax for the Table API, it
potentially could also be implemented by another contributor without
needing to create another FLIP. If we don't design it right now, my
concern is that this will increase sparsity for the Table API which
ultimately hurts adoption.

With regards to the syntax, I have a preference to solve this via the
connector options (e.g. like you can currently specify things as
scan.startup.specific-offsets or scan.bounded.mode for the Kafka
connector). You could still use the dynamic table options to override/add
them.

Best regards,

Martijn

On Thu, Feb 23, 2023 at 7:21 AM Shammon FY <zjur...@gmail.com> wrote:

Hi kui

Thanks for your answer and +1 to yuxia too

> we should not bind the watermark-related options to a connector to
> ensure semantic clarity.

In my opinion, adding watermark-related options to the connector options
is much clearer. Currently users can define a simple watermark strategy
in DDL, so adding more configuration items in the connector options is
easy to understand.

Best,
Shammon


On Thu, Feb 23, 2023 at 10:52 AM Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks for your proposal.

+1 to yuxia, consider watermark-related hints as option hints.

Personally, I am cautious about adding SQL syntax; WATERMARK_PARAMS is
also SQL syntax to some extent.

We can use OPTIONS to meet this requirement if possible.

Best,
Jingsong

On Thu, Feb 23, 2023 at 10:41 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Hi, Yuan Kui.
Thanks for driving it.
IMO, the 'OPTIONS' hint may not be specific only to the connector
options. Just as a reference, we also have `sink.parallelism`[1] as a
connector option. It enables users to specify the writer's parallelism
dynamically per query.

Personally, I prefer to consider watermark-related hints as option
hints. That way, users can define a default watermark strategy for the
table, and if they don't need to change it, they need to do nothing in
their query instead of specifying it every time.

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/filesystem/#sink-parallelism

Best regards,
Yuxia

----- Original Message -----
From: "kui yuan" <catye...@gmail.com>
To: "dev" <dev@flink.apache.org>
Cc: "Jark Wu" <imj...@gmail.com>
Sent: Wednesday, February 22, 2023 10:08:11 PM
Subject: Re: [DISCUSS] FLIP-296: Watermark options for table API & SQL

Hi all,

Thanks for the lively discussion. I will respond to these questions one
by one; there are also some common questions, which I will answer
together.

@郑 Thanks for your reply. The features mentioned in this FLIP only apply
to source connectors that implement the SupportsWatermarkPushDown
interface; generating watermarks at other locations in the graph is not
in the scope of this discussion. Perhaps another FLIP can be proposed
later to implement that feature.

@Shammon Thanks for your reply. In FLIP-296, adding watermark-related
options to the connector options is a rejected alternative; we believe
that we should not bind the watermark-related options to a connector to
ensure semantic clarity.

> What will happen if we add watermark related options in `the connector
> options`? Will the connector ignore these options or throw an
> exception? How can we support this?

If a user defines different watermark configurations for one table in
two places, I tend to prefer that the first one prevails, but we could
also throw an exception or just log a message to alert the user; these
are implementation details.

> If one table is used by two operators with different watermark params,
> what will happen?

@Martijn Thanks for your reply. I'm sorry that we were not particularly
precise: this hint is mainly for SQL, not the Table API.

> While the FLIP talks about watermark options for Table API & SQL, I
> only see proposed syntax for SQL, not for the Table API. What is your
> proposal for the Table API

@Jane Thanks for your reply. For the first question: if the user uses
this hint on a source that does not implement the
SupportsWatermarkPushDown interface, the hint has no effect at all. The
task will run as normal, as if the hint had not been used.

> What's the behavior if there are multiple table sources, among which
> some do not support `SupportsWatermarkPushDown`?

@Jane gave the feedback that 'WATERMARK_PARAMS' is difficult to remember.
Perhaps the naming issue can be left to the end of the discussion,
because more people, like @Martijn and @Shuo, are considering whether
these configurations should be put into the DDL or the 'OPTIONS' hint.
Here's what I think: putting these configs into the DDL or into the
'OPTIONS' hint is actually the same thing, because the 'OPTIONS' hint is
mainly used to configure the properties of the connector. The reason I
want to use a new hint is to keep the semantics clear; in my opinion the
configuration of watermarks should not be mixed up with that of the
connector. However, a new hint does make things more difficult to use to
some extent, for example when a user uses both the 'OPTIONS' hint and
the 'WATERMARK_PARAMS' hint. From this point of view, maybe it is more
appropriate to use a uniform 'OPTIONS' hint.
On the other hand, if we add more watermark option keys to the 'OPTIONS'
hint, the question becomes how we define the scope of the 'OPTIONS'
hint: is it specific to the connector options only, or could it cover
more? Maybe @Jark could share more insights here. In my opinion,
'OPTIONS' is only related to the connector options, which is not the
case for the general watermark options.



On Wed, Feb 22, 2023 at 19:17, Shuo Cheng <njucs...@gmail.com> wrote:

Hi Kui,

Thanks for driving the discussion. It's quite useful to introduce
watermark options. I have some questions:

What kind of hint is "WATERMARK_PARAMS"?
Currently, we have two kinds of hints in Flink: Dynamic Table Options and
Query Hints. As described in the FLIP, "WATERMARK_PARAMS" is more like
Dynamic Table Options. So two questions arise here:

1)  Are these watermark options to be exposed as connector WITH options?
As described in the SQL Hints doc[1], "Dynamic Table Options allow to
specify or override table options dynamically", which implies that these
options can also be configured in WITH options.

2)  Do we really need a new hint name like 'WATERMARK_PARAMS'? Table
options use "OPTIONS" as the hint name, like
'/*+ OPTIONS('csv.ignore-parse-errors'='true') */', so maybe we can add
more table option keys for watermarks, e.g.,
/*+ OPTIONS('watermark.emit-strategy'='ON_PERIODIC') */.


[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/

On Wed, Feb 22, 2023 at 10:22 AM kui yuan <catye...@gmail.com> wrote:

Hi devs,


I'd like to start a discussion thread for FLIP-296[1]. This comes from an
offline discussion with @Yun Tang, and we hope to enrich the Table API &
SQL to support many watermark-related features which were only
implemented at the DataStream API level.


Basically, we want to introduce watermark options in the Table API & SQL
via a SQL hint named 'WATERMARK_PARAMS' to support the following
features:

1. Configurable watermark emit strategy

2. Dealing with idle sources

3. Watermark alignment


Last but not least, thanks to Qingsheng and Jing Zhang for the initial
reviews.


Looking forward to your thoughts; any feedback is appreciated!


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240884405


Best

Yuan Kui