Hi Yun,
Thank you very much for your valuable input!
Incremental mode is indeed an attractive idea; we have also discussed it.
In the current design, though, we first provide two refresh modes:
CONTINUOUS and FULL. An incremental mode can be introduced once the
execution layer has the capability.
My answers to the two questions:
1.
Yes, cascading is a good question. The current proposal provides a
freshness attribute that defines a dynamic table's data lag relative to
its base tables. If users need to reason about the end-to-end freshness
of multiple cascaded dynamic tables, they can split the budget manually
for now. Of course, letting multiple cascaded or dependent dynamic tables
define freshness in a simpler way is something that can be extended in
the future.
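To make this concrete, here is a minimal sketch of a per-table freshness
definition (the FRESHNESS clause follows the FLIP draft and may still
change; the table and column names are invented):

    -- dwd_orders may lag its base table ods_orders by at most 3 minutes
    CREATE DYNAMIC TABLE dwd_orders
    FRESHNESS = INTERVAL '3' MINUTE
    AS SELECT order_id, user_id, amount
       FROM ods_orders
       WHERE amount > 0;

Each table declares freshness only against its direct base tables, so an
end-to-end bound across a cascade currently has to be budgeted by hand.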
2.
Cascading refresh is also a part we focused on in the discussions. In
this FLIP we hope to concentrate as much as possible on the core features
(it already involves a lot of things), so we did not directly introduce
related syntax. However, based on the current design, combined with the
catalog and lineage information, users can in theory achieve a cascading
refresh themselves.
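For example, assuming a hypothetical pipeline ods_orders -> dwd_orders ->
app_orders and the FLIP's ALTER ... REFRESH command, a user could walk
the lineage in topological order (table names invented):

    -- refresh upstream first, then downstream, following catalog lineage
    ALTER DYNAMIC TABLE dwd_orders REFRESH;
    ALTER DYNAMIC TABLE app_orders REFRESH;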
Best,
Lincoln Lee
Yun Tang <myas...@live.com> wrote on Tue, Mar 19, 2024, at 13:45:
Hi Lincoln,
Thanks for driving this discussion, and I am so excited to see this topic
being discussed in the Flink community!
From my point of view, beyond the work of unifying streaming and batch in
the DataStream API [1], this FLIP could actually let users benefit from
one engine ruling both batch and streaming.
If we treat this FLIP as an open-source implementation of Snowflake's
dynamic tables [2], we still lack an incremental refresh mode to make the
ETL near-real-time at a much cheaper computation cost. However, I think
this could be done under the current design by introducing another
refresh mode in the future, although the extra work of incremental view
maintenance would be much larger.
For the FLIP itself, I have several questions below:
1. It seems this FLIP does not consider the lag of refreshes across ETL
layers, from ODS ---> DWD ---> APP [3]. We currently only consider the
scheduling interval, which means we cannot use the lag to automatically
schedule the upstream micro-batch jobs to do the work.
2. To support the automagical refreshes, we should consider the lineage
in the catalog or somewhere else.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API
[2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
[3] https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh
Best
Yun Tang
________________________________
From: Lincoln Lee <lincoln.8...@gmail.com>
Sent: Thursday, March 14, 2024 14:35
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for
Simplifying Data Pipelines
Hi Jing,
Thanks for your attention to this FLIP! I'll try to answer the following
questions.
1. How do we define the query of a dynamic table?
Do we use Flink SQL or introduce new syntax?
If Flink SQL is used, how do we handle the differences in SQL between
streaming and batch processing? For example, a query including a window
aggregate based on processing time, or a query including a global
ORDER BY?
Similar to `CREATE TABLE AS query`, the `query` here also uses Flink SQL
and does not introduce a totally new syntax.
We will not change the status quo with respect to the functional
differences of Flink SQL between streaming and batch. For example, the
processing-time window aggregate on streaming and the global sort on
batch that you mentioned do not work properly in the other mode, so when
a user switches a dynamic table to a refresh mode that its query does not
support, we will throw an exception.
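As a sketch of the intended shape (the table and column names are
invented; the FRESHNESS clause follows the FLIP draft and may still
change):

    -- the SELECT is plain Flink SQL; only the CREATE wrapper is new
    CREATE DYNAMIC TABLE user_orders_summary
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT user_id, COUNT(*) AS order_cnt, SUM(amount) AS total_amount
       FROM orders
       GROUP BY user_id;

A query that is only valid in one mode, e.g. a processing-time window
aggregate, would then fail with an exception if the table were switched
to the unsupported refresh mode.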
2. Is modifying the query of a dynamic table allowed, or can we only
refresh a dynamic table based on the initial query?
Yes, in the current design the query definition of a dynamic table cannot
be modified; you can only refresh the data based on the initial
definition.
3. How do we use a dynamic table? It seems to be similar to a
materialized view. Will we do something like materialized-view rewriting
during optimization?
It's true that dynamic tables and materialized views are similar in some
ways, but as Ron explained, there are differences. In terms of
optimization, automatic materialization discovery similar to what Calcite
supports is also a potential possibility, perhaps with the addition of
automatic rewriting in the future.
Best,
Lincoln Lee
Ron liu <ron9....@gmail.com> wrote on Thu, Mar 14, 2024, at 14:01:
Hi, Timo
Sorry for the late response, and thanks for your feedback.
Regarding your questions:
Flink introduced the concept of Dynamic Tables many years ago. How does
the term "Dynamic Table" fit into Flink's regular tables, and how does it
relate to the Table API?
I fear that adding the DYNAMIC TABLE keyword could cause confusion for
users, because a term for regular CREATE TABLE (which can be "kind of
dynamic" as well and is backed by a changelog) would then be missing,
especially given that we call the connectors for those tables
DynamicTableSource and DynamicTableSink.
In general, I find it contradictory that a TABLE can be "paused" or
"resumed". From an English-language perspective, this sounds incorrect.
In my opinion (without much research yet), a continuously updating
trigger should rather be modelled as a CREATE MATERIALIZED VIEW (which
users are familiar with?) or a new concept such as a CREATE TASK (that
can be paused and resumed?).
1.
The current concept [1] actually includes two parts: Dynamic Tables &
Continuous Query. A Dynamic Table is just an abstract logical concept,
which in its physical form represents either a table or a changelog
stream. It must be combined with a Continuous Query to achieve dynamic
updates of the target table, similar to a database's Materialized View.
We hope to upgrade the Dynamic Table to a real entity that users can
operate on, one that combines the logical concepts of Dynamic Tables +
Continuous Query. By integrating the definition of tables and queries, it
can provide functionality similar to Materialized Views, simplifying
users' data-processing pipelines.
So the object of the suspend operation is the refresh task of the dynamic
table. The command `ALTER DYNAMIC TABLE table_name SUSPEND` is actually a
shorthand for `ALTER DYNAMIC TABLE table_name SUSPEND REFRESH` (if
writing it in full is clearer, we can also change that).
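As a small sketch (the RESUME counterpart below is an assumption on my
side, mirroring SUSPEND; the exact wording may still change):

    -- shorthand form
    ALTER DYNAMIC TABLE my_table SUSPEND;
    -- equivalent full form, making explicit that the refresh task pauses
    ALTER DYNAMIC TABLE my_table SUSPEND REFRESH;
    -- resume the background refresh task later (assumed counterpart)
    ALTER DYNAMIC TABLE my_table RESUME;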
2. Initially, we also considered Materialized Views, but ultimately
decided against them. Materialized views are designed to enhance query
performance for workloads that consist of common, repetitive query
patterns; in essence, a materialized view represents the result of a
query. However, it is not intended to support data modification. For
Lakehouse scenarios, where the ability to delete or update data is
crucial (such as compliance with GDPR, FLIP-2), materialized views fall
short.
3.
Compared to CREATE (regular) TABLE, CREATE DYNAMIC TABLE not only defines
metadata in the catalog but also automatically initiates a data refresh
task based on the query specified during table creation, executing data
updates dynamically. Users can focus on data dependencies and
data-generation logic.
4.
The new dynamic table does not conflict with the existing
DynamicTableSource and DynamicTableSink interfaces. Developers only need
to implement the new CatalogDynamicTable, without changing the
implementation of source and sink.
5. For now, the FLIP does not consider supporting Table API operations on
Dynamic Tables. However, once the SQL syntax is finalized, we can discuss
this in a separate FLIP. Currently, I have a rough idea: the Table API
should also introduce DynamicTable operation interfaces corresponding to
the existing Table interfaces, and the TableEnvironment will provide the
relevant methods to support the various dynamic-table operations. The
goal for the new Dynamic Table is to offer users an experience similar to
using a database, which is why we prioritize SQL-based approaches
initially.
How do you envision re-adding the functionality of a statement set that
fans out to multiple tables? This is a very important use case for data
pipelines.
Multiple tables are indeed a very important user scenario. In the future,
we can consider extending the statement-set syntax to support the
creation of multiple dynamic tables; a rough sketch of what that might
look like follows below.
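Purely as a thought experiment (this extension is not part of the FLIP;
only the plain EXECUTE STATEMENT SET syntax exists in Flink today, and
the table names are invented):

    -- hypothetical: fan one pipeline out into multiple dynamic tables
    EXECUTE STATEMENT SET
    BEGIN
      CREATE DYNAMIC TABLE dwd_orders FRESHNESS = INTERVAL '3' MINUTE
        AS SELECT * FROM ods_orders WHERE valid = true;
      CREATE DYNAMIC TABLE dwd_users FRESHNESS = INTERVAL '3' MINUTE
        AS SELECT * FROM ods_users WHERE valid = true;
    END;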
Since the early days of Flink SQL, we have been discussing `SELECT STREAM
* FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and EMIT
into the keywords DYNAMIC TABLE and FRESHNESS, but the core functionality
is still there. I'm wondering if we should widen the scope (maybe not as
part of this FLIP but in a new FLIP) to follow the standard more closely:
making `SELECT * FROM t` bounded by default and using new syntax for the
dynamic behavior. Flink 2.0 would be the perfect time for this; however,
it would require careful discussions. What do you think?
The query part indeed requires a separate FLIP
for discussion, as it involves changes to the default behavior.
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables
Best,
Ron
Jing Zhang <beyond1...@gmail.com> wrote on Wed, Mar 13, 2024, at 15:19:
Hi, Lincoln & Ron,
Thanks for the proposal.
I agree with the questions raised by Timo.
Besides, I have some other questions.
1. How do we define the query of a dynamic table?
Do we use Flink SQL or introduce new syntax?
If Flink SQL is used, how do we handle the differences in SQL between
streaming and batch processing? For example, a query including a window
aggregate based on processing time, or a query including a global
ORDER BY?
2. Is modifying the query of a dynamic table allowed, or can we only
refresh a dynamic table based on the initial query?
3. How do we use a dynamic table? It seems to be similar to a
materialized view. Will we do something like materialized-view rewriting
during optimization?
Best,
Jing Zhang
Timo Walther <twal...@apache.org> wrote on Wed, Mar 13, 2024, at 01:24:
Hi Lincoln & Ron,
thanks for proposing this FLIP. I think a design similar to what you
propose has been in the heads of many people; however, I'm wondering how
it will fit into the bigger picture.
I haven't deeply reviewed the FLIP yet, but would like to ask some
initial questions:
Flink introduced the concept of Dynamic Tables many years ago. How does
the term "Dynamic Table" fit into Flink's regular tables, and how does it
relate to the Table API?
I fear that adding the DYNAMIC TABLE keyword could cause confusion for
users, because a term for regular CREATE TABLE (which can be "kind of
dynamic" as well and is backed by a changelog) would then be missing,
especially given that we call the connectors for those tables
DynamicTableSource and DynamicTableSink.
In general, I find it contradictory that a TABLE can be "paused" or
"resumed". From an English-language perspective, this sounds incorrect.
In my opinion (without much research yet), a continuously updating
trigger should rather be modelled as a CREATE MATERIALIZED VIEW (which
users are familiar with?) or a new concept such as a CREATE TASK (that
can be paused and resumed?).
How do you envision re-adding the functionality of a statement set that
fans out to multiple tables? This is a very important use case for data
pipelines.
Since the early days of Flink SQL, we have been discussing `SELECT STREAM
* FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and EMIT
into the keywords DYNAMIC TABLE and FRESHNESS, but the core functionality
is still there. I'm wondering if we should widen the scope (maybe not as
part of this FLIP but in a new FLIP) to follow the standard more closely:
making `SELECT * FROM t` bounded by default and using new syntax for the
dynamic behavior. Flink 2.0 would be the perfect time for this; however,
it would require careful discussions. What do you think?
Regards,
Timo
On 11.03.24 08:23, Ron liu wrote:
Hi, Dev
Lincoln Lee and I would like to start a discussion about
FLIP-435:
Introduce a New Dynamic Table for Simplifying Data Pipelines.
This FLIP is designed to simplify the development of data-processing
pipelines. With Dynamic Tables, using uniform SQL statements and a
freshness definition, users can define batch and streaming
transformations of data in the same way, accelerate ETL pipeline
development, and have task scheduling managed automatically.
For more details, see FLIP-435 [1]. Looking forward to your
feedback.
[1]
Best,
Lincoln & Ron