Hi @Yuan, do you mean that there should be no shared state between source subtasks? Sharing state between checkpoints of a specific subtask should be fine.
Sharing state between subtasks of a task can be an issue, no matter whether it's a source. That's also what I was afraid of in the previous replies. In short, if the behavior of a pipeline region can somehow influence the state of other pipeline regions, their checkpoints have to be aligned before rescaling. On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yuanmei.w...@gmail.com> wrote: > Hey Folks, > > Thanks for the discussion! > > *Motivation and use cases* > I think motivation and use cases are very clear and I do not have doubts > on this part. > A typical use case is ETL with two-phase-commit, hundreds of partitions can > be blocked by a single straggler (a single task's checkpoint abortion can > affect all, not necessarily failure). > > *Source offset redistribution* > As for the known sources & implementation for Flink, I cannot find a case > that does not work, *for now*. > I need to dig a bit more: how splits are tracked: assigned, not successfully > processed, successfully processed, etc. > I guess it is a single shared source OPCoordinator. And how this *shared* > state (between tasks) is preserved? > > *Input partition/splits treated completely independent from each other* > This part I am still not sure about, as mentioned, if we have shared state of > source in the above section. > > To Thomas: > > In Yuan's example, is there a reason why CP8 could not be promoted to > > CP10 by the coordinator for PR2 once it receives the notification that > > CP10 did not complete? It appears that should be possible since in its > > effect it should be no different than no data processed between CP8 > > and CP10? > > Not sure what "promoted" means here, but > 1. I guess it does not matter whether it is CP8 or CP10 any more, > if there is no shared state in the source, exactly as you mentioned: > "it should be no different than no data processed between CP8 and CP10" > > 2. 
I've noticed that from this question there is a gap between > "*allow aborted/failed checkpoint in independent sub-graph*" and > my intention: "*independent sub-graph checkpointing independently*" > > Best > Yuan > > > On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <luogen...@gmail.com> wrote: > > > Hi, > > > > I'm thinking about Yuan's case. Let's assume that the case is running in > > current Flink: > > 1. CP8 finishes > > 2. For some reason, PR2 stops consuming records from the source (but is > not > > stuck), and PR1 continues consuming new records. > > 3. CP9 and CP10 finish > > 4. PR2 starts to consume quickly to catch up with PR1, and reaches the > same > > final status as in Yuan's case before CP11 starts. > > > > I suppose that in this case, the status of the job can be the same as in > > Yuan's case, and the snapshot (including source states) taken at CP10 > > should be the same as the composed global snapshot in Yuan's case, which > is > > combining CP10 of PR1 and CP8 of PR2. This should be true if neither > failed > > checkpointing nor uncommitted consuming have side effects, both of which > > can break the exactly-once semantics when replaying. So I think there > > should be no difference between rescaling the combined global snapshot or > > the globally taken one, i.e. if the input partitions are not independent, > > we are probably not able to rescale the source state in the current Flink > > either. > > > > And @Thomas, I do agree that the operational burden is > > significantly reduced, while I'm a little afraid that checkpointing the > > subgraphs individually may increase most of the runtime overhead back > > again. Maybe we can find a better way to implement this. > > > > On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <t...@apache.org> wrote: > > > Hi, > > > > > > Thanks for opening this discussion! The proposed enhancement would be > > > interesting for use cases in our infrastructure as well. 
> > > > > > There are scenarios where it makes sense to have multiple disconnected > > > subgraphs in a single job because it can significantly reduce the > > > operational burden as well as the runtime overhead. Since we allow > > > subgraphs to recover independently, why not allow them to make > > > progress independently also, which would imply that checkpointing must > > > succeed for unaffected subgraphs as certain behavior is tied to > > > checkpoint completion, like Kafka offset commit, file output etc. > > > > > > As for source offset redistribution, offset/position needs to be tied > > > to splits, both in FLIP-27 and in the legacy sources. (This applies to both > > > the Kafka and Kinesis legacy sources and the FLIP-27 Kafka source.) With the new > > > source framework, it would be hard to implement a source with correct > > > behavior that does not track the position along with the split. > > > > > > In Yuan's example, is there a reason why CP8 could not be promoted to > > > CP10 by the coordinator for PR2 once it receives the notification that > > > CP10 did not complete? It appears that should be possible since in its > > > effect it should be no different than no data processed between CP8 > > > and CP10? > > > > > > Thanks, > > > Thomas > > > > > > On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrm...@apache.org> > > wrote: > > > > > > > > Thanks for the clarification Yuan and Gen, > > > > > > > > I agree that the checkpointing of the sources needs to support the > > > > rescaling case, otherwise it does not work. Is there currently a > source > > > > implementation where this wouldn't work? For Kafka it should work > > because > > > > we store the offset per assigned partition. For Kinesis it is > probably > > > the > > > > same. For the FileSource we store the set of unread input splits in > the > > > > source coordinator and the state of the assigned splits in the > sources. 
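Thomas's and Till's point above, that offsets travel with the splits/partitions a subtask owns, is what makes source subtask checkpoints independent of each other. A minimal sketch of that idea (plain Python with hypothetical names, not Flink's actual source API): because the split sets are disjoint and no state is shared, snapshots taken by different subtasks at different checkpoints can be combined into one consistent source state.

```python
# Illustrative toy model only (not Flink code): offsets tracked per split
# make subtask checkpoints independently meaningful and combinable.

class SourceSubtask:
    """A source subtask that owns a set of splits and tracks one
    offset per split -- no state is shared with sibling subtasks."""

    def __init__(self, splits):
        # split id -> next offset to read; the offset travels with the split
        self.offsets = {s: 0 for s in splits}

    def poll(self, split, n=1):
        # consume n records from one of this subtask's own splits
        self.offsets[split] += n

    def snapshot(self):
        # checkpoint state is fully described by the subtask's own splits
        return dict(self.offsets)

# Two subtasks of the same source with disjoint split sets.
t1 = SourceSubtask(["kafka-p0", "kafka-p1"])
t2 = SourceSubtask(["kafka-p2"])

t1.poll("kafka-p0", 5)
cp8_t2 = t2.snapshot()       # t2's state as of an earlier checkpoint (CP8)
t1.poll("kafka-p1", 3)
cp10_t1 = t1.snapshot()      # t1's state as of a later checkpoint (CP10)

# Disjoint split sets mean the two snapshots combine into one consistent
# source state, regardless of which checkpoint id each came from.
combined = {**cp10_t1, **cp8_t2}
assert combined == {"kafka-p0": 5, "kafka-p1": 3, "kafka-p2": 0}
```

If some state were shared between subtasks (e.g. in a shared coordinator), this combination would no longer be safe, which is exactly the concern raised earlier in the thread.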
> > > > This should probably also work since new splits are only handed out > to > > > > running tasks. > > > > > > > > Cheers, > > > > Till > > > > > > > > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yuanmei.w...@gmail.com> > > wrote: > > > > > > > > > Hey Till, > > > > > > > > > > > Why rescaling is a problem for pipelined regions/independent > > > execution > > > > > subgraphs: > > > > > > > > > > Take a simplified example: > > > > > job graph: source (2 instances) -> sink (2 instances) > > > > > execution graph: > > > > > source (1/2) -> sink (1/2) [pipelined region 1] > > > > > source (2/2) -> sink (2/2) [pipelined region 2] > > > > > > > > > > Let's assume checkpoints are still triggered globally, meaning > > > different > > > > > pipelined regions share the global checkpoint id (PR1 CP1 matches > > with > > > PR2 > > > > > CP1). > > > > > > > > > > Now let's assume PR1 completes CP10 and PR2 completes CP8. > > > > > > > > > > Let's say we want to rescale to parallelism 3 due to increased > input. > > > > > > > > > > - Notice that we cannot simply rescale based on the latest > completed > > > > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) > > output > > > > > externally. > > > > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on > > > how the > > > > > source's offset redistribution is implemented. > > > > > The answer is yes if we treat each input partition as > independent > > > from > > > > > each other, *but I am not sure whether we can make that > assumption*. > > > > > > > > > > If not, the rescaling cannot happen until PR1 and PR2 are aligned > > with > > > CPs. 
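Yuan's example can be made concrete with a small sketch. Assuming input partitions really are independent (the assumption Yuan flags as unverified), composing CP10 of PR1 with CP8 of PR2 and then redistributing the splits to parallelism 3 might look like this toy model (hypothetical names and a simple round-robin policy, not Flink code):

```python
# Toy model of Yuan's rescaling example: compose a global snapshot from
# each region's latest completed checkpoint, then redistribute splits.

# Latest completed checkpoint per pipelined region (PR1 at CP10, PR2 at CP8).
region_snapshots = {
    "PR1": {"checkpoint_id": 10, "offsets": {"split-0": 500}},
    "PR2": {"checkpoint_id": 8,  "offsets": {"split-1": 320}},
}

# Composing is only sound if splits are independent: with no shared source
# state, PR1's CP10 cannot depend on anything PR2 did after its CP8.
composed = {}
for snap in region_snapshots.values():
    composed.update(snap["offsets"])

# One simple redistribution policy: round-robin over the 3 new subtasks.
new_parallelism = 3
assignment = {i: {} for i in range(new_parallelism)}
for i, (split, offset) in enumerate(sorted(composed.items())):
    assignment[i % new_parallelism][split] = offset

assert composed == {"split-0": 500, "split-1": 320}
assert assignment == {0: {"split-0": 500}, 1: {"split-1": 320}, 2: {}}
```

If the splits were not independent (e.g. the source kept cross-partition state), the `composed` map above would not be a consistent state, and rescaling would indeed have to wait until PR1 and PR2 align on a checkpoint.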
> > > > > > > > > > Best > > > > > -Yuan > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrm...@apache.org > > > > > wrote: > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > Yuan and Gen could you elaborate why rescaling is a problem if we > > say > > > > > that > > > > > > separate pipelined regions can take checkpoints independently? > > > > > > Conceptually, I somehow think that a pipelined region that is > > failed > > > and > > > > > > cannot create a new checkpoint is more or less the same as a > > > pipelined > > > > > > region that didn't get new input or a very very slow pipelined > > region > > > > > which > > > > > > couldn't read new records since the last checkpoint (assuming > that > > > the > > > > > > checkpoint coordinator can create a global checkpoint by > combining > > > > > > individual checkpoints (e.g. taking the last completed checkpoint > > > from > > > > > each > > > > > > pipelined region)). If this comparison is correct, then this > would > > > mean > > > > > > that we have rescaling problems under the latter two cases. > > > > > > > > > > > > Cheers, > > > > > > Till > > > > > > > > > > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <luogen...@gmail.com> > > wrote: > > > > > > > > > > > > > Hi Gyula, > > > > > > > > > > > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can > > > discuss > > > > > > this > > > > > > > within two scopes. One is the job subgraph, the other is the > > > execution > > > > > > > subgraph, which I suppose is the same as PipelineRegion. > > > > > > > > > > > > > > An idea is to individually checkpoint the PipelineRegions, for > > the > > > > > > > recovering in a single run. > > > > > > > > > > > > > > Flink has now supported PipelineRegion based failover, with a > > > subset > > > > > of a > > > > > > > global checkpoint snapshot. 
The checkpoint barriers are spread > > > within a > > > > > > PipelineRegion, so the checkpointing of individual > > PipelineRegions > is > > > > actually independent. Since in a single run of a job, the > > > > > PipelineRegions > > > > are fixed, we can individually checkpoint separated > > > PipelineRegions, > > > > > > regardless of the status of the other PipelineRegions, and use a > > > snapshot > > > > > of > > > > a > > > failing region to recover, instead of the subset of a global > > > snapshot. > > > > > > This > > > can support separated job subgraphs as well, since they will > also > > > be > > > > > > separated into different PipelineRegions. I think this can > > fulfill > > > your > > > > > > needs. > > > > > > > > In fact the individual snapshots of all PipelineRegions can > form > > a > > > > > global > > > > > > snapshot, and the alignment of snapshots of individual regions > is > > > not > > > > > > necessary. But rescaling this global snapshot can be > potentially > > > > > > complex. I > > > > > > think it's better to use the individual snapshots in a single > > run, > > > and > > > > > > take > > > > > > a global checkpoint/savepoint before restarting the job, > > rescaling > > > it > > > > or > > > > > > not. > > > > > > > > A major issue of this plan is that it breaks the checkpoint > > > mechanism > > > > of > > > > > > Flink. As far as I know, even in the approximate recovery, the > > > snapshot > > > > > > used to recover a single task is still a part of a global > > > snapshot. To > > > > > > implement the individual checkpointing of PipelineRegions, > there > > > may > > > > need > > > > > > to be a checkpoint coordinator for each PipelineRegion, and a > new > > > > > global > > > > > > checkpoint coordinator. When the scale goes up, there can be > many > > > > > > individual regions, which can be a big burden to the job > manager. 
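The per-region coordinator idea Gen Luo describes, and how it changes the meaning of the checkpoint id, can be illustrated with a small sketch (plain Python with hypothetical names; Flink's actual CheckpointCoordinator works differently): each region keeps its own counter, so a composed "global" snapshot mixes checkpoint ids that no longer line up across regions.

```python
# Toy model: per-PipelineRegion checkpoint coordinators with independent
# checkpoint id counters; a "global" snapshot is just each region's latest.

class RegionCoordinator:
    def __init__(self, region_id):
        self.region_id = region_id
        self.next_id = 1
        self.latest = None            # (checkpoint_id, state) once completed

    def complete_checkpoint(self, state):
        # record a completed checkpoint and advance this region's own counter
        self.latest = (self.next_id, state)
        self.next_id += 1

coords = {r: RegionCoordinator(r) for r in ("PR1", "PR2")}
coords["PR1"].complete_checkpoint({"offset": 100})
coords["PR1"].complete_checkpoint({"offset": 200})   # PR1 reaches id 2
coords["PR2"].complete_checkpoint({"offset": 50})    # PR2 only reaches id 1

# The composed "global" snapshot mixes different checkpoint ids per region.
global_snapshot = {r: c.latest for r, c in coords.items()}
assert global_snapshot == {"PR1": (2, {"offset": 200}),
                           "PR2": (1, {"offset": 50})}
```

With one such coordinator per region plus a global one on top, the number of coordinator objects grows with the number of regions, which is the job-manager burden mentioned above.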
> > > The > > > > > > meaning of the checkpoint id will also be changed, which can > > affect > > > > many > > > > > > aspects. There can be lots of work and risks, and the risks > still > > > exist > > > > > > if > > > > > > we only individually checkpoint separated job subgraphs, since > > the > > > > > > mechanism is still broken. If that is what you need, maybe > > > separating > > > > > > them > > > > > > into different jobs is an easier and better choice, as Caizhi > and > > > Yuan > > > > > > mentioned. > > > > > > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei < > yuanmei.w...@gmail.com > > > > > > > > wrote: > > > > > > > > > Hey Gyula, > > > > > > > > > > That's a very interesting idea. The discussion about the > > > `Individual` > > > > > > vs > > > > > > > > `Global` checkpoint was raised before, but the main concern > was > > > from > > > > > > two > > > > > > > > aspects: > > > > > > > > > > - Non-deterministic replaying may lead to an inconsistent > view > > of > > > > > > > > the checkpoint > > > > > > > > - It is not easy to form a clear cut of past and future, and > > > hence no > > > > > > > clear > > > > > > > > cut of where the source should begin replaying > > from. > > > > > > > > > > Starting from independent subgraphs as you proposed may be a > > good > > > > > > > starting > > > > > > > > point. However, when we talk about subgraphs, do we mean a > > > > > job > > > > > > > > subgraph (each vertex is one or more operators) or an execution > > > subgraph > > > > > > > (each > > > > > > > > vertex is a task instance)? > > > > > > > > > > If it is a job subgraph, then indeed, why not separate it > into > > > > > multiple > > > > > > > > jobs as Caizhi mentioned. 
> > > > > > > > If it is an execution subgraph, then it is difficult to > handle > > > > > > rescaling > > > > > > > due to inconsistent views of checkpoints between tasks of the > > > same > > > > > > > > operator. > > > > > > > > > > `Individual/Subgraph Checkpointing` is definitely an > > interesting > > > > > > > direction > > > > > > > > to think of, and I'd love to hear more from you! > > > > > > > > > > Best, > > > > > > > > > > Yuan > > > > > > > > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng < > > > tsreape...@gmail.com> > > > > > > > wrote: > > > > > > > > > Hi Gyula! > > > > > > > > > > Thanks for raising this discussion. I agree that this will > be > > > an > > > > > > > > > interesting feature but I actually have some doubts about > the > > > > > > > motivation > > > > > > > > > and use case. If there are multiple individual subgraphs in > > the > > > > > same > > > > > > > job, > > > > > > > > > why not just distribute them to multiple jobs so that each > > job > > > > > > contains > > > > > > > > > only one individual graph and can now fail without > disturbing > > > the > > > > > > > others? > > > > > > > > > > Gyula Fóra <gyf...@apache.org> wrote on Mon, Feb 7, 2022 at 05:22: > > > > > > > > > > > > > > > > > > > Hi all! > > > > > > > > > > > > At the moment checkpointing only works for healthy jobs > > with > > > all > > > > > > > > running > > > > > > > > > > (or some finished) tasks. This sounds reasonable in most > > > cases > > > > > but > > > > > > > > there > > > > > > > > > > are a few applications where it would make sense to > > > checkpoint > > > > > > > failing > > > > > > > > > jobs > > > > > > > > > > as well. 
> > > > > > > > > > > > > > > > > > > > Due to how the checkpointing mechanism works, subgraphs > > that > > > > > have a > > > > > > > > > failing > > > > > > > > > > task cannot be checkpointed without violating the > > > exactly-once > > > > > > > > semantics. > > > > > > > > > > However if the job has multiple independent subgraphs > (that > > > are > > > > > not > > > > > > > > > > connected to each other), even if one subgraph is > failing, > > > the > > > > > > other > > > > > > > > > > completely running one could be checkpointed. In these > > cases > > > the > > > > > > > tasks > > > > > > > > of > > > > > > > > > > the failing subgraph could simply inherit the last > > successful > > > > > > > > checkpoint > > > > > > > > > > metadata (before they started failing). This logic would > > > produce > > > > > a > > > > > > > > > > consistent checkpoint. > > > > > > > > > > > > > > > > > > > > The job as a whole could now make stateful progress even > if > > > some > > > > > > > > > subgraphs > > > > > > > > > > are constantly failing. This can be very valuable if for > > some > > > > > > reason > > > > > > > > the > > > > > > > > > > job has a larger number of independent subgraphs that are > > > > > expected > > > > > > to > > > > > > > > > fail > > > > > > > > > > every once in a while, or if some subgraphs can have > longer > > > > > > downtimes > > > > > > > > > that > > > > > > > > > > would now cause the whole job to stall. > > > > > > > > > > > > > > > > > > > > What do you think? > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > Gyula > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
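For reference, Gyula's original proposal, letting the tasks of a failing subgraph inherit their state from the last successful checkpoint while healthy subgraphs snapshot fresh state, can be sketched as a toy model (hypothetical names, not the actual Flink checkpoint coordinator):

```python
# Toy model of Gyula's proposal: when a global checkpoint is taken, a
# healthy subgraph contributes a fresh snapshot, while a failing subgraph
# inherits its state from the last successful checkpoint, so the composed
# global checkpoint stays consistent.

def take_checkpoint(cp_id, subgraphs, previous):
    snapshot = {}
    for name, sg in subgraphs.items():
        if sg["healthy"]:
            snapshot[name] = sg["state"]      # fresh snapshot of running tasks
        else:
            snapshot[name] = previous[name]   # inherit last good metadata
    return cp_id, snapshot

# State as of the last successful checkpoint (CP10, say).
prev_cp = {"A": {"count": 10}, "B": {"count": 7}}

subgraphs = {
    "A": {"healthy": True,  "state": {"count": 25}},  # made progress
    "B": {"healthy": False, "state": {"count": 99}},  # failing: state untrusted
}

cp_id, snap = take_checkpoint(11, subgraphs, prev_cp)
assert snap == {"A": {"count": 25}, "B": {"count": 7}}
```

This only produces a consistent checkpoint if A and B are truly disconnected; any shared state or data flow between them would make the mixed snapshot inconsistent, which is the caveat discussed throughout the thread.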