Hi Lincoln,

Thanks for proposing this retry feature for the async operator; it would
be very helpful for FLIP-234.
I'm glad to see the lively discussion; here are my thoughts:

1) +1 for not storing the retry state.
It's very tricky and hard to implement a semantically exact retry state
(currentAttempts and firstExecTime/costTime may not be enough). I think
this might be over-designed, because most users are fine with a few extra
retries when failover happens. Flink also doesn't provide exact retry
semantics in other places, e.g. "restart-strategy".

2) I'm confused about the meaning of the generic type <T> in
AsyncRetryStrategy and AsyncRetryPredicate.
It would be better to add a Javadoc description for it. In addition,
maybe <OUT> would be better, to keep it aligned with other async
interfaces (e.g. AsyncFunction).
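For illustration, here is a minimal sketch of what a documented <OUT> parameter could look like. RetryPredicateSketch and its methods are hypothetical names, not the FLIP's API. The assumption is that the predicate inspects the collection of OUT results an attempt produced, so a rule such as "retry if the result is empty" needs no casts.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical retry predicate (not the FLIP's API).
 *
 * @param <OUT> type of the output elements, i.e. the same OUT as in {@code AsyncFunction<IN, OUT>}
 */
interface RetryPredicateSketch<OUT> {
    /** Returns true if the attempt that produced these results should be retried. */
    boolean retryOnResult(Collection<OUT> results);

    /** Returns true if the attempt that failed with this error should be retried. */
    boolean retryOnException(Throwable error);
}

/** Retries when an attempt returned no rows; never retries on exceptions. */
final class EmptyResultRetryPredicate<OUT> implements RetryPredicateSketch<OUT> {
    @Override
    public boolean retryOnResult(Collection<OUT> results) {
        return results == null || results.isEmpty();
    }

    @Override
    public boolean retryOnException(Throwable error) {
        return false;
    }
}

final class RetryPredicateDemo {
    public static void main(String[] args) {
        RetryPredicateSketch<String> predicate = new EmptyResultRetryPredicate<>();
        System.out.println(predicate.retryOnResult(Collections.emptyList())); // true
        System.out.println(predicate.retryOnResult(List.of("row-1")));        // false
    }
}
```

Because OUT matches the AsyncFunction's output type, the compiler can check that a predicate fits the function it is attached to.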

3) timeout parameter: total timeout vs. timeout per async operation
According to the Javadoc of `AsyncDataStream#orderedWait/unorderedWait`,
the "timeout" parameter is the timeout for the asynchronous operation to
complete, i.e. for every call of `AsyncFunction#asyncInvoke`. When we add a
new `orderedWaitWithRetry` method, I think we should keep the meaning of
"timeout" unchanged; otherwise, we need a different parameter name and
description.
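To make the difference concrete, here is a rough sketch of the worst-case time one element may spend when "timeout" keeps its per-attempt meaning. maxAttempts and the fixed backoff are hypothetical parameters, and the sketch assumes every attempt runs until it times out.

```java
final class TimeoutBudget {
    /**
     * Worst-case time one element may spend if "timeout" keeps its per-asyncInvoke meaning:
     * every attempt may run until it times out, plus a fixed backoff between attempts.
     */
    static long worstCaseWithPerAttemptTimeout(long perAttemptTimeoutMs, int maxAttempts, long fixedBackoffMs) {
        return maxAttempts * perAttemptTimeoutMs + (maxAttempts - 1) * fixedBackoffMs;
    }

    public static void main(String[] args) {
        // 3 attempts of up to 10s each, 5s apart: 3 * 10s + 2 * 5s = 40s,
        // whereas a total timeout of 10s would cap the whole retry process at 10s.
        System.out.println(worstCaseWithPerAttemptTimeout(10_000L, 3, 5_000L)); // 40000
    }
}
```

The same parameter value bounds very different end-to-end latencies under the two interpretations. Silently switching the meaning would therefore change behavior for existing users.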

Best,
Jark

On Wed, 25 May 2022 at 15:00, Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi everyone,
>
> Gen Luo, Yun Gao and I had a long offline discussion about the
> implementation of the recovery part. The key question was whether we
> should store the retry state and do the recovery after the job restarts.
>
> We reached a consensus not to store the retry state for now, which is the
> clearest option for users and does not require any changes to the current
> recovery behavior. We discussed three possible options. Their behavior is
> identical in normal processing; the only difference lies in what retry
> state is recorded when checkpointing and what the strategy is when
> recovering.
>
> More details have been added to the FLIP [1], and the PoC [2] has also been updated.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> [2] https://github.com/lincoln-lil/flink/tree/async-retry-poc
>
> Best,
> Lincoln Lee
>
>
> On Tue, 24 May 2022 at 12:23, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> > Hi Gen Luo,
> >
> > You're right, the total cost time includes the failover-restart time. So
> > when the failover time exceeds the retry timeout set by the user, all the
> > data to be retried after recovery will in fact have no additional retry
> > opportunities, which is equivalent to normal data. In such circumstances,
> > the retry state has no effect. But not all jobs restart slowly, and in
> > Flink restarts are getting faster and faster due to continuous
> > improvements.
> > I hope this helps answer your question.
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Tue, 24 May 2022 at 11:50, Gen Luo <luogen...@gmail.com> wrote:
> >
> >> Hi Lincoln,
> >>
> >> Thanks for the explanation. I understand your thought, but I'm a little
> >> confused by the additional detail.
> >> Is the startTime the time when the record is processed for the first
> >> time? And is the cost time counted from it even after a job recovers from
> >> a failover or is restarted? For the failover case, the records may be
> >> processed successfully while the job runs normally, but after some time
> >> (probably longer than the timeout) the job fails and restores, and the
> >> records in the retry state will time out and be discarded immediately.
> >> The same applies to the restarting case. I suppose in many cases the
> >> timeout will be less than the time a job takes to restart, so in these
> >> cases the stored in-flight retry attempts will time out immediately after
> >> restarting, making the retry state meaningless. Please let me know if I'm
> >> mistaken about something.
> >>
> >> On Tue, 24 May 2022 at 10:20, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >>
> >> > Thanks Gen Luo!
> >> >
> >> > I agree with you on preferring the simpler design.
> >> >
> >> > I'd like to share my thoughts on this choice: whether to store the
> >> > retry state or not only affects the recovery logic, not the per-record
> >> > processing, so I just compare the two:
> >> > 1. w/o retry state: simple recovery, but loses precision
> >> > 2. w/ retry state: one more state and a bit more complexity, but precise
> >> > for users
> >> > I prefer the second one from the user's perspective; the additional
> >> > complexity is manageable.
> >> >
> >> > One detail not mentioned in the FLIP: before each next attempt we will
> >> > check whether any time is left (no further attempt is made once
> >> > now() - startTime > timeout), so the real total attempts will always be
> >> > less than or equal to maxAttempts and the total cost time <= timeout
> >> > (one special case is when job failover takes too long).
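A minimal sketch of that check, with hypothetical names and all times in milliseconds. It only decides whether another attempt may be scheduled and is not taken from the PoC.

```java
final class AttemptGuard {
    /**
     * Returns true if another attempt may be scheduled: fewer than maxAttempts attempts
     * have been made and the time elapsed since the first attempt has not exceeded timeoutMs.
     */
    static boolean canAttemptAgain(int attemptsSoFar, int maxAttempts,
                                   long startTimeMs, long nowMs, long timeoutMs) {
        boolean attemptsLeft = attemptsSoFar < maxAttempts;
        // No time left once now() - startTime > timeout.
        boolean timeLeft = nowMs - startTimeMs <= timeoutMs;
        return attemptsLeft && timeLeft;
    }

    public static void main(String[] args) {
        long start = 0L;
        long timeout = 300_000L; // 300s
        System.out.println(canAttemptAgain(2, 5, start, 100_000L, timeout)); // true
        System.out.println(canAttemptAgain(5, 5, start, 100_000L, timeout)); // false: attempts used up
        System.out.println(canAttemptAgain(2, 5, start, 300_001L, timeout)); // false: no time left
    }
}
```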
> >> >
> >> > For the API, I've updated the FLIP [1].
> >> >
> >> > [1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> >> >
> >> > Best,
> >> > Lincoln Lee
> >> >
> >> >
> >> > On Mon, 23 May 2022 at 16:54, Gen Luo <luogen...@gmail.com> wrote:
> >> >
> >> > > Hi Lincoln,
> >> > >
> >> > > Thanks for the quick reply.
> >> > >
> >> > >
> >> > >
> >> > > 1. I understand that when restarting a job from a savepoint, the retry
> >> > > state can ensure the total retry attempts and delay are as expected.
> >> > > However, when failover happens while a job is running, some of the
> >> > > attempts are actually redone after restoring from the state, so of
> >> > > course the total attempts are more than expected. The delay is indeed
> >> > > one of the concerns, but I'm wondering whether the retry state kept
> >> > > here is really important to users or not. In my opinion its benefit is
> >> > > limited, but it makes the change much more complex. I would prefer a
> >> > > simpler solution, to which the retry state could still be added if the
> >> > > need really arises in the future, but I respect your decision.
> >> > >
> >> > >
> >> > >
> >> > > 2. I think adding a currentAttempts parameter to the method is good
> >> > > enough.
> >> > >
> >> > > On Mon, 23 May 2022 at 14:52, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> > >
> >> > > > Hi Gen Luo,
> >> > > >     Thanks a lot for your feedback!
> >> > > >
> >> > > > 1. About the retry state:
> >> > > > I considered dropping the retry state, which really simplifies state
> >> > > > changes and avoids compatibility handling. The only reason I changed
> >> > > > my mind was that it might be lossy for the user. Elements that have
> >> > > > been tried several times but have not exhausted their retry
> >> > > > opportunities will have their retry state reset after a job
> >> > > > failover-restart and start the retry process again (if the retry
> >> > > > condition still holds), which may cause a greater delay for the
> >> > > > retried elements, actually retrying more times and for longer than
> >> > > > expected. (Although the PoC also has a special case when recovering:
> >> > > > if the remaining timeout is exhausted on recalculation, the element
> >> > > > will be executed immediately, but a timeout timer still has to be
> >> > > > registered for the async operation, here using an extra
> >> > > > backoffTimeMillis.)
> >> > > > For example, '60s fixed-delay retry if empty result, max-attempts: 5,
> >> > > > timeout 300s'.
> >> > > > At checkpoint time, some data has been retried 2 times; then suppose
> >> > > > the job is restarted and the restart takes 2 min to succeed. If we
> >> > > > drop the retry state, in the worst case users will see 240s
> >> > > > (60s * 2 + 2min) of extra delay before the retry finishes.
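The arithmetic of this example, written out as a small helper that follows the breakdown given above (the fixed-delay retries that are redone, plus the restart time). The method name is hypothetical.

```java
final class RestartDelayExample {
    /**
     * Worst-case extra delay when the retry state is dropped: the retries already done are
     * repeated after the restart, on top of the restart time itself.
     */
    static long extraDelayWithoutRetryStateMs(int retriesAlreadyDone, long fixedDelayMs, long restartTimeMs) {
        return retriesAlreadyDone * fixedDelayMs + restartTimeMs;
    }

    public static void main(String[] args) {
        long fixedDelayMs = 60_000L;   // 60s fixed-delay retry
        long restartTimeMs = 120_000L; // restart takes 2 min
        long extraMs = extraDelayWithoutRetryStateMs(2, fixedDelayMs, restartTimeMs);
        System.out.println(extraMs / 1000 + "s"); // 240s
    }
}
```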
> >> > > >
> >> > > > To my understanding (please correct me if I missed something), if a
> >> > > > job is resumed from a previous state and the retry strategy is
> >> > > > changed, the elements in the retry state that need to be recovered
> >> > > > just need the new strategy to take over the current attempts and the
> >> > > > time already used, or to give up retrying if no retry strategy is set.
> >> > > > > and can be more compatible when the user restart a job with a
> >> > > > > changed retry strategy.
> >> > > >
> >> > > > 2. About the interface, do you think it would be helpful to add
> >> > > > currentAttempts to getBackoffTimeMillis()? e.g.,
> >> > > > long getBackoffTimeMillis(int currentAttempts)
> >> > > > The existing RetryStrategy and RestartBackoffTimeStrategy were on my
> >> > > > candidate list but did not match exactly, and I want to avoid
> >> > > > creating a new instance for every attempt as RetryStrategy does.
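A minimal sketch of how `long getBackoffTimeMillis(int currentAttempts)` would allow an exponential backoff with a single shared, stateless instance. BackoffStrategySketch, canRetry and the constructor parameters are assumptions for illustration, and currentAttempts is assumed to count the attempts already made.

```java
/** Hypothetical strategy shape: the attempt count is passed in, so one instance serves all elements. */
interface BackoffStrategySketch {
    /** Whether another attempt is allowed after currentAttempts attempts. */
    boolean canRetry(int currentAttempts);

    /** Delay before the next attempt, given currentAttempts attempts so far. */
    long getBackoffTimeMillis(int currentAttempts);
}

/** Exponential backoff: initialDelay, initialDelay * multiplier, ..., capped at maxDelay. */
final class ExponentialBackoffSketch implements BackoffStrategySketch {
    private final int maxAttempts;
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final double multiplier;

    ExponentialBackoffSketch(int maxAttempts, long initialDelayMs, long maxDelayMs, double multiplier) {
        this.maxAttempts = maxAttempts;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
    }

    @Override
    public boolean canRetry(int currentAttempts) {
        return currentAttempts < maxAttempts;
    }

    @Override
    public long getBackoffTimeMillis(int currentAttempts) {
        // After the 1st attempt wait initialDelayMs, after the 2nd initialDelayMs * multiplier, ...
        double delay = initialDelayMs * Math.pow(multiplier, Math.max(0, currentAttempts - 1));
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    public static void main(String[] args) {
        BackoffStrategySketch strategy = new ExponentialBackoffSketch(5, 100L, 1_000L, 2.0);
        for (int attempts = 1; strategy.canRetry(attempts); attempts++) {
            // Prints waits of 100, 200, 400 and 800 ms for attempts 1 to 4.
            System.out.println("after attempt " + attempts + ": wait "
                    + strategy.getBackoffTimeMillis(attempts) + " ms");
        }
    }
}
```

Since the strategy holds no per-element state, no new object is created per attempt. The attempt count lives with the in-flight element instead.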
> >> > > >
> >> > > > WDYT?
> >> > > >
> >> > > > Best,
> >> > > > Lincoln Lee
> >> > > >
> >> > > >
> >> > > > On Mon, 23 May 2022 at 11:37, Gen Luo <luogen...@gmail.com> wrote:
> >> > > >
> >> > > > > Thanks Lincoln for the proposal!
> >> > > > >
> >> > > > > The FLIP looks good to me. I'm in favor of the timer-based
> >> > > > > implementation, and I'd like to share some thoughts.
> >> > > > >
> >> > > > > I'm wondering whether we have to store the retry status in the
> >> > > > > state. I suppose the retrying requests can just be submitted as the
> >> > > > > first attempt when the job restores from a checkpoint, since the
> >> > > > > side effects of the retries cannot be undone by restoring anyway.
> >> > > > > This makes the state simpler, makes state migration unnecessary,
> >> > > > > and can be more compatible when the user restarts a job with a
> >> > > > > changed retry strategy.
> >> > > > >
> >> > > > > Besides, I find it hard to implement a flexible backoff strategy
> >> > > > > with the current AsyncRetryStrategy interface, for example an
> >> > > > > ExponentialBackoffRetryStrategy. Maybe we can add a parameter for
> >> > > > > the attempt, or just use org.apache.flink.util.concurrent.RetryStrategy
> >> > > > > to take the place of the retry strategy part in AsyncRetryStrategy?
> >> > > > >
> >> > > > > On Fri, 20 May 2022 at 14:24, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hi everyone,
> >> > > > > >
> >> > > > > >    By comparing the two internal implementations of delayed
> >> > > > > > retries, we prefer the timer-based solution, which obtains precise
> >> > > > > > delay control through simple logic and only needs to pay the
> >> > > > > > (in our view acceptable) cost of one timer instance per retry
> >> > > > > > element. The FLIP [1] doc has been updated.
> >> > > > > >
> >> > > > > > [1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
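A minimal sketch of the timer-based idea. When an attempt's result is empty and attempts remain, one timer is registered for that element and re-runs the attempt when it fires, instead of parking the element in a queue that has to be polled. Two simplifications are assumptions, not the PoC's code: a plain ScheduledExecutorService stands in for Flink's operator timer service, and the attempt is a synchronous Supplier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class TimerRetrySketch {
    /** Runs attempt; while the result is empty and attempts remain, schedules one timer per retry. */
    static CompletableFuture<String> runWithTimerRetry(
            Supplier<String> attempt, int maxAttempts, long delayMs, ScheduledExecutorService timers) {
        CompletableFuture<String> result = new CompletableFuture<>();
        tryOnce(attempt, 1, maxAttempts, delayMs, timers, result);
        return result;
    }

    private static void tryOnce(Supplier<String> attempt, int attemptNo, int maxAttempts, long delayMs,
                                ScheduledExecutorService timers, CompletableFuture<String> result) {
        String value;
        try {
            value = attempt.get();
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
            return;
        }
        if (!value.isEmpty() || attemptNo >= maxAttempts) {
            result.complete(value); // success, or retries exhausted: emit what we have
            return;
        }
        // Register a single timer for this element; when it fires, the next attempt runs.
        timers.schedule(
                () -> tryOnce(attempt, attemptNo + 1, maxAttempts, delayMs, timers, result),
                delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical lookup that returns an empty result twice before the data shows up.
        Supplier<String> lookup = () -> calls.incrementAndGet() < 3 ? "" : "row";
        String value = runWithTimerRetry(lookup, 5, 10L, timers).orTimeout(5, TimeUnit.SECONDS).join();
        System.out.println(value + " after " + calls.get() + " attempts"); // row after 3 attempts
        timers.shutdown();
    }
}
```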
> >> > > > > >
> >> > > > > > Best,
> >> > > > > > Lincoln Lee
> >> > > > > >
> >> > > > > >
> >> > > > > > On Mon, 16 May 2022 at 15:09, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > Hi Jingsong,
> >> > > > > > >
> >> > > > > > > Good question!
> >> > > > > > >
> >> > > > > > > The delayQueue is very similar to incompleteElements in
> >> > > > > > > UnorderedStreamElementQueue: it only records references to the
> >> > > > > > > in-flight retry elements. Its core value is a fast scan when
> >> > > > > > > force-flushing during endInput, with less refactoring of the
> >> > > > > > > existing logic.
> >> > > > > > >
> >> > > > > > > Users don't need to configure a new capacity for the delayQueue;
> >> > > > > > > they can just turn the original one up (if needed).
> >> > > > > > > Storing the input data and the retry state separately is mainly
> >> > > > > > > for backward compatibility. In the first version of the PoC, I
> >> > > > > > > used a single combined state to reduce state costs, but it was
> >> > > > > > > hard to keep compatible, so I changed it into two states because
> >> > > > > > > of Yun Gao's concern about compatibility.
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Lincoln Lee
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Mon, 16 May 2022 at 14:48, Jingsong Li <jingsongl...@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > >> Thanks Lincoln for your reply.
> >> > > > > > >>
> >> > > > > > >> I'm a little confused about the relationship between the
> >> > > > > > >> Ordered/Unordered Queue and the DelayQueue. Why do we need a
> >> > > > > > >> DelayQueue?
> >> > > > > > >> Can we remove the DelayQueue and put the retry state in the
> >> > > > > > >> StreamRecordQueueEntry (it seems this is already in the FLIP)?
> >> > > > > > >> The advantages of doing this are:
> >> > > > > > >> 1. half as much data is stored in state
> >> > > > > > >> 2. the concept is unified; the user only needs to configure one
> >> > > > > > >> queue capacity
> >> > > > > > >>
> >> > > > > > >> Best,
> >> > > > > > >> Jingsong
> >> > > > > > >>
> >> > > > > > >> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> > > > > > >>
> >> > > > > > >> > Hi Jingsong,
> >> > > > > > >> > Thanks for your feedback! Let me try to answer the two questions:
> >> > > > > > >> >
> >> > > > > > >> > For q1: Motivation
> >> > > > > > >> > Yes, users can implement retries themselves based on the external
> >> > > > > > >> > async client, but this requires each user to do similar things.
> >> > > > > > >> > If we support retries uniformly, user code becomes much simpler.
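For comparison, here is what a hand-written retry might look like if each user had to put it into their own async client code. retryAsync is a hypothetical helper. It retries only on exceptions and re-invokes immediately without any backoff, which is a simplification.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ManualRetry {
    /** Re-invokes call until it succeeds or attemptsLeft runs out; the last failure is propagated. */
    static <T> CompletableFuture<T> retryAsync(Supplier<CompletableFuture<T>> call, int attemptsLeft) {
        return call.get()
                .<CompletableFuture<T>>handle((value, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    if (attemptsLeft <= 1) {
                        return CompletableFuture.<T>failedFuture(error);
                    }
                    return retryAsync(call, attemptsLeft - 1);
                })
                .thenCompose(next -> next); // flatten CompletableFuture<CompletableFuture<T>>
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical client call that fails twice with a transient error, then succeeds.
        Supplier<CompletableFuture<String>> flakyCall = () -> calls.incrementAndGet() < 3
                ? CompletableFuture.<String>failedFuture(new RuntimeException("transient"))
                : CompletableFuture.completedFuture("ok");
        String result = retryAsync(flakyCall, 5).join();
        System.out.println(result + " after " + calls.get() + " calls"); // ok after 3 calls
    }
}
```

Delays, result-based predicates, timeouts and checkpoint interaction would all have to be added by hand on top of this. That duplication is the motivation for supporting retries in the operator itself.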
> >> > > > > > >> >
> >> > > > > > >> > For q2: Block Main Thread
> >> > > > > > >> > You're right, the queue data will be stored in the ListState,
> >> > > > > > >> > which is an OperatorState. In fact, for ListState storage the
> >> > > > > > >> > theoretical upper limit is Integer.MAX_VALUE, but we can't make
> >> > > > > > >> > the queue capacity too big in production, because the risk of OOM
> >> > > > > > >> > increases as the queue capacity grows; increasing the task
> >> > > > > > >> > parallelism may be a more viable way when a single task encounters
> >> > > > > > >> > too many retry items.
> >> > > > > > >> > We recommend estimating a proper queue capacity with a formula like
> >> > > > > > >> > 'inputRate * retryRate * avgRetryDuration', also taking the actual
> >> > > > > > >> > checkpoint duration at runtime into account.
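A quick way to apply that formula. The sketch assumes, beyond what the thread states, that inputRate is per task in records per second, that retryRate is the fraction of records needing a retry, and that avgRetryDuration is in seconds. The result then estimates how many retry elements are in flight at once (essentially Little's law).

```java
final class RetryQueueCapacity {
    /**
     * Rough estimate of concurrently in-flight retry elements per task,
     * following 'inputRate * retryRate * avgRetryDuration'.
     *
     * @param inputRatePerSec     records per second arriving at one task
     * @param retryRate           fraction of records that need at least one retry (0..1)
     * @param avgRetryDurationSec average time a record spends in the retry phase, in seconds
     */
    static long estimate(double inputRatePerSec, double retryRate, double avgRetryDurationSec) {
        return (long) Math.ceil(inputRatePerSec * retryRate * avgRetryDurationSec);
    }

    public static void main(String[] args) {
        // 1,000 records/s, half of them retried, each spending about 4s in retries.
        System.out.println(estimate(1_000, 0.5, 4.0)); // 2000
    }
}
```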
> >> > > > > > >> >
> >> > > > > > >> > Best,
> >> > > > > > >> > Lincoln Lee
> >> > > > > > >> >
> >> > > > > > >> >
> >> > > > > > >> > On Mon, 16 May 2022 at 10:31, Jingsong Li <jingsongl...@gmail.com> wrote:
> >> > > > > > >> >
> >> > > > > > >> > > Thanks Lincoln for the proposal.
> >> > > > > > >> > >
> >> > > > > > >> > > ## Motivation:
> >> > > > > > >> > >
> >> > > > > > >> > > > asyncInvoke and callback functions are executed synchronously by
> >> > > > > > >> > > > the main thread, which is not suitable adding long time blocking
> >> > > > > > >> > > > operations, and introducing additional thread will bring extra
> >> > > > > > >> > > > complexity for users
> >> > > > > > >> > >
> >> > > > > > >> > > According to the documentation of AsyncFunction:
> >> > > > > > >> > >
> >> > > > > > >> > > > For each #asyncInvoke, an async io operation can be triggered,
> >> > > > > > >> > > > and once it has been done, the result can be collected by calling
> >> > > > > > >> > > > {@link ResultFuture#complete}. For each async operation, its
> >> > > > > > >> > > > context is stored in the operator immediately after invoking
> >> > > > > > >> > > > #asyncInvoke, avoiding blocking for each stream input as long as
> >> > > > > > >> > > > the internal buffer is not full.
> >> > > > > > >> > >
> >> > > > > > >> > > The real external call should happen in the asynchronous thread.
> >> > > > > > >> > >
> >> > > > > > >> > > My question is: if the user performs the retry in this
> >> > > > > > >> > > asynchronous thread themselves, is there a difference between
> >> > > > > > >> > > that and the current FLIP's approach?
> >> > > > > > >> > >
> >> > > > > > >> > > ## Block Main Thread
> >> > > > > > >> > >
> >> > > > > > >> > > If I understand correctly, the retry queue will be put into
> >> > > > > > >> > > ListState; is this state an OperatorState? As far as I know,
> >> > > > > > >> > > OperatorState is not able to store a lot of data.
> >> > > > > > >> > > So when we need to retry more data, do we need to block the main
> >> > > > > > >> > > thread? What is the maximum size of the default retry queue?
> >> > > > > > >> > >
> >> > > > > > >> > > Best,
> >> > > > > > >> > > Jingsong
> >> > > > > > >> > >
> >> > > > > > >> > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> > > > > > >> > >
> >> > > > > > >> > > > Dear Flink developers,
> >> > > > > > >> > > >
> >> > > > > > >> > > > I would like to open a discussion on FLIP-232 [1], an extension
> >> > > > > > >> > > > of AsyncWaitOperator to support retries for the user's
> >> > > > > > >> > > > AsyncFunction.
> >> > > > > > >> > > >
> >> > > > > > >> > > > To do so, a new user interface will be added to define the
> >> > > > > > >> > > > trigger condition for a retry and when to retry. Internally, a
> >> > > > > > >> > > > delayed retry mechanism will be introduced.
> >> > > > > > >> > > >
> >> > > > > > >> > > > There's a PoC for this FLIP [2][3]; thanks to Yun Gao for the
> >> > > > > > >> > > > offline discussions and valuable comments.
> >> > > > > > >> > > > The new feature is backward compatible: it can recover from state
> >> > > > > > >> > > > generated by prior Flink versions, and if no retry strategy is
> >> > > > > > >> > > > enabled, the behavior is the same as before.
> >> > > > > > >> > > >
> >> > > > > > >> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> >> > > > > > >> > > > [2] based on timer trigger
> >> > > > > > >> > > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> >> > > > > > >> > > > [3] based on DelayQueue with pull fashion
> >> > > > > > >> > > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> >> > > > > > >> > > >
> >> > > > > > >> > > >
> >> > > > > > >> > > > Best,
> >> > > > > > >> > > > Lincoln Lee
> >> > > > > > >> > > >
> >> > > > > > >> > >
> >> > > > > > >> >
> >> > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
