Hi Di,

Thank you for your patience and explanation.
If this is a server-side configuration, we currently cannot modify it in
the client configuration. If Doris supports client-side configuration in
the future, we can reconsider whether to support it.

I currently have no other questions regarding this FLIP. LGTM.

Best,
Feng

On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:

> Hi, Feng
>
> Yes, if the StreamLoad transaction timeout is very short, you may
> encounter this situation.
>
> The timeout for StreamLoad transactions is controlled by the
> streaming_label_keep_max_second parameter [1] in FE (Frontend), and the
> default value is 12 hours. Currently, it is a global transaction
> configuration and cannot be set separately for a specific transaction.
>
> However, I understand the default 12-hour timeout should cover most cases
> unless you are restarting from a checkpoint that occurred a long time ago.
> What do you think?
>
> [1]
> https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>
> Brs
> di.wu
>
> > On March 25, 2024, at 11:45, Feng Jin <jinfeng1...@gmail.com> wrote:
> >
> > Hi Di
> >
> > Thanks for your reply.
> >
> > The timeout I'm referring to here is not the commit timeout, but rather
> > the timeout for a single streamLoad transaction.
> >
> > Let's say we have set the transaction timeout for StreamLoad to be 10
> > minutes. Now, imagine there is a Flink job with two subtasks. Due to
> > significant data skew and backpressure issues, subtask 0 and subtask 1
> > are processing at different speeds. Subtask 0 finishes processing this
> > checkpoint first, while subtask 1 takes another 10 minutes to complete
> > its processing. At this point, the job's checkpoint is done.
> > However, since subtask 0 has been waiting for subtask 1 all along, its
> > corresponding streamLoad transaction closes after more than 10 minutes
> > have passed - by which time the server has already cleaned up this
> > transaction, leading to a failed commit.
> >
> > Therefore, I would like to know if in such situations we can avoid this
> > problem by setting a longer lifespan for transactions.
> >
> > Best,
> > Feng
> >
> > On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
> >
> >> Hi, Feng,
> >>
> >> 1. Are you suggesting that when a commit gets stuck, we can interrupt
> >> the commit request using a timeout parameter? Currently, there is no
> >> such parameter. In my understanding, in a two-phase commit, checkpoint
> >> must be enabled, so the commit timeout is essentially the checkpoint
> >> timeout. Therefore, it seems unnecessary to add an additional parameter
> >> here. What do you think?
> >>
> >> 2. In addition to deleting checkpoints to re-consume data again, the
> >> Connector also provides an option to ignore commit errors[1]. However,
> >> this option is only used for error recovery scenarios, such as when a
> >> transaction is cleared by the server but you want to reuse the upstream
> >> offset from the checkpoint.
> >>
> >> 3. Also, thank you for pointing out the issue with the parameter. It has
> >> already been addressed[2], but the FLIP changes were overlooked. It has
> >> been updated.
> >>
> >> [1]
> >> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
> >> [2]
> >> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
> >>
> >> Brs
> >> di.wu
> >>
> >>> On March 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>
> >>> Hi Di,
> >>>
> >>> Thank you for the update, as well as quickly implementing corresponding
> >>> capabilities including filter push down and project push down.
> >>>
> >>> Regarding the transaction timeout, I still have some doubts. I would
> >>> like to confirm if we can control this timeout parameter in the
> >>> connector, such as setting it to 10 minutes or 1 hour.
> >>> Also, when a transaction is cleared by the server, the commit operation
> >>> of the connector will fail, leading to job failure. In this case, can
> >>> users only choose to delete the checkpoint and re-consume historical
> >>> data?
> >>>
> >>> There is also a small question regarding the parameters
> >>> doris.request.connect.timeout.ms and doris.request.read.timeout.ms:
> >>> can we change them to Duration type and remove the "ms" suffix?
> >>> This way, all time parameters can be kept uniform in type as duration.
> >>>
> >>> Best,
> >>> Feng
> >>>
> >>> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
> >>>
> >>>> Hi, Feng,
> >>>> Thank you, that's a great suggestion!
> >>>>
> >>>> I have already implemented FilterPushDown and removed that parameter
> >>>> on DorisDynamicTableSource[1], and also updated FLIP.
> >>>>
> >>>> Regarding the mention of [Doris also aborts transactions], it may not
> >>>> have been described accurately.
> >>>> It mainly refers to the automatic expiration of long-running
> >>>> transactions in Doris that have not been committed for a prolonged
> >>>> period.
> >>>>
> >>>> As for two-phase commit, when a commit fails, the checkpoint will also
> >>>> fail, and the job will be continuously retried.
> >>>>
> >>>> [1]
> >>>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
> >>>>
> >>>> Brs
> >>>> di.wu
> >>>>
> >>>>> On March 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Di
> >>>>>
> >>>>> Thank you for initiating this FLIP, +1 for this.
> >>>>>
> >>>>> Regarding the option `doris.filter.query` of doris source table
> >>>>>
> >>>>> Can we directly implement the FilterPushDown capability of Flink
> >>>>> Source like Jdbc Source [1] instead of introducing an option?
> >>>>>
> >>>>> Regarding two-phase commit,
> >>>>>
> >>>>>> At the same time, Doris will also abort transactions that have not
> >>>>>> been committed for a long time
> >>>>>
> >>>>> Can we control the transaction timeout in the connector?
> >>>>> And control the behavior when timeout occurs, whether to discard by
> >>>>> default or trigger job failure?
> >>>>>
> >>>>> [1]. https://issues.apache.org/jira/browse/FLINK-16024
> >>>>>
> >>>>> Best,
> >>>>> Feng
> >>>>>
> >>>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky
> >>>>> <ferenc.cs...@pm.me.invalid> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for driving this, +1 for the FLIP.
> >>>>>>
> >>>>>> Best,
> >>>>>> Ferenc
> >>>>>>
> >>>>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy
> >>>>>> <hamdy10...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hello,
> >>>>>>> Thanks for the proposal, +1 for the FLIP.
> >>>>>>>
> >>>>>>> Best Regards
> >>>>>>> Ahmed Hamdy
> >>>>>>>
> >>>>>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
> >>>>>>>
> >>>>>>>> Hi, Leonard
> >>>>>>>> Thank you for your suggestion.
> >>>>>>>> I referred to other Connectors[1], modified the naming and types
> >>>>>>>> of relevant parameters[2], and also updated FLIP.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
> >>>>>>>> [2]
> >>>>>>>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
> >>>>>>>>
> >>>>>>>> Brs,
> >>>>>>>> di.wu
> >>>>>>>>
> >>>>>>>>> On March 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks wudi for the update, the FLIP generally looks good to me.
> >>>>>>>>> I only left two minor suggestions:
> >>>>>>>>>
> >>>>>>>>> (1) The suffix `.s` in the config option
> >>>>>>>>> doris.request.query.timeout.s looks strange to me. Could we
> >>>>>>>>> change the value type of all time-interval-related options to
> >>>>>>>>> Duration?
> >>>>>>>>>
> >>>>>>>>> (2) Could you check and improve all config options like
> >>>>>>>>> `doris.exec.mem.limit` so that they follow the Flink config
> >>>>>>>>> option naming and value type conventions?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Leonard
> >>>>>>>>>
> >>>>>>>>>>> On March 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Di,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to
> >>>>>>>>>>> know your thoughts about the following questions:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. According to your clarification of the exactly-once (thanks
> >>>>>>>>>>> for it, BTW), no PreCommitTopology is required.
> >>>>>>>>>>> Does it make sense to let DorisSink[1] implement
> >>>>>>>>>>> SupportsCommitter, since the TwoPhaseCommittingSink is
> >>>>>>>>>>> deprecated[2], before turning the Doris connector into a Flink
> >>>>>>>>>>> connector?
> >>>>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a
> >>>>>>>>>>> data pipeline to support further e.g. ad-hoc query or cube with
> >>>>>>>>>>> feasible pre-aggregation. Just out of curiosity, would you like
> >>>>>>>>>>> to share some real use cases that will use OLAP engines as the
> >>>>>>>>>>> source of a streaming data pipeline? Or will it only be used as
> >>>>>>>>>>> the source for the batch?
> >>>>>>>>>>> 3. The E2E test only covered sink[3], if I am not mistaken.
> >>>>>>>>>>> Would you like to test the source in E2E too?
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
> >>>>>>>>>>> [2]
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> >>>>>>>>>>> [3]
> >>>>>>>>>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Jing
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi 676366...@qq.com.invalid wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi, Jeyhun Karimov.
> >>>>>>>>>>>> Thanks for your question.
> >>>>>>>>>>>>
> >>>>>>>>>>>> - How to ensure Exactly-Once?
> >>>>>>>>>>>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger
> >>>>>>>>>>>> the precommit api of StreamLoad to complete the persistence of
> >>>>>>>>>>>> data in Doris (the data will not be visible at this time), and
> >>>>>>>>>>>> will also pass this TxnID to the Committer.
> >>>>>>>>>>>> 2. When this Checkpoint of the entire Job is completed, the
> >>>>>>>>>>>> Committer will call the commit api of StreamLoad and commit
> >>>>>>>>>>>> TxnID to complete the visibility of the transaction.
> >>>>>>>>>>>> 3. When the task is restarted, the Txn with successful
> >>>>>>>>>>>> precommit and failed commit will be aborted based on the
> >>>>>>>>>>>> label-prefix, and Doris' abort API will be called. (At the
> >>>>>>>>>>>> same time, Doris will also abort transactions that have not
> >>>>>>>>>>>> been committed for a long time)
> >>>>>>>>>>>>
> >>>>>>>>>>>> ps: At the same time, this part of the content has been
> >>>>>>>>>>>> updated in FLIP
> >>>>>>>>>>>>
> >>>>>>>>>>>> - Because the default table model in Doris is Duplicate
> >>>>>>>>>>>> (https://doris.apache.org/docs/data-table/data-model/), which
> >>>>>>>>>>>> does not have a primary key, batch writing may cause data
> >>>>>>>>>>>> duplication, but the Unique model has a primary key, which
> >>>>>>>>>>>> ensures the idempotence of writing, thus achieving
> >>>>>>>>>>>> Exactly-Once
> >>>>>>>>>>>>
> >>>>>>>>>>>> Brs,
> >>>>>>>>>>>> di.wu
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On March 2, 2024, at 17:50, Jeyhun Karimov je.kari...@gmail.com wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
> >>>>>>>>>>>>> I have a few questions:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - How exactly will the combination of the two (Stream Load's
> >>>>>>>>>>>>> two-phase commit and Flink's two-phase commit) ensure the e2e
> >>>>>>>>>>>>> exactly-once semantics?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - The FLIP proposes to combine Doris's batch writing with the
> >>>>>>>>>>>>> primary key table to achieve Exactly-Once semantics. Could you
> >>>>>>>>>>>>> elaborate more on that? Why is it not the default behavior but
> >>>>>>>>>>>>> a workaround?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv decq12y...@gmail.com wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for driving this.
> >>>>>>>>>>>>>> The content is very detailed, it is recommended to add a
> >>>>>>>>>>>>>> section on Test Plan for more completeness.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Di Wu d...@apache.org wrote on Thu, Jan 25, 2024 at 15:40:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Previously, we had some discussions about contributing Flink
> >>>>>>>>>>>>>>> Doris Connector to the Flink community [1]. I want to further
> >>>>>>>>>>>>>>> promote this work.
> >>>>>>>>>>>>>>> I hope everyone will help participate in this FLIP discussion
> >>>>>>>>>>>>>>> and provide more valuable opinions and suggestions.
> >>>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>>>> di.wu
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> As discussed in the previous email [1], about contributing
> >>>>>>>>>>>>>>>> the Flink Doris Connector to the Flink community.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Apache Doris[2] is a high-performance, real-time analytical
> >>>>>>>>>>>>>>>> database based on MPP architecture, for scenarios where
> >>>>>>>>>>>>>>>> Flink is used for data analysis, processing, or real-time
> >>>>>>>>>>>>>>>> writing on Doris, Flink Doris Connector is an effective
> >>>>>>>>>>>>>>>> tool.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> At the same time, Contributing Flink Doris Connector to the
> >>>>>>>>>>>>>>>> Flink community will further expand the Flink Connectors
> >>>>>>>>>>>>>>>> ecosystem.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So I would like to start an official discussion FLIP-399:
> >>>>>>>>>>>>>>>> Flink Connector Doris[3].
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Looking forward to comments, feedbacks and suggestions from
> >>>>>>>>>>>>>>>> the community on the proposal.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>> https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
> >>>>>>>>>>>>>>>> [3]
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> di.wu
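
Illustrative note on the thread: the exactly-once flow (precommit when the
checkpoint barrier arrives, commit once the whole checkpoint completes,
abort by label prefix on restart) and the skewed-subtask timeout scenario
can be sketched as a small in-memory simulation. This is only a sketch
under stated assumptions, not the connector's implementation: FakeDoris,
precommit, commit, abort_by_label_prefix, run_checkpoint and the simulated
clock are hypothetical names invented here, nothing calls Doris or Flink,
and the timeout is assumed to be measured from the precommit time.

```python
"""Toy model of the Stream Load two-phase commit flow discussed above.

All names are hypothetical; nothing here calls Doris or Flink.
"""


class TxnExpired(Exception):
    """Raised when a commit arrives after the server dropped the txn."""


class FakeDoris:
    def __init__(self, txn_timeout_s):
        self.txn_timeout_s = txn_timeout_s  # global, server-side setting
        self.now = 0                        # simulated clock, in seconds
        self.next_id = 1
        self.precommitted = {}              # txn_id -> (label, rows, precommit time)
        self.visible = []                   # rows that queries can see

    def precommit(self, label, rows):
        """Phase 1: persist rows without making them visible."""
        txn_id = self.next_id
        self.next_id += 1
        self.precommitted[txn_id] = (label, list(rows), self.now)
        return txn_id

    def commit(self, txn_id):
        """Phase 2: make a precommitted txn visible, unless it expired."""
        entry = self.precommitted.pop(txn_id, None)
        if entry is None or self.now - entry[2] > self.txn_timeout_s:
            raise TxnExpired(f"txn {txn_id} no longer exists on the server")
        self.visible.extend(entry[1])

    def abort_by_label_prefix(self, prefix):
        """On restart: drop txns that were precommitted but never committed."""
        stale = [txn_id for txn_id, (label, _, _) in self.precommitted.items()
                 if label.startswith(prefix)]
        for txn_id in stale:
            del self.precommitted[txn_id]
        return len(stale)


def run_checkpoint(doris, prefix, checkpoint_id, finish_times):
    """Each subtask precommits when it finishes; commits wait for the slowest."""
    start = doris.now
    txn_ids = []
    for subtask, finish_s in sorted(enumerate(finish_times), key=lambda p: p[1]):
        doris.now = start + finish_s
        label = f"{prefix}_{subtask}_{checkpoint_id}"
        txn_ids.append(doris.precommit(label, [f"row-from-subtask-{subtask}"]))
    # The checkpoint completes only after every subtask has precommitted,
    # so the committer runs at the finish time of the slowest subtask.
    for txn_id in txn_ids:
        doris.commit(txn_id)


if __name__ == "__main__":
    # 10-minute timeout, subtask 1 lags by just over 10 minutes.
    short = FakeDoris(txn_timeout_s=600)
    try:
        run_checkpoint(short, "job1", 1, [0, 601])
    except TxnExpired as exc:
        print("commit failed:", exc)
    print("aborted on restart:", short.abort_by_label_prefix("job1"))

    # Same skew with a 12-hour timeout.
    default = FakeDoris(txn_timeout_s=12 * 3600)
    run_checkpoint(default, "job1", 1, [0, 601])
    print("visible rows:", len(default.visible))
```

In this model the only knob that changes the outcome is the server-side
timeout, which matches the conclusion of the thread: with a global FE
setting, the connector itself has nothing to configure per transaction.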