Hi,

@Yuan
Do you mean that there should be no shared state between source subtasks?
Sharing state between checkpoints of a specific subtask should be fine.

Sharing state between subtasks of a task can be an issue, whether or not
it's a source. That's also what I was afraid of in my previous replies. In
short, if the behavior of a pipeline region can somehow influence the
state of other pipeline regions, their checkpoints have to be aligned
before rescaling.
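
A minimal sketch of that condition, in Python for brevity. All names here
(RegionCheckpoint, compose_global_snapshot, can_rescale, redistribute) are
hypothetical and are not Flink APIs, and whether regions are independent is
simply passed in as a flag, since deciding that is the open question in this
thread. It composes a global snapshot from the latest completed checkpoint of
each pipeline region and only allows rescaling if the regions are independent
or their checkpoints are aligned:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RegionCheckpoint:
    region: str               # hypothetical region name, e.g. "PR1"
    checkpoint_id: int        # globally triggered checkpoint id
    offsets: Dict[str, int]   # source position tracked per partition/split


def compose_global_snapshot(completed: List[RegionCheckpoint]) -> Dict[str, RegionCheckpoint]:
    """Keep the latest completed checkpoint of each region."""
    latest: Dict[str, RegionCheckpoint] = {}
    for cp in completed:
        current = latest.get(cp.region)
        if current is None or cp.checkpoint_id > current.checkpoint_id:
            latest[cp.region] = cp
    return latest


def can_rescale(snapshot: Dict[str, RegionCheckpoint], regions_independent: bool) -> bool:
    """Rescaling is safe if regions cannot influence each other's state,
    or if all regions are at the same checkpoint id (aligned)."""
    ids = {cp.checkpoint_id for cp in snapshot.values()}
    return regions_independent or len(ids) == 1


def redistribute(snapshot: Dict[str, RegionCheckpoint], new_parallelism: int) -> List[Dict[str, int]]:
    """Assumed redistribution: merge per-partition offsets and hand the
    partitions out round-robin to the new subtasks."""
    merged: Dict[str, int] = {}
    for cp in snapshot.values():
        merged.update(cp.offsets)
    assignment: List[Dict[str, int]] = [dict() for _ in range(new_parallelism)]
    for i, partition in enumerate(sorted(merged)):
        assignment[i % new_parallelism][partition] = merged[partition]
    return assignment
```

With Yuan's example (PR1 at CP10, PR2 at CP8), can_rescale returns True only
when the regions are declared independent; otherwise the two regions have to
reach the same checkpoint id first.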

On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:

> Hey Folks,
>
> Thanks for the discussion!
>
> *Motivation and use cases*
> I think the motivation and use cases are very clear, and I have no doubts
> about this part.
> A typical use case is ETL with two-phase commit, where hundreds of partitions
> can be blocked by a single straggler (a single task's checkpoint abort can
> affect all of them; it does not have to be a failure).
>
> *Source offset redistribution*
> As for the known sources & implementations for Flink, I cannot find a case
> that does not work, *for now*.
> I need to dig a bit more into how splits are tracked: assigned, not
> successfully processed, successfully processed, etc.
> I guess it is a single shared source OPCoordinator. And how is this *shared*
> state (between tasks) preserved?
>
> *Input partition/splits treated completely independently of each other*
> I am still not sure about this part if, as mentioned in the section above,
> the source has shared state.
>
> To Thomas:
> > In Yuan's example, is there a reason why CP8 could not be promoted to
> > CP10 by the coordinator for PR2 once it receives the notification that
> > CP10 did not complete? It appears that should be possible since in its
> > effect it should be no different than no data processed between CP8
> >  and CP10?
>
> Not sure what "promoted" means here, but
> 1. I guess it does not matter whether it is CP8 or CP10 any more
> if there is no shared state in the source, exactly as you mentioned:
> "it should be no different than no data processed between CP8 and CP10"
>
> 2. I've noticed from this question that there is a gap between
> "*allow aborted/failed checkpoint in independent sub-graph*" and
> my intention: "*independent sub-graph checkpointing independently*"
>
> Best
> Yuan
>
>
> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <luogen...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm thinking about Yuan's case. Let's assume that the case is running in
> > current Flink:
> > 1. CP8 finishes
> > 2. For some reason, PR2 stops consuming records from the source (but is not
> > stuck), and PR1 continues consuming new records.
> > 3. CP9 and CP10 finish
> > 4. PR2 starts to consume quickly to catch up with PR1, and reaches the same
> > final status as in Yuan's case before CP11 starts.
> >
> > I suppose that in this case, the status of the job can be the same as in
> > Yuan's case, and the snapshot (including source states) taken at CP10
> > should be the same as the composed global snapshot in Yuan's case, which
> > combines CP10 of PR1 and CP8 of PR2. This should be true if neither failed
> > checkpointing nor uncommitted consuming has side effects, both of which
> > can break the exactly-once semantics when replaying. So I think there
> > should be no difference between rescaling the combined global snapshot and
> > the globally taken one, i.e. if the input partitions are not independent,
> > we are probably not able to rescale the source state in the current Flink
> > either.
> >
> > And @Thomas, I do agree that the operational burden is
> > significantly reduced, but I'm a little afraid that checkpointing the
> > subgraphs individually may bring back most of the runtime overhead.
> > Maybe we can find a better way to implement this.
> >
> > On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > Thanks for opening this discussion! The proposed enhancement would be
> > > interesting for use cases in our infrastructure as well.
> > >
> > > There are scenarios where it makes sense to have multiple disconnected
> > > subgraphs in a single job because it can significantly reduce the
> > > operational burden as well as the runtime overhead. Since we allow
> > > subgraphs to recover independently, then why not allow them to make
> > > progress independently also, which would imply that checkpointing must
> > > succeed for non affected subgraphs as certain behavior is tied to
> > > checkpoint completion, like Kafka offset commit, file output etc.
> > >
> > > As for source offset redistribution, offset/position needs to be tied
> > > to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> > > > and Kinesis legacy sources and the FLIP-27 Kafka source.) With the new
> > > source framework, it would be hard to implement a source with correct
> > > behavior that does not track the position along with the split.
> > >
> > > In Yuan's example, is there a reason why CP8 could not be promoted to
> > > CP10 by the coordinator for PR2 once it receives the notification that
> > > CP10 did not complete? It appears that should be possible since in its
> > > effect it should be no different than no data processed between CP8
> > > and CP10?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > > On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > Thanks for the clarification Yuan and Gen,
> > > >
> > > > > I agree that the checkpointing of the sources needs to support the
> > > > > rescaling case, otherwise it does not work. Is there currently a source
> > > > > implementation where this wouldn't work? For Kafka it should work because
> > > > > we store the offset per assigned partition. For Kinesis it is probably the
> > > > > same. For the Filesource we store the set of unread input splits in the
> > > > > source coordinator and the state of the assigned splits in the sources.
> > > > > This should probably also work since new splits are only handed out to
> > > > > running tasks.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > > >
> > > > > Hey Till,
> > > > >
> > > > > > > Why rescaling is a problem for pipelined regions/independent execution subgraphs:
> > > > >
> > > > > Take a simplified example :
> > > > > job graph : source  (2 instances) -> sink (2 instances)
> > > > > execution graph:
> > > > > > source (1/2)  -> sink (1/2)   [pipelined region 1]
> > > > > > source (2/2)  -> sink (2/2)   [pipelined region 2]
> > > > >
> > > > > > Let's assume checkpoints are still triggered globally, meaning different
> > > > > > pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2
> > > > > > CP1).
> > > > > >
> > > > > > Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > > > > >
> > > > > > Let's say we want to rescale to parallelism 3 due to increased input.
> > > > > >
> > > > > > - Notice that we can not simply rescale based on the latest completed
> > > > > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> > > > > > externally.
> > > > > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the
> > > > > > source's offset redistribution is implemented.
> > > > > >    The answer is yes if we treat each input partition as independent from
> > > > > > each other, *but I am not sure whether we can make that assumption*.
> > > > > >
> > > > > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs.
> > > > >
> > > > > Best
> > > > > -Yuan
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > > Yuan and Gen could you elaborate why rescaling is a problem if we say that
> > > > > > > separate pipelined regions can take checkpoints independently?
> > > > > > > Conceptually, I somehow think that a pipelined region that is failed and
> > > > > > > cannot create a new checkpoint is more or less the same as a pipelined
> > > > > > > region that didn't get new input or a very very slow pipelined region which
> > > > > > > couldn't read new records since the last checkpoint (assuming that the
> > > > > > > checkpoint coordinator can create a global checkpoint by combining
> > > > > > > individual checkpoints (e.g. taking the last completed checkpoint from each
> > > > > > > pipelined region)). If this comparison is correct, then this would mean
> > > > > > > that we have rescaling problems under the latter two cases.
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <luogen...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Gyula,
> > > > > > >
> > > > > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this
> > > > > > > > within two scopes. One is the job subgraph, the other is the execution
> > > > > > > > subgraph, which I suppose is the same as PipelineRegion.
> > > > > > >
> > > > > > > > An idea is to individually checkpoint the PipelineRegions, for the
> > > > > > > > recovering in a single run.
> > > > > > >
> > > > > > > > Flink has now supported PipelineRegion based failover, with a subset of a
> > > > > > > > global checkpoint snapshot. The checkpoint barriers are spread within a
> > > > > > > > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > > > > > > > actually independent. Since in a single run of a job, the PipelineRegions
> > > > > > > > are fixed, we can individually checkpoint separated PipelineRegions,
> > > > > > > > despite what status the other PipelineRegions are, and use a snapshot of a
> > > > > > > > failing region to recover, instead of the subset of a global snapshot. This
> > > > > > > > can support separated job subgraphs as well, since they will also be
> > > > > > > > separated into different PipelineRegions. I think this can fulfill your
> > > > > > > > needs.
> > > > > > >
> > > > > > > > In fact the individual snapshots of all PipelineRegions can form a global
> > > > > > > > snapshot, and the alignment of snapshots of individual regions is not
> > > > > > > > necessary. But rescaling this global snapshot can be potentially complex. I
> > > > > > > > think it's better to use the individual snapshots in a single run, and take
> > > > > > > > a global checkpoint/savepoint before restarting the job, rescaling it or
> > > > > > > > not.
> > > > > > >
> > > > > > > > A major issue of this plan is that it breaks the checkpoint mechanism of
> > > > > > > > Flink. As far as I know, even in the approximate recovery, the snapshot
> > > > > > > > used to recover a single task is still a part of a global snapshot. To
> > > > > > > > implement the individual checkpointing of PipelineRegions, there may need
> > > > > > > > to be a checkpoint coordinator for each PipelineRegion, and a new global
> > > > > > > > checkpoint coordinator. When the scale goes up, there can be many
> > > > > > > > individual regions, which can be a big burden to the job manager. The
> > > > > > > > meaning of the checkpoint id will also be changed, which can affect many
> > > > > > > > aspects. There can be lots of work and risks, and the risks still exist if
> > > > > > > > we only individually checkpoint separated job subgraphs, since the
> > > > > > > > mechanism is still broken. If that is what you need, maybe separating them
> > > > > > > > into different jobs is an easier and better choice, as Caizhi and Yuan
> > > > > > > > mentioned.
> > > > > > >
> > > > > > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey Gyula,
> > > > > > > >
> > > > > > > > > That's a very interesting idea. The discussion about the `Individual` vs
> > > > > > > > > `Global` checkpoint was raised before, but the main concern was from two
> > > > > > > > > aspects:
> > > > > > > > >
> > > > > > > > > - Non-deterministic replaying may lead to an inconsistent view of
> > > > > > > > > checkpoint
> > > > > > > > > - It is not easy to form a clear cut of past and future and hence no clear
> > > > > > > > > cut of where the start point of the source should begin to replay from.
> > > > > > > >
> > > > > > > > > Starting from independent subgraphs as you proposed may be a good starting
> > > > > > > > > point. However, when we talk about subgraph, do we mention it as a job
> > > > > > > > > subgraph (each vertex is one or more operators) or execution subgraph (each
> > > > > > > > > vertex is a task instance)?
> > > > > > > >
> > > > > > > > > If it is a job subgraph, then indeed, why not separate it into multiple
> > > > > > > > > jobs as Caizhi mentioned.
> > > > > > > > > If it is an execution subgraph, then it is difficult to handle rescaling
> > > > > > > > > due to inconsistent views of checkpoints between tasks of the same
> > > > > > > > > operator.
> > > > > > > >
> > > > > > > > > `Individual/Subgraph Checkpointing` is definitely an interesting direction
> > > > > > > > > to think of, and I'd love to hear more from you!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Yuan
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <tsreape...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Gyula!
> > > > > > > > >
> > > > > > > > > > Thanks for raising this discussion. I agree that this will be an
> > > > > > > > > > interesting feature but I actually have some doubts about the motivation
> > > > > > > > > > and use case. If there are multiple individual subgraphs in the same job,
> > > > > > > > > > why not just distribute them to multiple jobs so that each job contains
> > > > > > > > > > only one individual graph and can now fail without disturbing the others?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > On Mon, Feb 7, 2022 at 5:22 AM Gyula Fóra <gyf...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi all!
> > > > > > > > > >
> > > > > > > > > > > At the moment checkpointing only works for healthy jobs with all running
> > > > > > > > > > > (or some finished) tasks. This sounds reasonable in most cases but there
> > > > > > > > > > > are a few applications where it would make sense to checkpoint failing jobs
> > > > > > > > > > > as well.
> > > > > > > > > >
> > > > > > > > > > > Due to how the checkpointing mechanism works, subgraphs that have a failing
> > > > > > > > > > > task cannot be checkpointed without violating the exactly-once semantics.
> > > > > > > > > > > However if the job has multiple independent subgraphs (that are not
> > > > > > > > > > > connected to each other), even if one subgraph is failing, the other
> > > > > > > > > > > completely running one could be checkpointed. In these cases the tasks of
> > > > > > > > > > > the failing subgraph could simply inherit the last successful checkpoint
> > > > > > > > > > > metadata (before they started failing). This logic would produce a
> > > > > > > > > > > consistent checkpoint.
> > > > > > > > > >
> > > > > > > > > > > The job as a whole could now make stateful progress even if some subgraphs
> > > > > > > > > > > are constantly failing. This can be very valuable if for some reason the
> > > > > > > > > > > job has a larger number of independent subgraphs that are expected to fail
> > > > > > > > > > > every once in a while, or if some subgraphs can have longer downtimes that
> > > > > > > > > > > would now cause the whole job to stall.
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Gyula
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> >
>
