Hi, Feng

Yes, if the StreamLoad transaction timeout is very short, you may encounter this situation.
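A minimal Java simulation may make the race in Feng's scenario concrete. It is a sketch under stated assumptions, not Doris or connector code: `TxnServer` and `SkewScenario` are hypothetical names, the server is modelled as rejecting a commit once the pre-committed transaction is older than a fixed timeout (real Doris expires transactions on its own schedule), and time is passed in explicitly so the outcome is deterministic.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Doris FE: it records when each transaction was
// pre-committed and rejects a commit once the transaction outlived the timeout.
final class TxnServer {
    private final Duration timeout;
    private final Map<Long, Duration> precommittedAt = new HashMap<>();

    TxnServer(Duration timeout) {
        this.timeout = timeout;
    }

    // Phase 1: data is persisted but not yet visible.
    void precommit(long txnId, Duration now) {
        precommittedAt.put(txnId, now);
    }

    // Phase 2: succeeds only if the transaction has not expired in the meantime.
    boolean commit(long txnId, Duration now) {
        Duration start = precommittedAt.remove(txnId);
        if (start == null) {
            return false; // unknown transaction
        }
        return now.minus(start).compareTo(timeout) <= 0;
    }
}

final class SkewScenario {
    // Subtask 0 pre-commits at t = 0, but its transaction can only be committed
    // once the slow subtask 1 has finished and the job-wide checkpoint completes.
    static boolean subtask0Commits(Duration serverTimeout, Duration slowSubtaskDelay) {
        TxnServer server = new TxnServer(serverTimeout);
        long txnOfSubtask0 = 1L;
        server.precommit(txnOfSubtask0, Duration.ZERO);
        return server.commit(txnOfSubtask0, slowSubtaskDelay);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMinutes(11); // "more than 10 minutes"
        System.out.println(subtask0Commits(Duration.ofMinutes(10), delay)); // false
        System.out.println(subtask0Commits(Duration.ofHours(12), delay));   // true
    }
}
```

With the 10-minute timeout from Feng's example, subtask 0's commit fails; with a 12-hour timeout the same 11-minute delay is harmless.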
The timeout for StreamLoad transactions is controlled by the `streaming_label_keep_max_second` parameter [1] in the FE (Frontend), and its default value is 12 hours. Currently it is a global setting and cannot be configured for an individual transaction. However, I think the 12-hour default should cover most cases, unless you restart from a checkpoint that was taken a long time ago. What do you think?

[1] https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168

Brs
di.wu

> On Mar 25, 2024, at 11:45, Feng Jin <jinfeng1...@gmail.com> wrote:
>
> Hi Di
>
> Thanks for your reply.
>
> The timeout I'm referring to here is not the commit timeout, but the timeout of a single StreamLoad transaction.
>
> Let's say we have set the StreamLoad transaction timeout to 10 minutes. Now imagine a Flink job with two subtasks. Due to significant data skew and backpressure, subtask 0 and subtask 1 process data at different speeds. Subtask 0 finishes processing this checkpoint first, while subtask 1 needs another 10 minutes to finish. Only then is the job's checkpoint complete. However, since subtask 0 has been waiting for subtask 1 all along, its StreamLoad transaction is only closed after more than 10 minutes, by which time the server has already cleaned it up, leading to a failed commit.
> Therefore, I would like to know whether we can avoid this problem in such situations by giving transactions a longer lifespan.
>
>
> Best,
> Feng
>
>
> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
>
>> Hi, Feng,
>>
>> 1. Are you suggesting that when a commit gets stuck, we could interrupt the commit request using a timeout parameter? Currently, there is no such parameter.
>> In my understanding, two-phase commit requires checkpointing to be enabled, so the commit timeout is essentially the checkpoint timeout. Therefore, it seems unnecessary to add an additional parameter here. What do you think?
>>
>> 2. Besides deleting checkpoints to re-consume data, the connector also provides an option to ignore commit errors [1]. However, this option is only meant for error recovery, for example when a transaction has been cleared by the server but you still want to reuse the upstream offsets stored in the checkpoint.
>>
>> 3. Also, thank you for pointing out the issue with the parameters. It had already been addressed [2], but the FLIP was not updated accordingly; it has been updated now.
>>
>> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
>> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>>
>> Brs
>> di.wu
>>
>>> On Mar 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>
>>> Hi Di,
>>>
>>> Thank you for the update, and for quickly implementing the corresponding capabilities, including filter push-down and projection push-down.
>>>
>>> Regarding the transaction timeout, I still have some doubts. I would like to confirm whether we can control this timeout in the connector, e.g. set it to 10 minutes or 1 hour.
>>> Also, when a transaction is cleared by the server, the connector's commit operation will fail, leading to job failure. In this case, is the only option for users to delete the checkpoint and re-consume historical data?
>>>
>>> There is also a small question regarding the options `doris.request.connect.timeout.ms` and `doris.request.read.timeout.ms`: can we change them to the Duration type and remove the "ms" suffix?
>>> This way, all time parameters would consistently use the Duration type.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
>>>
>>>> Hi, Feng,
>>>> Thank you, that's a great suggestion!
>>>>
>>>> I have already implemented FilterPushDown, removed that option from DorisDynamicTableSource [1], and updated the FLIP.
>>>>
>>>> Regarding the statement [Doris also aborts transactions], it may not have been described accurately. It mainly refers to the automatic expiration of transactions in Doris that have remained uncommitted for a prolonged period.
>>>>
>>>> As for two-phase commit, when a commit fails, the checkpoint also fails, and the job is retried continuously.
>>>>
>>>> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>>>>
>>>> Brs
>>>> di.wu
>>>>
>>>>
>>>>> On Mar 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>
>>>>> Hi Di
>>>>>
>>>>> Thank you for initiating this FLIP, +1 for this.
>>>>>
>>>>> Regarding the option `doris.filter.query` of the Doris source table:
>>>>>
>>>>> Can we directly implement Flink's FilterPushDown capability, like the JDBC source [1], instead of introducing an option?
>>>>>
>>>>>
>>>>> Regarding two-phase commit,
>>>>>
>>>>>> At the same time, Doris will also abort transactions that have not been committed for a long time
>>>>>
>>>>> Can we control the transaction timeout in the connector?
>>>>> And can we control the behavior when a timeout occurs, i.e. whether to discard by default or to trigger a job failure?
>>>>>
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16024
>>>>>
>>>>> Best,
>>>>> Feng
>>>>>
>>>>>
>>>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky <ferenc.cs...@pm.me.invalid> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for driving this, +1 for the FLIP.
>>>>>>
>>>>>> Best,
>>>>>> Ferenc
>>>>>>
>>>>>>
>>>>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Thanks for the proposal, +1 for the FLIP.
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Ahmed Hamdy
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 11 Mar 2024 at 15:12, wudi <676366...@qq.com.invalid> wrote:
>>>>>>>
>>>>>>>> Hi, Leonard
>>>>>>>> Thank you for your suggestion.
>>>>>>>> I referred to other connectors [1], modified the naming and types of the relevant parameters [2], and also updated the FLIP.
>>>>>>>>
>>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
>>>>>>>> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
>>>>>>>>
>>>>>>>> Brs,
>>>>>>>> di.wu
>>>>>>>>
>>>>>>>>> On Mar 7, 2024, at 14:33, Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks, wudi, for the update. The FLIP generally looks good to me; I only have two minor suggestions:
>>>>>>>>>
>>>>>>>>> (1) The suffix `.s` in the config option `doris.request.query.timeout.s` looks strange to me. Could we change the value type of all time-interval options to Duration?
>>>>>>>>>
>>>>>>>>> (2) Could you check and improve all config options, like `doris.exec.mem.limit`, so that they follow Flink's config option naming and value-type conventions?
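To illustrate suggestion (1), here is a hypothetical, simplified Java parser showing how a Duration-typed option carries its unit in the value (for example `1h` or `3600 s`) instead of in the key as `.s` or `.ms`. Flink's own options declare this with `ConfigOptions.key(...).durationType()`; the parser below is not Flink's, accepts only `ms`, `s`, `min`, `h` and `d`, and the sample values are made up.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified parser for Duration-typed option values such as
// "500ms", "30 s", "10min", "1h" or "2d". Not Flink's own parser.
final class DurationOption {
    private static final Pattern FORMAT =
            Pattern.compile("\\s*(\\d+)\\s*(ms|s|min|h|d)\\s*");

    static Duration parse(String text) {
        Matcher m = FORMAT.matcher(text.toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default: // "d", the only remaining alternative in FORMAT
                return Duration.ofDays(amount);
        }
    }

    public static void main(String[] args) {
        // The unit lives in the value, so one key can accept either spelling.
        System.out.println(parse("1h").equals(parse("3600 s"))); // true
        System.out.println(parse("500ms"));                      // PT0.5S
    }
}
```

Because both spellings resolve to the same `Duration`, a single suffix-free key (a hypothetical `doris.request.query.timeout`, say) could accept either.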
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>>
>>>>>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge <j...@ververica.com.INVALID> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Di,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to know your thoughts on the following questions:
>>>>>>>>>>>
>>>>>>>>>>> 1. According to your clarification of exactly-once (thanks for it, BTW), no PreCommitTopology is required. Does it make sense to let DorisSink [1] implement SupportsCommitter, since TwoPhaseCommittingSink is deprecated [2], before turning the Doris connector into a Flink connector?
>>>>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a data pipeline to support, e.g., ad-hoc queries or cubes with feasible pre-aggregation. Just out of curiosity, could you share some real use cases that use OLAP engines as the source of a streaming data pipeline? Or will it only be used as a source for batch jobs?
>>>>>>>>>>> 3. If I am not mistaken, the E2E test only covers the sink [3]. Would you like to test the source in E2E too?
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
>>>>>>>>>>> [3] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Jing
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi <676366...@qq.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, Jeyhun Karimov.
>>>>>>>>>>>> Thanks for your question.
>>>>>>>>>>>>
>>>>>>>>>>>> - How to ensure Exactly-Once?
>>>>>>>>>>>> 1. When the checkpoint barrier arrives, DorisSink calls the StreamLoad precommit API to persist the data in Doris (the data is not visible yet), and passes the resulting TxnID to the Committer.
>>>>>>>>>>>> 2. When the checkpoint of the entire job has completed, the Committer calls the StreamLoad commit API with that TxnID, making the transaction visible.
>>>>>>>>>>>> 3. When the task is restarted, transactions that were pre-committed successfully but whose commit failed are aborted by label prefix through Doris' abort API.
>>>>>>>>>>>> (At the same time, Doris will also abort transactions that have not been committed for a long time.)
>>>>>>>>>>>>
>>>>>>>>>>>> PS: This part has also been updated in the FLIP.
>>>>>>>>>>>>
>>>>>>>>>>>> - The default table model in Doris is Duplicate (https://doris.apache.org/docs/data-table/data-model/), which has no primary key, so batch writing may produce duplicate data. The Unique model, however, has a primary key, which makes writes idempotent and thus achieves exactly-once.
>>>>>>>>>>>>
>>>>>>>>>>>> Brs,
>>>>>>>>>>>> di.wu
>>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
>>>>>>>>>>>>> I have a few questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - How exactly will the combination of the two (Stream Load's two-phase commit and Flink's two-phase commit) ensure end-to-end exactly-once semantics?
>>>>>>>>>>>>>
>>>>>>>>>>>>> - The FLIP proposes to combine Doris's batch writing with the primary key table to achieve exactly-once semantics. Could you elaborate more on that? Why is it not the default behavior but a workaround?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for driving this.
>>>>>>>>>>>>>> The content is very detailed; I recommend adding a Test Plan section for completeness.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Di Wu <d...@apache.org> wrote on Thu, Jan 25, 2024 at 15:40:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Previously, we had some discussions about contributing the Flink Doris Connector to the Flink community [1]. I want to push this work forward, and I hope everyone will take part in this FLIP discussion and share their opinions and suggestions.
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>> di.wu
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This follows up on the previous email [1] about contributing the Flink Doris Connector to the Flink community.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Apache Doris [2] is a high-performance, real-time analytical database based on an MPP architecture. For scenarios where Flink is used to analyze or process data in Doris, or to write data into Doris in real time, the Flink Doris Connector is an effective tool.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> At the same time, contributing the Flink Doris Connector to the Flink community will further expand the Flink connector ecosystem.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So I would like to start an official discussion of FLIP-399: Flink Connector Doris [3].
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Looking forward to comments, feedback and suggestions from the community on the proposal.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>>> [2] https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
>>>>>>>>>>>>>>>> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> di.wu
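For readers following the exactly-once discussion, the flow wudi outlines in the March 5 reply (pre-commit on the checkpoint barrier, commit once the whole checkpoint completes, abort leftover transactions by label prefix on restart), together with the "ignore commit errors" recovery option from the March 22 reply, can be modelled in plain Java. This is a hypothetical sketch, not the connector's implementation: `FakeDoris`, `TwoPhaseSinkSketch`, the `prefix_checkpointId` label format and the `ignoreCommitError` flag are illustrative names, and no Flink or Doris APIs are used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for Doris Stream Load two-phase commit.
final class FakeDoris {
    enum State { PRECOMMITTED, COMMITTED, ABORTED }

    final Map<Long, State> txns = new HashMap<>();
    private final Map<Long, String> labels = new HashMap<>();
    private long nextTxnId = 1;

    // Phase 1: data is persisted but not yet visible; returns the TxnID.
    long precommit(String label) {
        long txnId = nextTxnId++;
        txns.put(txnId, State.PRECOMMITTED);
        labels.put(txnId, label);
        return txnId;
    }

    // Phase 2: makes a pre-committed transaction visible.
    void commit(long txnId) {
        if (txns.get(txnId) != State.PRECOMMITTED) {
            throw new IllegalStateException("txn " + txnId + " is not pre-committed");
        }
        txns.put(txnId, State.COMMITTED);
    }

    // Models the server cleaning up a transaction left uncommitted too long.
    void expire(long txnId) {
        txns.remove(txnId);
        labels.remove(txnId);
    }

    // Aborts every still pre-committed transaction whose label has the prefix.
    int abortByLabelPrefix(String prefix) {
        int aborted = 0;
        for (Map.Entry<Long, State> e : txns.entrySet()) {
            if (e.getValue() == State.PRECOMMITTED && labels.get(e.getKey()).startsWith(prefix)) {
                e.setValue(State.ABORTED);
                aborted++;
            }
        }
        return aborted;
    }
}

// Hypothetical sink/committer pair following steps 1-3 of the exactly-once flow.
final class TwoPhaseSinkSketch {
    private final FakeDoris doris;
    private final String labelPrefix;
    private final boolean ignoreCommitError;
    private final List<Long> pendingTxns = new ArrayList<>();

    TwoPhaseSinkSketch(FakeDoris doris, String labelPrefix, boolean ignoreCommitError) {
        this.doris = doris;
        this.labelPrefix = labelPrefix;
        this.ignoreCommitError = ignoreCommitError;
    }

    // Step 1: on the checkpoint barrier, pre-commit and hand the TxnID over.
    long onCheckpointBarrier(long checkpointId) {
        long txnId = doris.precommit(labelPrefix + "_" + checkpointId);
        pendingTxns.add(txnId);
        return txnId;
    }

    // Step 2: after the whole checkpoint completes, commit the pending TxnIDs.
    void onCheckpointComplete() {
        for (long txnId : pendingTxns) {
            try {
                doris.commit(txnId);
            } catch (IllegalStateException e) {
                if (!ignoreCommitError) {
                    throw e; // propagate; per the thread, the job then fails and retries
                }
                // ignoreCommitError: skip the lost transaction and continue
            }
        }
        pendingTxns.clear();
    }

    // Step 3: on restart, abort transactions that were never committed.
    int onRestart() {
        pendingTxns.clear();
        return doris.abortByLabelPrefix(labelPrefix);
    }
}
```

In this model, a transaction the server has already expired makes `onCheckpointComplete()` throw unless `ignoreCommitError` is set; with the flag, the job moves on and that transaction's rows simply never become visible.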