Hi Mang,

Thanks for proposing this. CTAS is a very important API for batch users.

I think the key problem of this FLIP is the ACID semantics of the CTAS
operation. We care most about two parts of the semantics:
1) Atomicity: the created table should be rolled back if the write
fails.
2) Isolation: the created table shouldn't be visible before the write
succeeds (i.e., no uncommitted reads).

From your investigation, it seems that:
- Flink (your FLIP): guarantees neither. ==> LEVEL-1
- Spark DataSource v1: atomic (can roll back), but not isolated. ==> LEVEL-2
- Spark DataSource v2: guarantees both. ==> LEVEL-3
- Hive MR: guarantees both. ==> LEVEL-3

In order to support stronger ACID semantics, I agree with Godfrey that we
need some hooks in the JM (JobManager) that are called when the job
finishes, fails, or is canceled. It might look like
`StreamExecutionEnvironment#registerJobListener(JobListener)`, but a
JobListener is called on the client side. What we need is an interface
that is called on the JM side, because the job can be submitted in
detached mode, in which case the client may no longer be running when the
job ends.
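To make the requirement concrete, here is a minimal sketch of what a JM-side hook could look like. All names (`JobLifecycleHook`, `RecordingHook`, `ToyJobMaster`) are hypothetical and are not existing Flink APIs; the point is only that the callbacks are driven by the job's terminal state on the JM, so they still run after a detached client has gone away.

```java
import java.util.ArrayList;
import java.util.List;

public class JmHookSketch {

    /** Hypothetical JM-side hook: invoked by the JM, never by the client. */
    public interface JobLifecycleHook {
        void onFinished(String jobId);
        void onFailed(String jobId, Throwable cause);
        void onCanceled(String jobId);
    }

    /** Terminal states that trigger the hook. */
    public enum TerminalState { FINISHED, FAILED, CANCELED }

    /** Toy hook that just records which callback it received. */
    public static class RecordingHook implements JobLifecycleHook {
        public final List<String> events = new ArrayList<>();

        @Override public void onFinished(String jobId) { events.add("finished " + jobId); }
        @Override public void onFailed(String jobId, Throwable cause) {
            events.add("failed " + jobId + ": " + cause.getMessage());
        }
        @Override public void onCanceled(String jobId) { events.add("canceled " + jobId); }
    }

    /** Toy stand-in for the JM: keeps the hooks and fires them on termination. */
    public static class ToyJobMaster {
        private final String jobId;
        // Assumption: hooks are shipped with the submitted job, so they live as long
        // as the JM does, whether or not the submitting client is still connected.
        private final List<JobLifecycleHook> hooks = new ArrayList<>();

        public ToyJobMaster(String jobId) { this.jobId = jobId; }

        public void registerHook(JobLifecycleHook hook) { hooks.add(hook); }

        /** Called by the (toy) scheduler once the job reaches a terminal state. */
        public void reachTerminalState(TerminalState state, Throwable cause) {
            for (JobLifecycleHook hook : hooks) {
                switch (state) {
                    case FINISHED: hook.onFinished(jobId); break;
                    case FAILED:   hook.onFailed(jobId, cause); break;
                    case CANCELED: hook.onCanceled(jobId); break;
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyJobMaster jm = new ToyJobMaster("job-1");
        RecordingHook hook = new RecordingHook();
        jm.registerHook(hook);
        // In detached mode the client has already returned; the JM still fires the hook.
        jm.reachTerminalState(TerminalState.FAILED, new RuntimeException("sink error"));
        System.out.println(hook.events); // prints [failed job-1: sink error]
    }
}
```

The design choice being illustrated is where the callback lives: a client-side listener disappears with the client, while a hook owned by the JM fires on every terminal transition.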

With this interface, we can easily support LEVEL-2 semantics by calling
`Catalog#dropTable` in `JobListener#onJobFailed`. We can also support
LEVEL-3 by introducing a `StagingTableCatalog` like Spark's, calling
`StagedTable#commitStagedChanges()` in `JobListener#onJobFinished` and
`StagedTable#abortStagedChanges()` in `JobListener#onJobFailed`.
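The difference between LEVEL-2 and LEVEL-3 can be sketched with a small in-memory model. It assumes a JM-side hook with `onFinished`/`onFailed` callbacks as described above; `JobHook`, `SimpleCatalog`, `StagedTable`, `DropOnFailureHook` and `StagedCommitHook` are hypothetical stand-ins, not Flink's `Catalog` or Spark's actual `StagingTableCatalog` classes. Under LEVEL-2 the target table is readable while the job is still writing and is only dropped afterwards; under LEVEL-3 it becomes visible only at commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CtasSemanticsSketch {

    /** Hypothetical JM-side hook (only the callbacks needed here). */
    public interface JobHook {
        void onFinished();
        void onFailed(Throwable cause);
    }

    /** Toy catalog: a table is visible to readers iff it is in this map. */
    public static class SimpleCatalog {
        private final Map<String, List<String>> tables = new HashMap<>();

        public boolean tableExists(String name) { return tables.containsKey(name); }
        public List<String> rows(String name) { return tables.get(name); }
        public void createTable(String name) { publish(name, new ArrayList<>()); }
        public void dropTable(String name) { tables.remove(name); }

        /** Stages a table: nothing becomes visible until commitStagedChanges(). */
        public StagedTable stageCreate(String name) { return new StagedTable(this, name); }

        void publish(String name, List<String> rows) {
            if (tables.putIfAbsent(name, new ArrayList<>(rows)) != null) {
                throw new IllegalStateException("table already exists: " + name);
            }
        }
    }

    /** Hypothetical staged table: rows are buffered privately until commit. */
    public static class StagedTable {
        private final SimpleCatalog catalog;
        private final String name;
        private final List<String> buffered = new ArrayList<>();

        StagedTable(SimpleCatalog catalog, String name) {
            this.catalog = catalog;
            this.name = name;
        }

        public void write(String row) { buffered.add(row); }
        public void commitStagedChanges() { catalog.publish(name, buffered); }
        public void abortStagedChanges() { buffered.clear(); }
    }

    /** LEVEL-2: table exists (and is readable) during the write; dropped on failure. */
    public static class DropOnFailureHook implements JobHook {
        private final SimpleCatalog catalog;
        private final String table;

        public DropOnFailureHook(SimpleCatalog catalog, String table) {
            this.catalog = catalog;
            this.table = table;
        }

        @Override public void onFinished() { /* data is already in place */ }
        @Override public void onFailed(Throwable cause) { catalog.dropTable(table); }
    }

    /** LEVEL-3: table is published only on success; staged rows discarded on failure. */
    public static class StagedCommitHook implements JobHook {
        private final StagedTable staged;

        public StagedCommitHook(StagedTable staged) { this.staged = staged; }

        @Override public void onFinished() { staged.commitStagedChanges(); }
        @Override public void onFailed(Throwable cause) { staged.abortStagedChanges(); }
    }

    public static void main(String[] args) {
        SimpleCatalog catalog = new SimpleCatalog();

        // LEVEL-2: the table is created up front, so readers can see partial data.
        catalog.createTable("t2");
        JobHook level2 = new DropOnFailureHook(catalog, "t2");
        catalog.rows("t2").add("partial-row");
        System.out.println(catalog.tableExists("t2")); // true: visible mid-write
        level2.onFailed(new RuntimeException("write failed"));
        System.out.println(catalog.tableExists("t2")); // false: rolled back

        // LEVEL-3: the table only appears once the job finishes.
        StagedTable staged = catalog.stageCreate("t3");
        JobHook level3 = new StagedCommitHook(staged);
        staged.write("row-1");
        System.out.println(catalog.tableExists("t3")); // false: not visible mid-write
        level3.onFinished();
        System.out.println(catalog.rows("t3")); // [row-1]
    }
}
```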

Best,
Jark


On Wed, 18 May 2022 at 12:29, godfrey he <godfre...@gmail.com> wrote:

> Hi Mang,
>
> Thanks for driving this FLIP.
>
> Please follow the style of the FLIP template[1]: the `Syntax` section
> should be part of the `Public API Changes` section, and 'Program research'
> and 'Implementation Plan' should be part of the `Proposed Changes` section
> (or move 'Program research' to the appendix).
>
> > Providing methods that are used to execute CTAS for Table API users.
> We should introduce `createTable` in `Table` instead of `TableEnvironment`,
> because all table operations are defined in `Table`, e.g.
> `Table#executeInsert`, `Table#insertInto`, etc.
> As for the method name, I prefer `createTableAs`.
>
> > TableSink needs to provide the CleanUp API, developers implement as
> > needed.
> I think it's hard for TableSink to implement a cleanup operation. For a
> file system sink, the data can be written to a temporary directory, but
> for key/value sinks it's hard to remove the written keys unless the sink
> records all of them.
>
> > Do not do drop table operations in the framework, drop table is
> > implemented in TableSink according to the needs of specific TableSink
> The TM process may crash at any time, in which case the drop operation
> will never be executed.
>
> How about doing the drop-table operation and the data cleanup in the
> catalog?
> As for where to execute the drop operation, one approach is in the
> client, the other is in the JM.
> 1. In the client: this requires the client to stay alive until the job
> has finished or failed.
> 2. In the JM: this requires the JM to provide some interfaces/hooks whose
> logic is implemented by the planner and executed in the JM.
> I prefer approach 2, but it requires a more detailed design with the
> runtime side (@gaoyunhaii, @kevin.yingjie).
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>
> Best,
> Godfrey
>
>
> On Fri, 6 May 2022 at 11:24, Mang Zhang <zhangma...@163.com> wrote:
>
> >
> > Hi Yuxia,
> > Thanks for your reply!
> > About question 1: we will not support it. FLIP-218[1] aims to reduce the
> > complexity of user DDL and make it easier to use. I have never
> > encountered this case in big data scenarios.
> > About question 2: we will provide a public API like the following:
> > public void cleanUp();
> >
> > Regarding the mechanism of cleanUp, people who are familiar with the
> > runtime module need to provide expert advice; that is what we need to
> > focus on.
> >
> > --
> >
> > Best regards,
> > Mang Zhang
> >
> > At 2022-04-29 17:00:03, "yuxia" <luoyu...@alumni.sjtu.edu.cn> wrote:
> > >Thanks for driving this work; it's going to be a useful feature.
> > >About FLIP-218, I have some questions.
> > >
> > >1: Does our CTAS syntax support specifying the target table's schema,
> > >including column names and data types? I think it may be a useful
> > >feature in case we want to change the data types in the target table
> > >instead of always copying the source table's schema. It would be more
> > >flexible with this feature.
> > >By the way, MySQL's "CREATE TABLE ... SELECT Statement"[1] supports this
> > >feature.
> > >
> > >2: It seems it will require the sink to implement a public interface to
> > >drop the table, so what will the interface look like?
> > >
> > >[1] https://dev.mysql.com/doc/refman/8.0/en/create-table-select.html
> > >
> > >Best regards,
> > >Yuxia
> > >
> > >----- Original Message -----
> > >From: "Mang Zhang" <zhangma...@163.com>
> > >To: "dev" <dev@flink.apache.org>
> > >Sent: Thursday, April 28, 2022 4:57:24 PM
> > >Subject: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)
> > >
> > >Hi everyone,
> > >
> > >
> > >I would like to open a discussion on supporting the SELECT clause in
> > >CREATE TABLE (CTAS).
> > >With the growth of business needs and the enhancement of Flink SQL
> > >capabilities, queries are becoming more and more complex.
> > >Currently, the user needs to first create the target table with a CREATE
> > >TABLE statement and then execute an INSERT statement.
> > >However, the target table may have many columns, which adds a lot of
> > >work outside the business logic for the user.
> > >The user must also ensure that the schema of the created target table is
> > >consistent with the schema of the query result.
> > >A CTAS syntax like Hive's or Spark's would make this much easier for the
> > >user.
> > >
> > >You can find more details in FLIP-218[1]. Looking forward to your
> > >feedback.
> > >
> > >[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-218%3A+Support+SELECT+clause+in+CREATE+TABLE(CTAS)
> > >
> > >--
> > >
> > >Best regards,
> > >Mang Zhang
> >
> >
>
