Hi Di

Thank you for your patience and explanation.

If this is a server-side configuration, we currently cannot modify it in
the client configuration. If Doris supports client-side configuration in
the future, we can reconsider whether to support it.

I currently have no other questions regarding this FLIP. LGTM.


Best,
Feng

On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:

> Hi, Feng
>
> Yes, if the StreamLoad transaction timeout is very short, you may
> encounter this situation.
>
> The timeout for StreamLoad transactions is controlled by the
> streaming_label_keep_max_second parameter [1] in FE (Frontend), and the
> default value is 12 hours. Currently, it is a global transaction
> configuration and cannot be set separately for a specific transaction.
>
> However, I understand the default 12-hour timeout should cover most cases
> unless you are restarting from a checkpoint that occurred a long time ago.
> What do you think?
>
>
> [1]
> https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>
>
> Brs
> di.wu
>
> > On Mar 25, 2024, at 11:45, Feng Jin <jinfeng1...@gmail.com> wrote:
> >
> > Hi Di
> >
> > Thanks for your reply.
> >
> > The timeout I'm referring to here is not the commit timeout, but rather
> > the timeout for a single StreamLoad transaction.
> >
> > Let's say we have set the transaction timeout for StreamLoad to be 10
> > minutes. Now, imagine there is a Flink job with two subtasks. Due to
> > significant data skew and backpressure issues, subtask 0 and subtask 1
> > are processing at different speeds. Subtask 0 finishes processing this
> > checkpoint first, while subtask 1 takes another 10 minutes to complete
> > its processing. At this point, the job's checkpoint is done. However,
> > since subtask 0 has been waiting for subtask 1 all along, its
> > corresponding StreamLoad transaction closes after more than 10 minutes
> > have passed, by which time the server has already cleaned up this
> > transaction, leading to a failed commit.
> > Therefore, I would like to know if in such situations we can avoid this
> > problem by setting a longer lifespan for transactions.
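
The scenario described above can be sketched as a small timeline check. This
is only an illustrative model, not connector code: the Subtask class, the
expired_before_commit helper, and the minute values are hypothetical, and it
assumes the timeout clock is measured from the moment a subtask finishes its
part of the checkpoint.

```python
from dataclasses import dataclass


@dataclass
class Subtask:
    name: str
    precommit_at_min: float  # minute at which this subtask finished its part of the checkpoint


def expired_before_commit(subtasks, txn_timeout_min):
    """Return the subtasks whose transaction waits longer than the timeout.

    The commit can only happen once the slowest subtask has finished the
    checkpoint, so every faster subtask waits (slowest - own) minutes.
    """
    checkpoint_done_at = max(s.precommit_at_min for s in subtasks)
    return [
        s.name
        for s in subtasks
        if checkpoint_done_at - s.precommit_at_min > txn_timeout_min
    ]


# Illustrative numbers: subtask 1 needs 11 more minutes than subtask 0.
subtasks = [Subtask("subtask-0", 0), Subtask("subtask-1", 11)]

# With a 10-minute timeout, subtask 0's transaction is gone before the commit.
short_timeout = expired_before_commit(subtasks, txn_timeout_min=10)
# With a 12-hour timeout, no transaction expires in this scenario.
long_timeout = expired_before_commit(subtasks, txn_timeout_min=12 * 60)
```

Under these assumptions, only the fast subtask is at risk, and only when the
skew between subtasks exceeds the transaction timeout.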
> >
> >
> > Best,
> > Feng
> >
> >
> > On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
> >
> >> Hi, Feng,
> >>
> >> 1. Are you suggesting that when a commit gets stuck, we can interrupt
> >> the commit request using a timeout parameter? Currently, there is no
> >> such parameter. In my understanding, in a two-phase commit, checkpoint
> >> must be enabled, so the commit timeout is essentially the checkpoint
> >> timeout. Therefore, it seems unnecessary to add an additional parameter
> >> here. What do you think?
> >>
> >> 2. In addition to deleting checkpoints to re-consume data again, the
> >> Connector also provides an option to ignore commit errors[1]. However,
> >> this option is only used for error recovery scenarios, such as when a
> >> transaction is cleared by the server but you want to reuse the upstream
> >> offset from the checkpoint.
> >>
> >> 3. Also, thank you for pointing out the issue with the parameter. It has
> >> already been addressed[2], but the FLIP changes were overlooked. It has
> >> been updated.
> >>
> >> [1]
> >> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
> >> [2]
> >> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
> >>
> >> Brs
> >> di.wu
> >>
> >>
> >>
> >>> On Mar 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>
> >>> Hi Di,
> >>>
> >>> Thank you for the update, as well as quickly implementing corresponding
> >>> capabilities including filter push down and project push down.
> >>>
> >>> Regarding the transaction timeout, I still have some doubts. I would
> >>> like to confirm if we can control this timeout parameter in the
> >>> connector, such as setting it to 10 minutes or 1 hour.
> >>> Also, when a transaction is cleared by the server, the commit operation
> >>> of the connector will fail, leading to job failure. In this case, can
> >>> users only choose to delete the checkpoint and re-consume historical
> >>> data?
> >>>
> >>> There is also a small question regarding the parameters
> >>> doris.request.connect.timeout.ms and doris.request.read.timeout.ms:
> >>> can we change them to Duration type and remove the "ms" suffix?
> >>> This way, all time parameters can be kept uniform in type as Duration.
> >>>
> >>>
> >>> Best,
> >>> Feng
> >>>
> >>> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
> >>>
> >>>> Hi, Feng,
> >>>> Thank you, that's a great suggestion!
> >>>>
> >>>> I have already implemented FilterPushDown and removed that parameter
> >>>> on DorisDynamicTableSource[1], and also updated FLIP.
> >>>>
> >>>> Regarding the mention of [Doris also aborts transactions], it may not
> >>>> have been described accurately. It mainly refers to the automatic
> >>>> expiration of long-running transactions in Doris that have not been
> >>>> committed for a prolonged period.
> >>>>
> >>>> As for two-phase commit, when a commit fails, the checkpoint will also
> >>>> fail, and the job will be continuously retried.
> >>>>
> >>>> [1]
> >>>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
> >>>>
> >>>> Brs
> >>>> di.wu
> >>>>
> >>>>
> >>>>> On Mar 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Di
> >>>>>
> >>>>> Thank you for initiating this FLIP, +1 for this.
> >>>>>
> >>>>> Regarding the option `doris.filter.query` of doris source table
> >>>>>
> >>>>> Can we directly implement the FilterPushDown capability of Flink
> >>>>> Source like Jdbc Source [1] instead of introducing an option?
> >>>>>
> >>>>>
> >>>>> Regarding two-phase commit,
> >>>>>
> >>>>>> At the same time, Doris will also abort transactions that have not
> >>>>>> been committed for a long time
> >>>>>
> >>>>> Can we control the transaction timeout in the connector?
> >>>>> And control the behavior when timeout occurs, whether to discard by
> >>>>> default or trigger job failure?
> >>>>>
> >>>>>
> >>>>> [1]. https://issues.apache.org/jira/browse/FLINK-16024
> >>>>>
> >>>>> Best,
> >>>>> Feng
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky
> >>>>> <ferenc.cs...@pm.me.invalid> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for driving this, +1 for the FLIP.
> >>>>>>
> >>>>>> Best,
> >>>>>> Ferenc
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy
> >>>>>> <hamdy10...@gmail.com> wrote:
> >>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Hello,
> >>>>>>> Thanks for the proposal, +1 for the FLIP.
> >>>>>>>
> >>>>>>> Best Regards
> >>>>>>> Ahmed Hamdy
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
> >>>>>>>
> >>>>>>>> Hi, Leonard
> >>>>>>>> Thank you for your suggestion.
> >>>>>>>> I referred to other Connectors[1], modified the naming and types of
> >>>>>>>> relevant parameters[2], and also updated FLIP.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
> >>>>>>>> [2]
> >>>>>>>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
> >>>>>>>>
> >>>>>>>> Brs,
> >>>>>>>> di.wu
> >>>>>>>>
> >>>>>>>>> On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks wudi for the update, the FLIP generally looks good to me.
> >>>>>>>>> I only left two minor suggestions:
> >>>>>>>>>
> >>>>>>>>> (1) The suffix `.s` in the config option
> >>>>>>>>> doris.request.query.timeout.s looks strange to me. Could we change
> >>>>>>>>> the value type of all time interval related options to Duration?
> >>>>>>>>>
> >>>>>>>>> (2) Could you check and improve all config options like
> >>>>>>>>> `doris.exec.mem.limit` so that they follow Flink config option
> >>>>>>>>> naming and value types?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Leonard
> >>>>>>>>>
> >>>>>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Di,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to
> >>>>>>>>>>> know your thoughts about the following questions:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. According to your clarification of exactly-once (thanks for
> >>>>>>>>>>> it, BTW), no PreCommitTopology is required. Does it make sense
> >>>>>>>>>>> to let DorisSink[1] implement SupportsCommitter before turning
> >>>>>>>>>>> the Doris connector into a Flink connector, since
> >>>>>>>>>>> TwoPhaseCommittingSink is deprecated[2]?
> >>>>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a
> >>>>>>>>>>> data pipeline to support further use, e.g. ad-hoc queries or
> >>>>>>>>>>> cubes with feasible pre-aggregation. Just out of curiosity, would
> >>>>>>>>>>> you like to share some real use cases that will use OLAP engines
> >>>>>>>>>>> as the source of a streaming data pipeline? Or will it only be
> >>>>>>>>>>> used as the source for batch?
> >>>>>>>>>>> 3. The E2E test only covered the sink[3], if I am not mistaken.
> >>>>>>>>>>> Would you like to test the source in E2E too?
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
> >>>>>>>>>>> [2]
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> >>>>>>>>>>> [3]
> >>>>>>>>>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
> >>>>>>>>>>>
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Jing
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi 676366...@qq.com.invalid wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi, Jeyhun Karimov.
> >>>>>>>>>>>> Thanks for your question.
> >>>>>>>>>>>>
> >>>>>>>>>>>> - How to ensure Exactly-Once?
> >>>>>>>>>>>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger
> >>>>>>>>>>>> the precommit api of StreamLoad to complete the persistence of
> >>>>>>>>>>>> data in Doris (the data will not be visible at this time), and
> >>>>>>>>>>>> will also pass this TxnID to the Committer.
> >>>>>>>>>>>> 2. When this Checkpoint of the entire Job is completed, the
> >>>>>>>>>>>> Committer will call the commit api of StreamLoad and commit
> >>>>>>>>>>>> TxnID to complete the visibility of the transaction.
> >>>>>>>>>>>> 3. When the task is restarted, the Txn with successful precommit
> >>>>>>>>>>>> and failed commit will be aborted based on the label-prefix, and
> >>>>>>>>>>>> Doris' abort API will be called. (At the same time, Doris will
> >>>>>>>>>>>> also abort transactions that have not been committed for a long
> >>>>>>>>>>>> time)
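
The three steps above can be sketched with an in-memory stand-in for Doris.
This is a minimal illustration under stated assumptions, not the connector's
implementation: FakeDoris and its methods (precommit, commit,
abort_by_label_prefix) are hypothetical names and do not mirror the real
StreamLoad HTTP API.

```python
class FakeDoris:
    """In-memory stand-in for a Doris table; not the real StreamLoad API."""

    def __init__(self):
        self.visible = []        # rows that readers can see
        self.precommitted = {}   # txn_id -> (label, rows): persisted but not visible
        self._next_txn_id = 1

    def precommit(self, label, rows):
        txn_id = self._next_txn_id
        self._next_txn_id += 1
        self.precommitted[txn_id] = (label, list(rows))
        return txn_id

    def commit(self, txn_id):
        _, rows = self.precommitted.pop(txn_id)
        self.visible.extend(rows)

    def abort_by_label_prefix(self, prefix):
        stale = [
            txn_id
            for txn_id, (label, _) in self.precommitted.items()
            if label.startswith(prefix)
        ]
        for txn_id in stale:
            del self.precommitted[txn_id]
        return stale


doris = FakeDoris()

# Step 1: the checkpoint barrier arrives; the sink pre-commits and hands the
# TxnID to the committer. The rows are persisted but not yet visible.
txn_a = doris.precommit("job1_ckpt1", ["r1", "r2"])
visible_after_precommit = list(doris.visible)

# Step 2: the checkpoint completes for the whole job; the committer commits.
doris.commit(txn_a)

# A second checkpoint pre-commits, but the job fails before the commit.
txn_b = doris.precommit("job1_ckpt2", ["r3"])

# Step 3: on restart, pre-committed but uncommitted transactions are aborted
# by label prefix, so "r3" never becomes visible and is re-read from upstream.
aborted = doris.abort_by_label_prefix("job1")
```

The point of the sketch is that rows only become visible in step 2, so a
failure between steps 1 and 2 leaves nothing visible that step 3 cannot undo.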
> >>>>>>>>>>>>
> >>>>>>>>>>>> ps: At the same time, this part of the content has been updated
> >>>>>>>>>>>> in FLIP
> >>>>>>>>>>>>
> >>>>>>>>>>>> - Because the default table model in Doris is Duplicate (
> >>>>>>>>>>>> https://doris.apache.org/docs/data-table/data-model/), which
> >>>>>>>>>>>> does not have a primary key, batch writing may cause data
> >>>>>>>>>>>> duplication, but the UNIQ model has a primary key, which ensures
> >>>>>>>>>>>> the idempotence of writing, thus achieving Exactly-Once
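
The difference between the two table models under a replayed batch can be
illustrated with plain Python containers. This is only a sketch: a list stands
in for a table without a primary key and a dict for a table keyed by primary
key; write_duplicate and write_unique are hypothetical helpers, not Doris APIs.

```python
def write_duplicate(table, batch):
    """No primary key: every written row is appended, replays included."""
    table.extend(batch)


def write_unique(table, batch):
    """Primary key: a row with an existing key overwrites the old row."""
    for key, value in batch:
        table[key] = value


batch = [(1, "a"), (2, "b")]
dup_table, uniq_table = [], {}

# The second iteration models the same batch being written again after a
# failure and restart.
for _ in range(2):
    write_duplicate(dup_table, batch)
    write_unique(uniq_table, batch)
```

After the replay, the keyless table holds each row twice, while the keyed
table still holds one row per key, which is what makes the repeated write
idempotent.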
> >>>>>>>>>>>>
> >>>>>>>>>>>> Brs,
> >>>>>>>>>>>> di.wu
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov je.kari...@gmail.com wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
> >>>>>>>>>>>>> I have a few questions:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - How exactly will the combination of the two (Stream Load's
> >>>>>>>>>>>>> two-phase commit and Flink's two-phase commit) ensure the e2e
> >>>>>>>>>>>>> exactly-once semantics?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - The FLIP proposes to combine Doris's batch writing with the
> >>>>>>>>>>>>> primary key table to achieve Exactly-Once semantics. Could you
> >>>>>>>>>>>>> elaborate more on that? Why is it not the default behavior but
> >>>>>>>>>>>>> a workaround?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv decq12y...@gmail.com
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for driving this.
> >>>>>>>>>>>>>> The content is very detailed. I recommend adding a Test Plan
> >>>>>>>>>>>>>> section for completeness.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Jan 25, 2024 at 15:40, Di Wu d...@apache.org wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Previously, we had some discussions about contributing the
> >>>>>>>>>>>>>>> Flink Doris Connector to the Flink community [1]. I want to
> >>>>>>>>>>>>>>> further promote this work.
> >>>>>>>>>>>>>>> I hope everyone will participate in this FLIP discussion and
> >>>>>>>>>>>>>>> provide more valuable opinions and suggestions.
> >>>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>>>> di.wu
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> As discussed in the previous email [1], I would like to
> >>>>>>>>>>>>>>>> contribute the Flink Doris Connector to the Flink community.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Apache Doris[2] is a high-performance, real-time analytical
> >>>>>>>>>>>>>>>> database based on MPP architecture. For scenarios where
> >>>>>>>>>>>>>>>> Flink is used for data analysis, processing, or real-time
> >>>>>>>>>>>>>>>> writing on Doris, the Flink Doris Connector is an effective
> >>>>>>>>>>>>>>>> tool.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> At the same time, contributing the Flink Doris Connector to
> >>>>>>>>>>>>>>>> the Flink community will further expand the Flink Connectors
> >>>>>>>>>>>>>>>> ecosystem.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So I would like to start an official discussion on FLIP-399:
> >>>>>>>>>>>>>>>> Flink Connector Doris[3].
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Looking forward to comments, feedback, and suggestions from
> >>>>>>>>>>>>>>>> the community on the proposal.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>> https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
> >>>>>>>>>>>>>>>> [3]
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> di.wu
