Hi,

I'm thinking about Yuan's case. Let's assume that the case is running in
current Flink:
1. CP8 finishes
2. For some reason, PR2 stops consuming records from the source (but is not
stuck), and PR1 continues consuming new records.
3. CP9 and CP10 finish
4. PR2 starts to consume quickly to catch up with PR1, and reaches the same
final status with that in Yuan's case before CP11 starts.

I support that in this case, the status of the job can be the same as in
Yuan's case, and the snapshot (including source states) taken at CP10
should be the same as the composed global snapshot in Yuan's case, which is
combining CP10 of PR1 and CP8 of PR2. This should be true if neither failed
checkpointing nor uncommitted consuming have side effects, both of which
can break the exactly-once semantics when replaying. So I think there
should be no difference between rescaling the combined global snapshot or
the globally taken one, i.e. if the input partitions are not independent,
we are probably not able to rescale the source state in the current Flink
eiter.

And @Thomas, I do agree that the operational burden is
significantly reduced, while I'm a little afraid that checkpointing the
subgraphs individually may increase most of the runtime overhead back
again. Maybe we can find a better way to implement this.

On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <t...@apache.org> wrote:

> Hi,
>
> Thanks for opening this discussion! The proposed enhancement would be
> interesting for use cases in our infrastructure as well.
>
> There are scenarios where it makes sense to have multiple disconnected
> subgraphs in a single job because it can significantly reduce the
> operational burden as well as the runtime overhead. Since we allow
> subgraphs to recover independently, then why not allow them to make
> progress independently also, which would imply that checkpointing must
> succeed for non affected subgraphs as certain behavior is tied to
> checkpoint completion, like Kafka offset commit, file output etc.
>
> As for source offset redistribution, offset/position needs to be tied
> to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
> source framework, it would be hard to implement a source with correct
> behavior that does not track the position along with the split.
>
> In Yuan's example, is there a reason why CP8 could not be promoted to
> CP10 by the coordinator for PR2 once it receives the notification that
> CP10 did not complete? It appears that should be possible since in its
> effect it should be no different than no data processed between CP8
> and CP10?
>
> Thanks,
> Thomas
>
> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > Thanks for the clarification Yuan and Gen,
> >
> > I agree that the checkpointing of the sources needs to support the
> > rescaling case, otherwise it does not work. Is there currently a source
> > implementation where this wouldn't work? For Kafka it should work because
> > we store the offset per assigned partition. For Kinesis it is probably
> the
> > same. For the Filesource we store the set of unread input splits in the
> > source coordinator and the state of the assigned splits in the sources.
> > This should probably also work since new splits are only handed out to
> > running tasks.
> >
> > Cheers,
> > Till
> >
> > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> >
> > > Hey Till,
> > >
> > > > Why rescaling is a problem for pipelined regions/independent
> execution
> > > subgraphs:
> > >
> > > Take a simplified example :
> > > job graph : source  (2 instances) -> sink (2 instances)
> > > execution graph:
> > > source (1/2)  -> sink (1/2)   [pieplined region 1]
> > > source (2/2)  -> sink (2/2)   [pieplined region 2]
> > >
> > > Let's assume checkpoints are still triggered globally, meaning
> different
> > > pipelined regions share the global checkpoint id (PR1 CP1 matches with
> PR2
> > > CP1).
> > >
> > > Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > >
> > > Let's say we want to rescale to parallelism 3 due to increased input.
> > >
> > > - Notice that we can not simply rescale based on the latest completed
> > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> > > externally.
> > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
> how the
> > > source's offset redistribution is implemented.
> > >    The answer is yes if we treat each input partition as independent
> from
> > > each other, *but I am not sure whether we can make that assumption*.
> > >
> > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with
> CPs.
> > >
> > > Best
> > > -Yuan
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Yuan and Gen could you elaborate why rescaling is a problem if we say
> > > that
> > > > separate pipelined regions can take checkpoints independently?
> > > > Conceptually, I somehow think that a pipelined region that is failed
> and
> > > > cannot create a new checkpoint is more or less the same as a
> pipelined
> > > > region that didn't get new input or a very very slow pipelined region
> > > which
> > > > couldn't read new records since the last checkpoint (assuming that
> the
> > > > checkpoint coordinator can create a global checkpoint by combining
> > > > individual checkpoints (e.g. taking the last completed checkpoint
> from
> > > each
> > > > pipelined region)). If this comparison is correct, then this would
> mean
> > > > that we have rescaling problems under the latter two cases.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <luogen...@gmail.com> wrote:
> > > >
> > > > > Hi Gyula,
> > > > >
> > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can
> discuss
> > > > this
> > > > > within two scopes. One is the job subgraph, the other is the
> execution
> > > > > subgraph, which I suppose is the same as PipelineRegion.
> > > > >
> > > > > An idea is to individually checkpoint the PipelineRegions, for the
> > > > > recovering in a single run.
> > > > >
> > > > > Flink has now supported PipelineRegion based failover, with a
> subset
> > > of a
> > > > > global checkpoint snapshot. The checkpoint barriers are spread
> within a
> > > > > PipelineRegion, so the checkpointing of individual PipelineRegions
> is
> > > > > actually independent. Since in a single run of a job, the
> > > PipelineRegions
> > > > > are fixed, we can individually checkpoint separated
> PipelineRegions,
> > > > > despite what status the other PipelineRegions are, and use a
> snapshot
> > > of
> > > > a
> > > > > failing region to recover, instead of the subset of a global
> snapshot.
> > > > This
> > > > > can support separated job subgraphs as well, since they will also
> be
> > > > > separated into different PipelineRegions. I think this can fulfill
> your
> > > > > needs.
> > > > >
> > > > > In fact the individual snapshots of all PipelineRegions can form a
> > > global
> > > > > snapshot, and the alignment of snapshots of individual regions is
> not
> > > > > necessary. But rescaling this global snapshot can be potentially
> > > > complex. I
> > > > > think it's better to use the individual snapshots in a single run,
> and
> > > > take
> > > > > a global checkpoint/savepoint before restarting the job, rescaling
> it
> > > or
> > > > > not.
> > > > >
> > > > > A major issue of this plan is that it breaks the checkpoint
> mechanism
> > > of
> > > > > Flink. As far as I know, even in the approximate recovery, the
> snapshot
> > > > > used to recover a single task is still a part of a global
> snapshot. To
> > > > > implement the individual checkpointing of PipelineRegions, there
> may
> > > need
> > > > > to be a checkpoint coordinator for each PipelineRegion, and a new
> > > global
> > > > > checkpoint coordinator. When the scale goes up, there can be many
> > > > > individual regions, which can be a big burden to the job manager.
> The
> > > > > meaning of the checkpoint id will also be changed, which can affect
> > > many
> > > > > aspects. There can be lots of work and risks, and the risks still
> exist
> > > > if
> > > > > we only individually checkpoint separated job subgraphs, since the
> > > > > mechanism is still broken. If that is what you need, maybe
> separating
> > > > them
> > > > > into different jobs is an easier and better choice, as Caizhi and
> Yuan
> > > > > mentioned.
> > > > >
> > > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yuanmei.w...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hey Gyula,
> > > > > >
> > > > > > That's a very interesting idea. The discussion about the
> `Individual`
> > > > vs
> > > > > > `Global` checkpoint was raised before, but the main concern was
> from
> > > > two
> > > > > > aspects:
> > > > > >
> > > > > > - Non-deterministic replaying may lead to an inconsistent view of
> > > > > > checkpoint
> > > > > > - It is not easy to form a clear cut of past and future and
> hence no
> > > > > clear
> > > > > > cut of where the start point of the source should begin to replay
> > > from.
> > > > > >
> > > > > > Starting from independent subgraphs as you proposed may be a good
> > > > > starting
> > > > > > point. However, when we talk about subgraph, do we mention it as
> a
> > > job
> > > > > > subgraph (each vertex is one or more operators) or execution
> subgraph
> > > > > (each
> > > > > > vertex is a task instance)?
> > > > > >
> > > > > > If it is a job subgraph, then indeed, why not separate it into
> > > multiple
> > > > > > jobs as Caizhi mentioned.
> > > > > > If it is an execution subgraph, then it is difficult to handle
> > > > rescaling
> > > > > > due to inconsistent views of checkpoints between tasks of the
> same
> > > > > > operator.
> > > > > >
> > > > > > `Individual/Subgraph Checkpointing` is definitely an interesting
> > > > > direction
> > > > > > to think of, and I'd love to hear more from you!
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Yuan
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
> tsreape...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Gyula!
> > > > > > >
> > > > > > > Thanks for raising this discussion. I agree that this will be
> an
> > > > > > > interesting feature but I actually have some doubts about the
> > > > > motivation
> > > > > > > and use case. If there are multiple individual subgraphs in the
> > > same
> > > > > job,
> > > > > > > why not just distribute them to multiple jobs so that each job
> > > > contains
> > > > > > > only one individual graph and can now fail without disturbing
> the
> > > > > others?
> > > > > > >
> > > > > > >
> > > > > > > Gyula Fóra <gyf...@apache.org> 于2022年2月7日周一 05:22写道:
> > > > > > >
> > > > > > > > Hi all!
> > > > > > > >
> > > > > > > > At the moment checkpointing only works for healthy jobs with
> all
> > > > > > running
> > > > > > > > (or some finished) tasks. This sounds reasonable in most
> cases
> > > but
> > > > > > there
> > > > > > > > are a few applications where it would make sense to
> checkpoint
> > > > > failing
> > > > > > > jobs
> > > > > > > > as well.
> > > > > > > >
> > > > > > > > Due to how the checkpointing mechanism works, subgraphs that
> > > have a
> > > > > > > failing
> > > > > > > > task cannot be checkpointed without violating the
> exactly-once
> > > > > > semantics.
> > > > > > > > However if the job has multiple independent subgraphs (that
> are
> > > not
> > > > > > > > connected to each other), even if one subgraph is failing,
> the
> > > > other
> > > > > > > > completely running one could be checkpointed. In these cases
> the
> > > > > tasks
> > > > > > of
> > > > > > > > the failing subgraph could simply inherit the last successful
> > > > > > checkpoint
> > > > > > > > metadata (before they started failing). This logic would
> produce
> > > a
> > > > > > > > consistent checkpoint.
> > > > > > > >
> > > > > > > > The job as a whole could now make stateful progress even if
> some
> > > > > > > subgraphs
> > > > > > > > are constantly failing. This can be very valuable if for some
> > > > reason
> > > > > > the
> > > > > > > > job has a larger number of independent subgraphs that are
> > > expected
> > > > to
> > > > > > > fail
> > > > > > > > every once in a while, or if some subgraphs can have longer
> > > > downtimes
> > > > > > > that
> > > > > > > > would now cause the whole job to stall.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Gyula
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>

Reply via email to