Hi, Feng

Yes, if the StreamLoad transaction timeout is very short, you may encounter 
this situation.

The timeout for StreamLoad transactions is controlled by the 
streaming_label_keep_max_second parameter [1] in FE (Frontend), and the default 
value is 12 hours. Currently, it is a global transaction configuration and 
cannot be set separately for a specific transaction.

However, I believe the default 12-hour timeout should cover most cases, 
unless you are restarting from a checkpoint that was taken a long time ago. 
What do you think?
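
To make the timing concrete, here is a rough sketch in Python. It is only a toy
model of the scenario you described, not connector or Doris code: the function
name, the variable names and the 11-minute wait are made up for illustration,
and it assumes the server cleans up a transaction once it has stayed
uncommitted for longer than the timeout.

```python
from datetime import timedelta


def txn_expired_before_commit(waited: timedelta, txn_timeout: timedelta) -> bool:
    """Toy model: a transaction is assumed to be cleaned up by the server
    once it has stayed uncommitted for longer than txn_timeout."""
    return waited > txn_timeout


# Scenario from the thread: subtask 0 finishes first, then waits a bit more
# than 10 minutes for subtask 1 before the checkpoint completes and the
# commit is issued. The 11 minutes below is an assumed value.
waited_by_subtask0 = timedelta(minutes=11)

short_timeout = timedelta(minutes=10)   # hypothetical short timeout
default_timeout = timedelta(hours=12)   # default value mentioned above

print(txn_expired_before_commit(waited_by_subtask0, short_timeout))    # True
print(txn_expired_before_commit(waited_by_subtask0, default_timeout))  # False
```

With a 10-minute timeout the commit arrives too late, while with the 12-hour
default the same wait is far inside the limit.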


[1] 
https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168


Brs
di.wu

> On Mar 25, 2024, at 11:45, Feng Jin <jinfeng1...@gmail.com> wrote:
> 
> Hi Di
> 
> Thanks for your reply.
> 
> The timeout I'm referring to here is not the commit timeout, but rather the
> timeout for a single streamLoad transaction.
> 
> Let's say we have set the transaction timeout for StreamLoad to be 10
> minutes. Now, imagine there is a Flink job with two subtasks. Due to
> significant data skew and backpressure issues, subtask 0 and subtask 1 are
> processing at different speeds. Subtask 0 finishes processing this
> checkpoint first, while subtask 1 takes another 10 minutes to complete its
> processing. At this point, the job's checkpoint is done. However, since
> subtask 0 has been waiting for subtask 1 all along, its corresponding
> streamLoad transaction closes after more than 10 minutes have passed - by
> which time the server has already cleaned up this transaction, leading to a
> failed commit.
> Therefore, I would like to know if in such situations we can avoid this
> problem by setting a longer lifespan for transactions.
> 
> 
> Best,
> Feng
> 
> 
> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Feng,
>> 
>> 1. Are you suggesting that when a commit gets stuck, we can interrupt the
>> commit request using a timeout parameter? Currently, there is no such
>> parameter. In my understanding, in a two-phase commit, checkpoint must be
>> enabled, so the commit timeout is essentially the checkpoint timeout.
>> Therefore, it seems unnecessary to add an additional parameter here. What
>> do you think?
>> 
>> 2. In addition to deleting checkpoints to re-consume data again, the
>> Connector also provides an option to ignore commit errors[1]. However, this
>> option is only used for error recovery scenarios, such as when a
>> transaction is cleared by the server but you want to reuse the upstream
>> offset from the checkpoint.
>> 
>> 3. Also, thank you for pointing out the issue with the parameter. It has
>> already been addressed[2], but the FLIP changes were overlooked. It has
>> been updated.
>> 
>> [1]
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
>> [2]
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>> 
>> Brs
>> di.wu
>> 
>> 
>> 
>>> On Mar 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
>>> 
>>> Hi Di,
>>> 
>>> Thank you for the update, as well as quickly implementing corresponding
>>> capabilities including filter push down and project push down.
>>> 
>>> Regarding the transaction timeout, I still have some doubts. I would like
>>> to confirm if we can control this timeout parameter in the connector, such
>>> as setting it to 10 minutes or 1 hour.
>>> Also, when a transaction is cleared by the server, the commit operation of
>>> the connector will fail, leading to job failure. In this case, can users
>>> only choose to delete the checkpoint and re-consume historical data?
>>> 
>>> There is also a small question regarding the parameters
>>> doris.request.connect.timeout.ms and doris.request.read.timeout.ms:
>>> can we change them to the Duration type and remove the "ms" suffix?
>>> This way, all time parameters would uniformly be of type Duration.
>>> 
>>> 
>>> Best,
>>> Feng
>>> 
>>> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
>>> 
>>>> Hi, Feng,
>>>> Thank you, that's a great suggestion !
>>>> 
>>>> I have already implemented FilterPushDown and removed that parameter on
>>>> DorisDynamicTableSource[1], and also updated FLIP.
>>>> 
>>>> Regarding the mention of [Doris also aborts transactions], it may not have
>>>> been described accurately. It mainly refers to the automatic expiration of
>>>> long-running transactions in Doris that have not been committed for a
>>>> prolonged period.
>>>> 
>>>> As for two-phase commit, when a commit fails, the checkpoint will also
>>>> fail, and the job will be continuously retried.
>>>> 
>>>> [1]
>>>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>>>> 
>>>> Brs
>>>> di.wu
>>>> 
>>>> 
>>>>> On Mar 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>> 
>>>>> Hi Di
>>>>> 
>>>>> Thank you for initiating this FLIP, +1 for this.
>>>>> 
>>>>> Regarding the option `doris.filter.query` of doris source table
>>>>> 
>>>>> Can we directly implement the FilterPushDown capability of Flink Source
>>>>> like Jdbc Source [1] instead of introducing an option?
>>>>> 
>>>>> 
>>>>> Regarding two-phase commit,
>>>>> 
>>>>>> At the same time, Doris will also abort transactions that have not been
>>>>>> committed for a long time
>>>>> 
>>>>> Can we control the transaction timeout in the connector?
>>>>> And control the behavior when timeout occurs, whether to discard by
>>>>> default or trigger job failure?
>>>>> 
>>>>> 
>>>>> [1]. https://issues.apache.org/jira/browse/FLINK-16024
>>>>> 
>>>>> Best,
>>>>> Feng
>>>>> 
>>>>> 
>>>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky <ferenc.cs...@pm.me.invalid>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Thanks for driving this, +1 for the FLIP.
>>>>>> 
>>>>>> Best,
>>>>>> Ferenc
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy <hamdy10...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Hello,
>>>>>>> Thanks for the proposal, +1 for the FLIP.
>>>>>>> 
>>>>>>> Best Regards
>>>>>>> Ahmed Hamdy
>>>>>>> 
>>>>>>> 
>>>>>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
>>>>>>> 
>>>>>>>> Hi, Leonard
>>>>>>>> Thank you for your suggestion.
>>>>>>>> I referred to other Connectors[1], modified the naming and types of
>>>>>>>> relevant parameters[2], and also updated FLIP.
>>>>>>>> 
>>>>>>>> [1]
>>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
>>>>>>>> [2]
>>>>>>>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
>>>>>>>> 
>>>>>>>> Brs,
>>>>>>>> di.wu
>>>>>>>> 
>>>>>>>>> On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
>>>>>>>>> 
>>>>>>>>> Thanks wudi for the update. The FLIP generally looks good to me; I
>>>>>>>>> only left two minor suggestions:
>>>>>>>>> 
>>>>>>>>> (1) The suffix `.s` in the config option doris.request.query.timeout.s
>>>>>>>>> looks strange to me. Could we change the value type of all time
>>>>>>>>> interval related options to Duration?
>>>>>>>>> 
>>>>>>>>> (2) Could you check and improve all config options like
>>>>>>>>> `doris.exec.mem.limit` so that they follow Flink config option naming
>>>>>>>>> and value types?
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>> 
>>>>>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Di,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to know
>>>>>>>>>>> your thoughts about the following questions:
>>>>>>>>>>> 
>>>>>>>>>>> 1. According to your clarification of the exactly-once (thanks for it,
>>>>>>>>>>> BTW), no PreCommitTopology is required. Does it make sense to let
>>>>>>>>>>> DorisSink[1] implement SupportsCommitter, since the
>>>>>>>>>>> TwoPhaseCommittingSink is deprecated[2], before turning the Doris
>>>>>>>>>>> connector into a Flink connector?
>>>>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a data
>>>>>>>>>>> pipeline to support further e.g. ad-hoc query or cube with feasible
>>>>>>>>>>> pre-aggregation. Just out of curiosity, would you like to share some
>>>>>>>>>>> real use cases that will use OLAP engines as the source of a streaming
>>>>>>>>>>> data pipeline? Or will it only be used as the source for batch?
>>>>>>>>>>> 3. The E2E test only covered sink[3], if I am not mistaken. Would you
>>>>>>>>>>> like to test the source in E2E too?
>>>>>>>>>>> 
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
>>>>>>>>>>> [2]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
>>>>>>>>>>> [3]
>>>>>>>>>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Jing
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi 676366...@qq.com.invalid
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi, Jeyhun Karimov.
>>>>>>>>>>>> Thanks for your question.
>>>>>>>>>>>> 
>>>>>>>>>>>> - How to ensure Exactly-Once?
>>>>>>>>>>>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger the
>>>>>>>>>>>> precommit api of StreamLoad to complete the persistence of data in
>>>>>>>>>>>> Doris (the data will not be visible at this time), and will also pass
>>>>>>>>>>>> this TxnID to the Committer.
>>>>>>>>>>>> 2. When this Checkpoint of the entire Job is completed, the Committer
>>>>>>>>>>>> will call the commit api of StreamLoad and commit TxnID to complete
>>>>>>>>>>>> the visibility of the transaction.
>>>>>>>>>>>> 3. When the task is restarted, the Txn with successful precommit and
>>>>>>>>>>>> failed commit will be aborted based on the label-prefix, and Doris'
>>>>>>>>>>>> abort API will be called. (At the same time, Doris will also abort
>>>>>>>>>>>> transactions that have not been committed for a long time)
>>>>>>>>>>>> 
>>>>>>>>>>>> ps: At the same time, this part of the content has been updated in
>>>>>>>>>>>> FLIP
>>>>>>>>>>>> 
>>>>>>>>>>>> - Because the default table model in Doris is Duplicate (
>>>>>>>>>>>> https://doris.apache.org/docs/data-table/data-model/), which does not
>>>>>>>>>>>> have a primary key, batch writing may cause data duplication, but the
>>>>>>>>>>>> UNIQ model has a primary key, which ensures the idempotence of
>>>>>>>>>>>> writing, thus achieving Exactly-Once
>>>>>>>>>>>> 
>>>>>>>>>>>> Brs,
>>>>>>>>>>>> di.wu
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov je.kari...@gmail.com wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
>>>>>>>>>>>>> I have a few questions:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - How exactly will the combination of the two (Stream Load's
>>>>>>>>>>>>> two-phase commit and Flink's two-phase commit) ensure the e2e
>>>>>>>>>>>>> exactly-once semantics?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - The FLIP proposes to combine Doris's batch writing with the
>>>>>>>>>>>>> primary key table to achieve Exactly-Once semantics. Could you
>>>>>>>>>>>>> elaborate more on that? Why is it not the default behavior but a
>>>>>>>>>>>>> workaround?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv decq12y...@gmail.com
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for driving this.
>>>>>>>>>>>>>> The content is very detailed; I recommend adding a section on Test
>>>>>>>>>>>>>> Plan for more completeness.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, Jan 25, 2024 at 15:40, Di Wu d...@apache.org wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Previously, we had some discussions about contributing Flink Doris
>>>>>>>>>>>>>>> Connector to the Flink community [1]. I want to further promote this
>>>>>>>>>>>>>>> work.
>>>>>>>>>>>>>>> I hope everyone will help participate in this FLIP discussion and
>>>>>>>>>>>>>>> provide more valuable opinions and suggestions.
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>> di.wu
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> As discussed in the previous email [1], about contributing the Flink
>>>>>>>>>>>>>>>> Doris Connector to the Flink community.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Apache Doris[2] is a high-performance, real-time analytical database
>>>>>>>>>>>>>>>> based on MPP architecture. For scenarios where Flink is used for data
>>>>>>>>>>>>>>>> analysis, processing, or real-time writing on Doris, Flink Doris
>>>>>>>>>>>>>>>> Connector is an effective tool.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> At the same time, contributing Flink Doris Connector to the Flink
>>>>>>>>>>>>>>>> community will further expand the Flink Connectors ecosystem.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> So I would like to start an official discussion FLIP-399: Flink
>>>>>>>>>>>>>>>> Connector Doris[3].
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Looking forward to comments, feedback and suggestions from the
>>>>>>>>>>>>>>>> community on the proposal.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>> https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
>>>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> di.wu
