Hi Etienne,

Nice blog! Thanks for sharing!
Best regards,
Jing

On Wed, Nov 9, 2022 at 5:49 PM Etienne Chauchot <echauc...@apache.org> wrote:

> Hi Yun Gao,
>
> FYI I just updated the article after your review:
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
>
> On 09/11/2022 at 10:04, Etienne Chauchot wrote:
>
> Hi Yun Gao,
>
> Thanks for your email and your review! My comments are inline.
>
> On 08/11/2022 at 06:51, Yun Gao wrote:
>
> Hi Etienne,
>
> Many thanks for the article! Flink is indeed continuously improving its
> ability to unify batch / stream processing under the same API, and it's
> a great pleasure that more and more users are trying this functionality.
> But I also have some questions regarding some details.
>
> First, IMO, in the long run Flink will have two unified APIs, namely the
> Table / SQL API and the DataStream API. Users can express their
> computation logic with these two APIs for both bounded and unbounded
> data processing.
>
> Yes, that is also what I understood throughout the discussions and jiras,
> and IMHO reducing the number of APIs to 2 was the right move.
>
> Underneath, Flink provides two execution modes: the streaming mode works
> with both bounded and unbounded data and executes by incremental
> processing based on state; the batch mode works only with bounded data
> and executes level-by-level, similar to traditional batch processing
> frameworks. Users can switch the execution mode via
> EnvironmentSettings.inBatchMode() or
> StreamExecutionEnvironment.setRuntimeMode().
>
> As recommended in the Flink docs [1], I enabled batch mode as I thought
> it would be more efficient on my bounded pipeline, but as a matter of
> fact streaming mode seems to be more efficient for my use case. I'll
> test with higher volumes to confirm.
>
> Specifically for DataStream, as implemented in FLIP-140, all the
> existing DataStream operations now support the batch execution mode in a
> unified way [1]: data is sorted on the keyBy() edges according to the
> key, so that following operations like reduce() receive all the data
> belonging to the same key consecutively and can reduce the records of
> the same key directly, without maintaining intermediate state. In this
> way users can write the same code for both streaming and batch
> processing.
>
> Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
> pipeline will work with no modification if I plug an unbounded source
> into it.
>
> # Regarding the migration of Join / Reduce
>
> First, I think Reduce is always supported: users can write
> dataStream.keyBy().reduce(xx) directly, and if the batch execution mode
> is set, the reduce will not be executed in an incremental way; instead
> it acts much like a sort-based aggregation in a traditional batch
> processing framework.
>
> Regarding Join, the issue of FLINK-22587 indeed exists: the current join
> has to be bound to a window, and GlobalWindow does not work properly.
> But with a bit more work, users do not need to re-write the whole join
> from scratch: they can write a dedicated window assigner that assigns
> all the records to the same window instance and returns
> EventTimeTrigger.create() as the default event-time trigger [2].
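> A minimal sketch of such an assigner, assuming the Flink 1.16
> WindowAssigner API (see the flink-ml implementation in [2] for the
> real thing):
>
> import java.util.Collection;
> import java.util.Collections;
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> /** Assigns every record to a single window covering the whole time range. */
> public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
>
>     // One window spanning all of time: its max timestamp is only reached
>     // by the final MAX_WATERMARK emitted when a bounded input finishes.
>     private static final TimeWindow GLOBAL_WINDOW =
>             new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
>
>     @Override
>     public Collection<TimeWindow> assignWindows(
>             Object element, long timestamp, WindowAssignerContext context) {
>         return Collections.singletonList(GLOBAL_WINDOW);
>     }
>
>     @Override
>     public Trigger<Object, TimeWindow> getDefaultTrigger(
>             StreamExecutionEnvironment env) {
>         // Fires once the watermark passes the window end, i.e. at end of input.
>         return EventTimeTrigger.create();
>     }
>
>     @Override
>     public TypeSerializer<TimeWindow> getWindowSerializer(
>             ExecutionConfig executionConfig) {
>         return new TimeWindow.Serializer();
>     }
>
>     @Override
>     public boolean isEventTime() {
>         return true;
>     }
> }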
> Then it works:
>
> source1.join(source2)
>     .where(a -> a.f0)
>     .equalTo(b -> b.f0)
>     .window(new EndOfStreamWindows())
>     .apply(xxxx);
>
> It does not require records to have event time attached, since the
> trigger of the window relies only on the time range of the window, and
> the assignment does not need event time either.
>
> The behavior of the join is also similar to a sort-based join if batch
> mode is enabled.
>
> Of course it is not user-friendly to require this workaround, and we'll
> try to fix this issue in 1.17.
>
> Yes, this is a better workaround than the manual state-based join that I
> proposed. I tried it and it works perfectly with similar performance.
> Thanks.
>
> # Regarding support of Sort / Limit
>
> Currently these two operators are indeed not supported directly in the
> DataStream API. One initial thought for these two operations is that
> users could convert the DataStream to the Table API and use the Table
> API for these two operators:
>
> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
> Table tableXX = tableEnv.fromDataStream(dataStream);
> tableXX.orderBy($("a").asc());
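> Note that orderBy() returns a new Table rather than modifying tableXX in
> place. If the rest of the pipeline stays in DataStream, the sorted table
> can be converted back; a minimal sketch, assuming a StreamTableEnvironment
> named tableEnv as above:
>
> Table sorted = tableXX.orderBy($("a").asc());
> // toDataStream() derives the stream's record type from the table schema
> // (org.apache.flink.types.Row here).
> DataStream<Row> sortedStream = tableEnv.toDataStream(sorted);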
> Yes, I knew that workaround, but I decided not to use it because I have
> a dedicated SQL-based implementation (for comparison reasons), so I did
> not want to mix the SQL and DataStream APIs in the same pipeline.
>
> What do you think about this option? We are also assessing whether the
> combination of the DataStream API / Table API is sufficient for all the
> batch users. Any suggestions are warmly welcome.
>
> I guess that outside of my use case of comparing the performance of the
> 3 Flink APIs (a broader subject than this article), users can easily mix
> the APIs in the same pipeline. If we really want these operations in the
> DataStream API, wrapping state-based implementations could be a good
> option if their performance meets our expectations.
>
> Best,
> Yun Gao
>
> I'll update the article and the code with your suggestions. Thanks again.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>
> Best
>
> Etienne
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
> [2]
> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>
> ------------------------------------------------------------------
> From: liu ron <ron9....@gmail.com>
> Send Time: 2022 Nov. 8 (Tue.) 10:21
> To: dev <dev@flink.apache.org>; Etienne Chauchot <echauc...@apache.org>;
> user <u...@flink.apache.org>
> Subject: Re: [blog article] Howto migrate a real-life batch pipeline from
> the DataSet API to the DataStream API
>
> Thanks for your post, it looks very good to me, and maybe also helpful
> for developers.
>
> Best,
> Liudalong
>
> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
>
> Wow, cool! Thanks for your work.
> It'll definitely be helpful for users who want to migrate their batch
> jobs from the DataSet API to the DataStream API.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Etienne Chauchot" <echauc...@apache.org>
> To: "dev" <dev@flink.apache.org>, "User" <u...@flink.apache.org>
> Sent: Monday, Nov 7, 2022, 10:29:54 PM
> Subject: [blog article] Howto migrate a real-life batch pipeline from the
> DataSet API to the DataStream API
>
> Hi everyone,
>
> In case some of you are interested, I just posted a blog article about
> migrating a real-life batch pipeline from the DataSet API to the
> DataStream API:
>
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne