Hi, Mang

Thanks for your update, the FLIP looks good to me now.
Best,
Ron

On Fri, Jun 9, 2023 at 12:08, Mang Zhang <[email protected]> wrote:
> Hi Ron,
> Thanks for your reply!
> We discussed this offline. Many Flink jobs, especially streaming jobs, may currently use non-atomic CTAS. If we inferred whether atomic CTAS is supported solely from whether the DynamicTableSink implements the SupportsStaging interface, Flink's behavior would change after users upgrade to a new version, which is not production friendly.
> To keep Flink's behavior consistent and give users maximum flexibility, users can still choose the non-atomic implementation according to their business needs, even when the DynamicTableSink implements the SupportsStaging interface.
>
> I have updated FLIP-305[1].
>
> Looking forward to more feedback. If there is no other feedback, I will start a vote next Monday (2023-06-12).
> Thanks again!
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
> --
> Best regards,
> Mang Zhang
>
> At 2023-06-09 10:23:21, "liu ron" <[email protected]> wrote:
> >Hi, Mang
> >
> >In FLIP-218, we discussed that atomicity is not needed in streaming mode, so we implemented an initial version without atomicity support. In addition, we introduced the option "table.ctas.atomicity-enabled" to enable atomicity. According to your FLIP-305 description, once the DynamicTableSink implements the SupportsStaging interface, atomicity becomes the default behavior in both streaming and batch mode, and the user cannot change it. I don't think this is feasible for streaming mode; atomicity should be controllable by the user. So I think the FLIP should define the atomicity behavior by combining the option with the SupportsStaging interface: atomicity is enabled only when the DynamicTableSink implements SupportsStaging and the option is enabled. WDYT?
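The rule Ron proposes and Mang agrees to above (atomic CTAS only when the sink implements SupportsStaging and the user has enabled `table.ctas.atomicity-enabled`) can be sketched as a small decision function. This is a hypothetical, self-contained Java sketch, not Flink's API: `Sink`, `SupportsStaging`, `StagedTable`, and `applyStaging()` are simplified stand-ins modeled on the names used in this thread, the option is read from a plain `Map`, and its default of "false" is an assumption.

```java
import java.util.Map;

public class CtasAtomicityDecision {

    // Hypothetical stand-in for the StagedTable obtained from a staging-capable sink.
    interface StagedTable {
        void begin();
        void commit();
        void abort();
    }

    // Hypothetical, heavily simplified stand-in for DynamicTableSink.
    interface Sink {}

    // Hypothetical ability interface modeled on the SupportsStaging idea in FLIP-305.
    interface SupportsStaging {
        StagedTable applyStaging();
    }

    // A sink without staging support.
    static class PlainSink implements Sink {}

    // A sink that can stage its output; its staged table is a no-op here.
    static class StagingSink implements Sink, SupportsStaging {
        @Override
        public StagedTable applyStaging() {
            return new StagedTable() {
                @Override public void begin() {}
                @Override public void commit() {}
                @Override public void abort() {}
            };
        }
    }

    // Option key as quoted in the thread; the "false" default is an assumption.
    static final String ATOMICITY_OPTION = "table.ctas.atomicity-enabled";

    /** Atomic CTAS only when the user opted in AND the sink can stage. */
    static boolean useAtomicCtas(Sink sink, Map<String, String> config) {
        boolean optedIn = Boolean.parseBoolean(config.getOrDefault(ATOMICITY_OPTION, "false"));
        return optedIn && sink instanceof SupportsStaging;
    }

    public static void main(String[] args) {
        Map<String, String> enabled = Map.of(ATOMICITY_OPTION, "true");
        Map<String, String> unset = Map.of();

        System.out.println(useAtomicCtas(new StagingSink(), enabled)); // true
        System.out.println(useAtomicCtas(new StagingSink(), unset));   // false: user did not opt in
        System.out.println(useAtomicCtas(new PlainSink(), enabled));   // false: sink cannot stage
    }
}
```

Requiring the explicit opt-in means that a sink gaining SupportsStaging in a new release does not silently change the behavior of existing jobs, which is the upgrade concern Mang raises in his reply.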
> >
> >Best,
> >Ron
> >
> >On Thu, Jun 8, 2023 at 16:30, Jark Wu <[email protected]> wrote:
> >
> >> Thank you for the great work, Mang! The updated proposal looks good to me.
> >>
> >> Best,
> >> Jark
> >>
> >> > On Jun 8, 2023, at 11:49, Jingsong Li <[email protected]> wrote:
> >> >
> >> > Thanks Mang for updating!
> >> >
> >> > Looks good to me!
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <[email protected]> wrote:
> >> >>
> >> >> Hi Jingsong,
> >> >>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our Flink design places execution in the TableFactory or directly in the Catalog, so introducing an executable table makes me feel a bit strange. (Spark is this style, but Flink may not be.)
> >> >> On this issue: introducing executable commit/abort logic on CatalogTable is indeed a bit strange.
> >> >> After an offline discussion with yuxia, I revised the FLIP-305[1] design.
> >> >> The new solution is similar to the implementation of SupportsOverwrite: it introduces the SupportsStaging interface, infers whether a DynamicTableSink supports atomic CTAS based on whether it implements SupportsStaging, and if so, obtains the StagedTable object from the DynamicTableSink.
> >> >>
> >> >> For more implementation details, please see the FLIP-305 document.
> >> >>
> >> >> These are my POC commits: https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
> >> >>
> >> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >>
> >> >> --
> >> >> Best regards,
> >> >> Mang Zhang
> >> >>
> >> >> At 2023-05-12 13:02:14, "Jingsong Li" <[email protected]> wrote:
> >> >>> Hi Mang,
> >> >>>
> >> >>> Thanks for starting this FLIP.
> >> >>>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`.
> >> >>> Generally, our Flink design places execution in the TableFactory or directly in the Catalog, so introducing an executable table makes me feel a bit strange. (Spark is this style, but Flink may not be.)
> >> >>>
> >> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >> >>>
> >> >>> Best,
> >> >>> Jingsong
> >> >>>
> >> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <[email protected]> wrote:
> >> >>>>
> >> >>>> Hi Ron,
> >> >>>>
> >> >>>> First of all, thank you for your reply!
> >> >>>> As we discussed offline, your point mainly concerns the compilePlan scenario, but compilePlanSql currently does not support non-INSERT statements; it throws an exception for them:
> >> >>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT
> >> >>>> But it's a good point, and I will consider it seriously.
> >> >>>> Non-atomic CTAS can be supported relatively easily, but atomic CTAS needs more adaptation work, so I'm going to leave it as is and follow up with a separate issue to implement CTAS support for compilePlanSql.
> >> >>>>
> >> >>>> --
> >> >>>> Best regards,
> >> >>>> Mang Zhang
> >> >>>>
> >> >>>> At 2023-04-23 17:52:07, "liu ron" <[email protected]> wrote:
> >> >>>>> Hi, Mang
> >> >>>>>
> >> >>>>> I have a question about the implementation details. In the atomic case, the target table is not created before the JobGraph is generated, yet the target table must exist when the planner optimizes the plan to generate the JobGraph. How do you solve this problem?
> >> >>>>>
> >> >>>>> Best,
> >> >>>>> Ron
> >> >>>>>
> >> >>>>> On Thu, Apr 20, 2023 at 09:35, yuxia <[email protected]> wrote:
> >> >>>>>
> >> >>>>>> Sharing some insights about the new TwoPhaseCatalogTable, proposed after an offline discussion with Mang.
> >> >>>>>> The main reason is that the TwoPhaseCatalogTable enables external connectors to implement their own commit/abort logic.
> >> >>>>>> In FLIP-218, for atomic CTAS, the Catalog simply drops the table when the job fails. That is not ideal, because it is too generic to work well. For example, some connectors need to clean up temporary files in the abort method, and only the actual connector knows its specific abort logic.
> >> >>>>>>
> >> >>>>>> Best regards,
> >> >>>>>> Yuxia
> >> >>>>>>
> >> >>>>>> From: "zhangmang1" <[email protected]>
> >> >>>>>> To: "dev" <[email protected]>, "Jing Ge" <[email protected]>
> >> >>>>>> Cc: "ron9 liu" <[email protected]>, "lincoln 86xy" <[email protected]>, [email protected]
> >> >>>>>> Sent: Wednesday, April 19, 2023 3:13:36 PM
> >> >>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> >> >>>>>>
> >> >>>>>> hi, Jing
> >> >>>>>> Thank you for your reply.
> >> >>>>>>> 1. It looks like you found another way to design the atomic CTAS with new serializable TwoPhaseCatalogTable instead of making Catalog serializable as described in FLIP-218. Did I understand correctly?
> >> >>>>>> Yes. When I was implementing the FLIP-218 solution, I ran into problems with Catalog/CatalogTable serialization and deserialization; for example, after deserialization the CatalogTable could not be converted to a Hive table.
> >> >>>>>> Also, Catalog serialization is a heavy operation, and it may not actually be necessary: we only need Create Table.
> >> >>>>>> Therefore, I proposed the TwoPhaseCatalogTable approach, which also makes it easier to implement follow-up features such as data lake support and ReplaceTable.
> >> >>>>>>
> >> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code smell) that we should generally avoid in public interfaces. According to the FLIP, isStreamingMode will be used by the Catalog to determine whether to support atomicity or not. With this selector argument, two different logics will be built into one method, and it is hard to follow without reading the code or the doc carefully (another concern is keeping the doc and code always consistent), i.e. sometimes true/false isStreamingMode makes no difference, and sometimes the results are quite different: atomic vs. non-atomic. Another question is, before we call Catalog#twoPhaseCreateTable(...), we have to know the value of isStreamingMode. In case only non-atomic is supported for streaming mode, we could just follow FLIP-218 instead of (twistedly) calling Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss anything here?
> >> >>>>>> Here's what I think: atomic CTAS should be the default behavior, falling back to non-atomic CTAS only when atomicity is truly unattainable. Atomic CTAS will bring users a better experience.
> >> >>>>>> Flink is already a unified stream/batch engine. At our company, Kwai, many users also use Flink for batch data processing while still running in streaming mode.
> >> >>>>>> The boundary between stream and batch is gradually blurring, and streaming-mode jobs may also FINISH, so I added the isStreamingMode parameter to provide different atomicity implementations in batch and streaming modes. It is used not only to determine whether atomicity is supported, but also to help select different TwoPhaseCatalogTable implementations that provide different levels of atomicity!
> >> >>>>>>
> >> >>>>>> Looking forward to more feedback.
> >> >>>>>>
> >> >>>>>> --
> >> >>>>>> Best regards,
> >> >>>>>> Mang Zhang
> >> >>>>>>
> >> >>>>>> At 2023-04-15 04:20:40, "Jing Ge" <[email protected]> wrote:
> >> >>>>>>> Hi Mang,
> >> >>>>>>>
> >> >>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks for driving it. I have two questions and would like to know your thoughts, thanks:
> >> >>>>>>>
> >> >>>>>>> 1. It looks like you found another way to design the atomic CTAS with new serializable TwoPhaseCatalogTable instead of making Catalog serializable as described in FLIP-218. Did I understand correctly?
> >> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code smell) that we should generally avoid in public interfaces. According to the FLIP, isStreamingMode will be used by the Catalog to determine whether to support atomicity or not.
> >> >>>>>>> With this selector argument, two different logics will be built into one method, and it is hard to follow without reading the code or the doc carefully (another concern is keeping the doc and code always consistent), i.e. sometimes true/false isStreamingMode makes no difference, and sometimes the results are quite different: atomic vs. non-atomic. Another question is, before we call Catalog#twoPhaseCreateTable(...), we have to know the value of isStreamingMode. In case only non-atomic is supported for streaming mode, we could just follow FLIP-218 instead of (twistedly) calling Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss anything here?
> >> >>>>>>>
> >> >>>>>>> Best regards,
> >> >>>>>>> Jing
> >> >>>>>>>
> >> >>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <[email protected]> wrote:
> >> >>>>>>>
> >> >>>>>>>> Hi, Mang.
> >> >>>>>>>> +1 for completing the support for CTAS atomicity. This is very useful in batch scenarios and for integrating with data lakes that support transactions.
> >> >>>>>>>>
> >> >>>>>>>> I just have one question: IIUC, the DynamicTableSink will need to know whether it is being used in the normal case or for atomic CTAS, as well as the necessary context.
> >> >>>>>>>> Take the JDBC catalog as an example: for CTAS with atomicity support, the JDBC DynamicTableSink will write to the temp table defined in the TwoPhaseCatalogTable, which differs from the normal case.
> >> >>>>>>>>
> >> >>>>>>>> How can the DynamicTableSink get this information? Could you give some explanation or an example in this FLIP?
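The commit/abort split yuxia describes, where the sink writes to a temporary table while the job runs and the connector itself decides how to publish or clean up, can be illustrated with a toy in-memory model. This is a hypothetical sketch, not Flink or JDBC connector code: `InMemoryCatalog`, `ToyStagedTable`, `writeTarget()`, and the `__staging_` name prefix are all invented for illustration, and in a real connector each step would be a connector-specific operation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StagedCtasSketch {

    /** Hypothetical in-memory catalog mapping table names to their rows. */
    static class InMemoryCatalog {
        final Map<String, List<String>> tables = new HashMap<>();
    }

    /** Hypothetical staged table: the job writes to a temp table until commit. */
    static class ToyStagedTable {
        private final InMemoryCatalog catalog;
        private final String targetName;
        private final String tempName;

        ToyStagedTable(InMemoryCatalog catalog, String targetName) {
            this.catalog = catalog;
            this.targetName = targetName;
            this.tempName = "__staging_" + targetName; // hypothetical naming scheme
        }

        /** Before the job runs: create only the temporary table. */
        void begin() {
            catalog.tables.put(tempName, new ArrayList<>());
        }

        /** Tells the sink where to write, so it can distinguish atomic CTAS from a normal write. */
        String writeTarget() {
            return tempName;
        }

        /** After the job succeeds: publish the rows under the final table name. */
        void commit() {
            catalog.tables.put(targetName, catalog.tables.remove(tempName));
        }

        /** After failure or cancellation: connector-specific cleanup of temporary data. */
        void abort() {
            catalog.tables.remove(tempName);
        }
    }

    /** Simulates one CTAS run and returns the catalog contents afterwards. */
    static Map<String, List<String>> runCtas(boolean jobSucceeds) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        ToyStagedTable staged = new ToyStagedTable(catalog, "orders_copy");
        staged.begin();
        // The "sink" writes its output to the location chosen by the staged table.
        catalog.tables.get(staged.writeTarget()).add("row-1");
        if (jobSucceeds) {
            staged.commit();
        } else {
            staged.abort();
        }
        return catalog.tables;
    }

    public static void main(String[] args) {
        System.out.println(runCtas(true));  // prints {orders_copy=[row-1]}
        System.out.println(runCtas(false)); // prints {}
    }
}
```

Because the target table only appears in `commit()`, a failed or cancelled job leaves nothing behind, matching the goal stated in the FLIP-305 proposal; how the real sink learns its staging target is the open question yuxia raises, which this toy answers only by assumption through `writeTarget()`.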
> >> >>>>>>>>
> >> >>>>>>>> Best regards,
> >> >>>>>>>> Yuxia
> >> >>>>>>>>
> >> >>>>>>>> ----- Original Message -----
> >> >>>>>>>> From: "zhangmang1" <[email protected]>
> >> >>>>>>>> To: "dev" <[email protected]>, "ron9 liu" <[email protected]>, "lincoln 86xy" <[email protected]>
> >> >>>>>>>> Sent: Friday, April 14, 2023 2:50:40 PM
> >> >>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> >> >>>>>>>>
> >> >>>>>>>> Hi, Lincoln and Ron
> >> >>>>>>>>
> >> >>>>>>>> Thank you for your reply.
> >> >>>>>>>> The naming suggestions look good to me; they will make future feature extensions more uniform. I have updated the FLIP.
> >> >>>>>>>>
> >> >>>>>>>> Regarding atomic CTAS support for Hive: Hive has rich usage scenarios, which can be divided into three: 1. writing Hive tables; 2. writing Hive tables with speculative execution; 3. writing Hive tables with small-file merging.
> >> >>>>>>>>
> >> >>>>>>>> The main purpose of FLIP-305 is to implement support for CTAS atomicity in the Flink framework, so my POC only verifies the first scenario, writing to a Hive table; we can later split out sub-tasks to support the other two scenarios.
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>> Best regards,
> >> >>>>>>>> Mang Zhang
> >> >>>>>>>>
> >> >>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <[email protected]> wrote:
> >> >>>>>>>>> Hi, Mang
> >> >>>>>>>>>
> >> >>>>>>>>> +1 for completing the support for CTAS atomicity; this is very useful in batch scenarios.
> >> >>>>>>>>>
> >> >>>>>>>>> I have two questions:
> >> >>>>>>>>> 1. Naming:
> >> >>>>>>>>> a) Can we rename `Catalog#getTwoPhaseCommitCreateTable` to `Catalog#twoPhaseCreateTable` (we may add twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)?
> >> >>>>>>>>> b) For `TwoPhaseCommitCatalogTable`, would `TwoPhaseCatalogTable` be better?
> >> >>>>>>>>> c) In `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction' in the method name may suggest to users that transactions are supported (which is not strictly the case), so I suggest renaming it to `begin`.
> >> >>>>>>>>> 2. Has this design been validated by any relevant POC on Hive or other catalogs?
> >> >>>>>>>>>
> >> >>>>>>>>> Best,
> >> >>>>>>>>> Lincoln Lee
> >> >>>>>>>>>
> >> >>>>>>>>> On Thu, Apr 13, 2023 at 10:17, liu ron <[email protected]> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi, Mang
> >> >>>>>>>>>> Atomicity is very important for CTAS, especially for batch jobs. This FLIP is a continuation of FLIP-218, which is valuable for CTAS.
> >> >>>>>>>>>> I just have one question: in the Motivation section of FLIP-218, we mentioned three levels of atomicity semantics. Can the current design achieve the same as Spark's DataSource V2, which guarantees both atomicity and isolation? For example, can this be done when writing to Hive tables using CTAS?
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Ron
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Mon, Apr 10, 2023 at 11:03, Mang Zhang <[email protected]> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Hi, everyone
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement [1].
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> The CREATE TABLE AS SELECT (CTAS) statement is already supported, but it is not atomic: the table is created before the job runs, and if the job fails or is cancelled, the table is not dropped.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> So I want Flink to support atomic CTAS, where the table is created only when the job succeeds, to improve the user experience.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Looking forward to your feedback.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> --
> >> >>>>>>>>>>> Best regards,
> >> >>>>>>>>>>> Mang Zhang
