Hi Godfrey,

Regarding the Table API for CTAS, "Table#createTableAs(tablePath)" seems a
little strange to me.
Usually, the parameter after AS should be the query, but here the query
comes before AS.
I slightly prefer a method on TableEnvironment alongside "createTable" (i.e.
a special createTable that also writes data).

For example:
void createTableAs(String path, TableDescriptor descriptor, Table query);

Usage:
tableEnv.createTableAs(
        "T1",
        TableDescriptor.forConnector("hive")
                .option("format", "parquet")
                .build(),
        query);
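
To make the intended behavior concrete, here is a minimal, self-contained
sketch of a createTableAs that rolls the table back when the write fails
(the LEVEL-2 semantics from my previous mail, quoted below). It deliberately
does not use Flink classes: InMemoryCatalog, CtasSketch, and the
Consumer-based "writer" are hypothetical stand-ins for the catalog, the new
method, and the submitted job. In a real implementation the rollback would
have to run in a JM-side hook rather than in the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a catalog; not a Flink class. */
class InMemoryCatalog {
    private final Map<String, List<String>> tables = new HashMap<>();

    void createTable(String path) {
        // putIfAbsent returns the previous value, so non-null means a name clash.
        if (tables.putIfAbsent(path, new ArrayList<>()) != null) {
            throw new IllegalStateException("Table already exists: " + path);
        }
    }

    void dropTable(String path) {
        tables.remove(path);
    }

    boolean tableExists(String path) {
        return tables.containsKey(path);
    }

    List<String> rows(String path) {
        return tables.get(path);
    }
}

/** Hypothetical helper illustrating atomic-but-not-isolated CTAS. */
class CtasSketch {
    /**
     * Creates the table, runs the write, and drops the table if the write
     * throws. The writer stands in for the job that executes the query.
     */
    static void createTableAs(
            InMemoryCatalog catalog, String path, Consumer<List<String>> writer) {
        catalog.createTable(path); // visible to readers from here on: no isolation
        try {
            writer.accept(catalog.rows(path));
        } catch (RuntimeException e) {
            catalog.dropTable(path); // roll back the created table on failure
            throw e;
        }
    }
}
```

With this sketch, a successful writer leaves the table and its rows in the
catalog, while a writer that throws leaves no table behind. The table is
still visible to other readers while the write is running, which is exactly
the gap that a staged commit (LEVEL-3) would close.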


Best,
Jark

On Wed, 18 May 2022 at 22:53, Jark Wu <imj...@gmail.com> wrote:

> Hi Mang,
>
> Thanks for proposing this. CTAS is a very important API for batch users.
>
> I think the key problem of this FLIP is the ACID semantics of the CTAS
> operation.
> We care most about two parts of the semantics:
> 1) Atomicity: the created table should be rolled back if the write
> fails.
> 2) Isolation: the created table shouldn't be visible before the write
> succeeds (i.e. no reads of uncommitted data).
>
> From your investigation, it seems that:
> - Flink (your FLIP): guarantees neither. ==> LEVEL-1
> - Spark DataSource v1: is atomic (can roll back), but is not isolated. ==>
> LEVEL-2
> - Spark DataSource v2: guarantees both. ==> LEVEL-3
> - Hive MR: guarantees both. ==> LEVEL-3
>
> In order to support higher ACID semantics, I agree with Godfrey that we
> need some hooks in JM
> which can be called when the job is finished or failed/canceled. It might
> look like
> `StreamExecutionEnvironment#registerJobListener(JobListener)`,
> but JobListener is called on the
> client side. What we need is an interface called on the JM side, because
> the job can be submitted in
> detached mode.
>
> With this interface, we can easily support LEVEL-2 semantics by calling
> `Catalog#dropTable` in the
> `JobListener#onJobFailed`. We can also support LEVEL-3 by introducing
> `StagingTableCatalog` like Spark,
> calling `StagedTable#commitStagedChanges()` in `JobListener#onJobFinished`
> and
> calling `StagedTable#abortStagedChanges()` in `JobListener#onJobFailed`.
>
> Best,
> Jark
>
>
> On Wed, 18 May 2022 at 12:29, godfrey he <godfre...@gmail.com> wrote:
>
>> Hi Mang,
>>
>> Thanks for driving this FLIP.
>>
>> Please follow the FLIP template[1] style: `Syntax` is part of
>> the `Public API Changes` section.
>> `Program research` and `Implementation Plan` are part of the `Proposed
>> Changes` section,
>> or `Program research` can be moved to the appendix.
>>
>> > Providing methods that are used to execute CTAS for Table API users.
>> We should introduce `createTable` in `Table` instead of
>> `TableEnvironment`.
>> Because all table operations are defined in `Table`, see:
>> Table#executeInsert,
>> Table#insertInto, etc.
>> About the method name, I prefer to use `createTableAs`.
>>
>> > TableSink needs to provide the CleanUp API, developers implement as
>> needed.
>> I think it's hard for TableSink to implement a clean up operation. For
>> file system sink,
>> the data can be written to a temporary directory, but for key/value
>> sinks, it's hard to
>> remove the written keys, unless the sink records all written keys.
>>
>> > Do not do drop table operations in the framework, drop table is
>> implemented in
>> TableSink according to the needs of specific TableSink
>> The TM process may crash at any time, in which case the drop operation
>> would never be executed.
>>
>> How about we do the drop table operation and the data cleanup in the
>> catalog?
>> As for where to execute the drop operation: one approach is in the
>> client, the other is in the JM.
>> 1. In the client: this requires the client to stay alive until the job
>> has finished or failed.
>> 2. In the JM: this requires the JM to provide some interfaces/hooks
>> for which the planner
>> implements the logic, and the code is executed in the JM.
>> I prefer approach two, but it requires a more detailed design with the
>> runtime folks @gaoyunhaii, @kevin.yingjie
>>
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>
>> Best,
>> Godfrey
>>
>>
>> Mang Zhang <zhangma...@163.com> wrote on Fri, 6 May 2022 at 11:24:
>>
>> >
>> > Hi, Yuxia
>> > Thanks for your reply!
>> > About question 1: we will not support this. FLIP-218[1] aims to reduce
>> the complexity of user DDL and make it easier to use. I have
>> never encountered this case in big data use cases.
>> > About question 2: we will provide a public API like the following: public
>> void cleanUp();
>> >
>> > Regarding the mechanism of cleanUp, we need advice from people who are
>> familiar with the runtime module; this is what we
>> need to focus on.
>> >
>> >
>> > --
>> >
>> > Best regards,
>> > Mang Zhang
>> >
>> >
>> >
>> >
>> >
>> > At 2022-04-29 17:00:03, "yuxia" <luoyu...@alumni.sjtu.edu.cn> wrote:
>> > >Thanks for driving this work; it will be a useful feature.
>> > >I have some questions about FLIP-218.
>> > >
>> > >1: Does our CTAS syntax support specifying the target table's schema,
>> including column names and data types? I think it may be a useful feature in
>> case we want to change the data types in the target table instead of always
>> copying the source table's schema. It would be more flexible with this feature.
>> > >Btw, MySQL's "CREATE TABLE ... SELECT Statement"[1] supports this
>> feature.
>> > >
>> > >2: It seems this will require the sink to implement a public interface to drop
>> the table, so what will the interface look like?
>> > >
>> > >[1] https://dev.mysql.com/doc/refman/8.0/en/create-table-select.html
>> > >
>> > >Best regards,
>> > >Yuxia
>> > >
>> > >----- Original Message -----
>> > >From: "Mang Zhang" <zhangma...@163.com>
>> > >To: "dev" <dev@flink.apache.org>
>> > >Sent: Thursday, 28 April 2022, 4:57:24 PM
>> > >Subject: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)
>> > >
>> > >Hi, everyone
>> > >
>> > >
>> > >I would like to open a discussion on supporting the SELECT clause in CREATE
>> TABLE (CTAS).
>> > >With the development of business and the enhancement of Flink SQL
>> capabilities, queries are becoming more and more complex.
>> > >Currently, the user needs to use a CREATE TABLE statement to create the
>> target table first, and then execute an INSERT statement.
>> > >However, the target table may have many columns, which creates a
>> lot of work outside the business logic for the user.
>> > >At the same time, the user must ensure that the schema of the created
>> target table is consistent with the schema of the query result.
>> > >Using a CTAS syntax like that of Hive/Spark can greatly simplify this for the user.
>> > >
>> > >
>> > >
>> > >You can find more details in FLIP-218[1]. Looking forward to your
>> feedback.
>> > >
>> > >
>> > >
>> > >[1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-218%3A+Support+SELECT+clause+in+CREATE+TABLE(CTAS)
>> > >
>> > >
>> > >
>> > >
>> > >--
>> > >
>> > >Best regards,
>> > >Mang Zhang
>> >
>> >
>>
>
