Hey all,

Thanks @Zakelly for driving this effort and thanks everyone for the warm
discussion. Sorry for the late response.

Zakelly and I discussed and reviewed the design carefully while drafting
this FLIP, so I do not have much additional input here. But I want to
highlight several points on which I've been quoted and explain why I think
the current design is a reasonable and clean one.

*Why this FLIP is proposed*
File flooding is a problem that I've seen many people bring up for Flink
over the years, especially on large clusters. Unfortunately, there is not
yet an accepted solution for the most commonly used state backends such as
RocksDB. This FLIP was originally targeted at merging SST (keyed state)
checkpoint files.

While comparing different design choices, we found that the different
types of checkpoint files (operator state, unaligned checkpoint channel
state, changelog incremental state) share similar considerations, for
example file management and file merging granularity. That's why we want
to abstract a unified framework for merging these different types of
checkpoint files and provide the flexibility to trade off merging
efficiency, rescaling/restore cost, and file system capabilities (which
affect file visibility).

*Why file ownership is moved from JM to TM*
One of the major differences in the proposed design is moving file
ownership from JM to TM. Most of the questions and concerns stem from
this, so let me answer them one by one:

*1. Why is the current JM SharedRegistry not enough, and do we have to
introduce more complexity?*
The SharedRegistry maintains one mapping: *a file -> the max checkpoint ID
using that file*.
For merging files, we have to introduce another level of mapping: *a
logical file -> a segment of a merged physical file*.
So yes, no matter what, this second level of mapping has to be managed
somewhere, either in the JM or in the TM.
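
To make the two levels concrete, here is a minimal sketch in Java of what
has to be tracked once segments come into play (illustrative names only,
not the actual FLIP-306 classes):

  import java.util.HashMap;
  import java.util.Map;

  public class TwoLevelMappingSketch {

      /** A segment of a merged physical file: (path, offset, length). */
      record Segment(String physicalFilePath, long offset, long length) {}

      /** Level 1: what the JM SharedRegistry tracks today,
       *  file -> max checkpoint ID still using it. */
      private final Map<String, Long> maxCheckpointIdUsingFile = new HashMap<>();

      /** Level 2: the extra mapping needed for file merging,
       *  logical state file -> segment of the merged physical file. */
      private final Map<String, Segment> logicalFileToSegment = new HashMap<>();

      void registerUse(String file, long checkpointId) {
          maxCheckpointIdUsingFile.merge(file, checkpointId, Math::max);
      }

      void registerSegment(String logicalFile, String physicalFile, long offset, long length) {
          logicalFileToSegment.put(logicalFile, new Segment(physicalFile, offset, length));
      }
  }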

*2. Why can't this complexity (the second level of mapping) be maintained
in the JM?*
- As a centralized service, the JM is already complicated and overloaded.
As mentioned by @Yanfei Lei <fredia...@gmail.com>, "triggering checkpoints
can be delayed by discarding shared state when JM manages a large number
of files" (FLINK-26590). That issue ended up with the JM thread pool being
set to 500!
- As explained by @Zakelly earlier in this thread, the contract "for
checkpoint N, only re-use shared state handles that have already been
referenced by checkpoint N-1" is not guaranteed for concurrent checkpoints
in the current JM-owned design. This problem cannot be addressed without
significant changes to how the SharedRegistry and checkpoint subsumption
work, which I do not think is worth it, since more than one concurrent
checkpoint is rarely used in production.

*3. We had similar discussions before about moving ownership from JM to
TM; why was it not adopted at that time?*
As mentioned by Yun and Piotr, we had similar discussions about moving
ownership from JM to TM when designing the changelog state backend. The
reason we stuck with JM ownership back then was mainly engineering
time/effort constraints.
This time, since we need an extra level of mapping, which would complicate
the JM logic even further, we do need to contain the complexity within the
TM to avoid more communication between JM and TM.
Zakelly has already shared the code branch (about 2,000 lines), and it is
quite simple.
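
As a rough illustration of how lightweight the TM-side bookkeeping can be,
here is a hypothetical sketch of reference-counted physical-file tracking
(illustrative names, not the classes from the POC branch): a physical file
is deleted only when no logical file references it and no in-flight
checkpoint may still append segments to it.

  import java.util.HashMap;
  import java.util.Map;

  public class PhysicalFileTrackerSketch {

      private static final class Entry {
          int refCount;                 // logical segments still referencing this physical file
          boolean openForReuse = true;  // a writer may still append segments for later checkpoints
      }

      private final Map<String, Entry> files = new HashMap<>();

      synchronized void addReference(String physicalFile) {
          files.computeIfAbsent(physicalFile, f -> new Entry()).refCount++;
      }

      synchronized void removeReference(String physicalFile) {
          Entry e = files.get(physicalFile);
          if (e != null && --e.refCount == 0 && !e.openForReuse) {
              delete(physicalFile);
          }
      }

      /** Called once the file reaches its size threshold and leaves the reuse pool. */
      synchronized void closeForReuse(String physicalFile) {
          Entry e = files.get(physicalFile);
          if (e != null) {
              e.openForReuse = false;
              if (e.refCount == 0) {
                  delete(physicalFile);
              }
          }
      }

      private void delete(String physicalFile) {
          files.remove(physicalFile);
          // issue the actual DFS delete here
      }
  }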

*4. Cloud-native trend*
The current centralized file management framework contradicts the
cloud-native trend. That's also one of the reasons moving ownership from
JM to TM was proposed in the first place. The proposed design and
implementation are a worthwhile trial in this direction, and I'd like to
put more effort into it if this turns out to work well.

One more thing I want to mention: the proposed design confines all the
code changes and complexity to the newly introduced file management in the
TM. That is to say, without enabling file merging, the file-management
code path remains exactly the same as before. So it is also a safe change
in this sense.
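
For users, the feature is off by default and is driven by the options
proposed later in this thread; a minimal sketch of setting them
programmatically (the values here are illustrative, and the keys are the
ones quoted from Zakelly's proposal, which may still change):

  import org.apache.flink.configuration.Configuration;

  public class FileMergingOptionsSketch {
      public static void main(String[] args) {
          Configuration conf = new Configuration();
          // Upper limit of the per-TM file reuse pool (key as proposed in this thread).
          conf.setString("state.checkpoints.file-merging.max-file-pool-size", "16");
          // Lower limit of the pool, derived from the number of subtasks sharing a file.
          conf.setString("state.checkpoints.file-merging.max-subtasks-per-file", "4");
          System.out.println(conf);
      }
  }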

Best,
Yuan



On Wed, Apr 12, 2023 at 5:23 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Yun,
>
> I reorganized our discussion and added a comparison table of state
> ownership with some previous designs. Please take a look at section
> "4.9. State ownership comparison with other designs".
>
> But I don't see them as alternatives since the design of state
> ownership is integrated with this FLIP. That is to say, we are
> providing a file merging solution including file management for new
> merged files, other ownership models are not feasible for the current
> merging plan. If the state ownership changes, the design of merging
> files at different granularities also needs to be changed accordingly.
> WDYT?
>
>
> Best regards,
> Zakelly
>
> On Tue, Apr 11, 2023 at 10:18 PM Yun Tang <myas...@live.com> wrote:
> >
> > Hi Zakelly,
> >
> > Since we already had some discussions on this topic in the doc I
> mentioned, could you please describe the difference in your FLIP?
> >
> > I think we should better have a comparing table across different options
> just like the doc wrote. And we could also list some of them in your
> Rejected Alternatives part.
> >
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Zakelly Lan <zakelly....@gmail.com>
> > Sent: Tuesday, April 11, 2023 17:57
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for
> Checkpoints
> >
> > Hi Rui Fan,
> >
> > Thanks for your comments!
> >
> > > (1) The temporary segment will remain in the physical file for a short
> time, right?
> >
> > Yes, any written segment will remain in the physical file until the
> > physical file is deleted. It is controlled by the reference counting.
> > And as discussed in 4.7, this will result in a space amplification
> > problem.
> >
> >
> > > (2) Is subtask granularity confused with shared state?
> >
> > Merging files at granularity of subtask is a general solution for
> > shared states, considering the file may be reused by the following
> > checkpoint after job restore. This design is applicable to sst files
> > and any other shared states that may arise in the future. However, the
> > DSTL files are a special case of shared states, since these files will
> > no longer be shared after job restore. Therefore, we may do an
> > optimization for these files and merge them at the TM level.
> > Currently, the DSTL files are not in the shared directory of
> > checkpoint storage, and I suggest we keep it as it is. I agree that
> > this may bring in some confusion, and I suggest the FLIP mainly
> > discuss the general situation and list the special situations
> > separately without bringing in new concepts. I will add another
> > paragraph describing the file merging for DSTL files. WDYT?
> >
> >
> > > (3) When rescaling, do all shared files need to be copied?
> >
> > I agree with you that only sst files of the base DB need to be copied
> > (or re-uploaded in the next checkpoint). However, section 4.2
> > simplifies file copying issues (copying all files), following the
> > concept of shared state.
> >
> >
> > > (4) Does the space amplification ratio need a configuration option?
> >
> > Thanks for the reminder, I will add an option in this FLIP.
> >
> >
> > > (5) How many physical files can a TM write at the same checkpoint at
> the same time?
> >
> > This is a very good point. Actually, there is a file reuse pool as
> > section 4.6 described. There could be multiple files within this pool,
> > supporting concurrent writing by multiple writers. I suggest providing
> > two configurations to control the file number:
> >
> >   state.checkpoints.file-merging.max-file-pool-size: Specifies the
> > upper limit of the file pool size.
> >   state.checkpoints.file-merging.max-subtasks-per-file: Specifies the
> > lower limit of the file pool size based on the number of subtasks
> > within each TM.
> >
> > The number of simultaneously open files is controlled by these two
> > options, and the first option takes precedence over the second.
> >
> > WDYT?
> >
> >
> >
> > Thanks a lot for your valuable insight.
> >
> > Best regards,
> > Zakelly
> >
> >
> > On Mon, Apr 10, 2023 at 7:08 PM Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > Thanks Zakelly for driving this proposal, and thank you all for
> > > the warm discussions. It's really a useful feature.
> > >
> > > I have a few questions about this FLIP.
> > >
> > > (1) The temporary segment will remain in the physical file for
> > > a short time, right?
> > >
> > > FLIP proposes to write segments instead of physical files.
> > > If the physical files are written directly, these temporary files
> > > will be deleted after the checkpoint is aborted. When writing
> > > a segment, how to delete the temporary segment?
> > > Decrement the reference count value by 1?
> > >
> > > (2) Is subtask granularity confused with shared state?
> > >
> > > From the "4.1.2 Merge files within a subtask or a TM" part,
> > > based on the principle of sst files, it is concluded that
> > > "For shared states, files are merged within each subtask."
> > >
> > > I'm not sure whether this conclusion is general or just for sst.
> > > As Yanfei mentioned before:
> > >
> > > > DSTL files are shared between checkpoints, and are
> > > > currently merged in batches at the task manager level.
> > >
> > > DSTL files as the shared state in FLIP-306, however, it
> > > would be better to merge at TM granularity. So, I'm not
> > > sure whether the subtask granularity confused with
> > > shared state?
> > >
> > > And I'm not familiar with DSTL file merging, should
> > > shared state be divided into shared subtask state
> > > and shared TM state?
> > >
> > > (3) When rescaling, do all shared files need to be copied?
> > >
> > > From the "4.2 Rescaling and Physical File Lifecycle" part,
> > > I see a lot of file copying.
> > >
> > > As I understand, only sst files of the baseDB need to be copied.
> > > From the restore code[1], when restoreWithRescaling, flink will
> > > init a base DB instance, read all contents from other temporary
> > > rocksdb instances, and write them into the base DB, and then
> > > the temporary rocksdb instance will be discarded.
> > >
> > > So, I think copying the files of the base rocksdb is enough, and
> > > the files of other rocksdb instances aren't used.
> > >
> > > Or do not copy any files during recovery, upload all sst files
> > > at the first checkpoint.
> > >
> > > (4) Does the space amplification ratio need a configuration option?
> > >
> > > From the step1 of  "4.7 Space amplification" part, I see:
> > >
> > > > Checking whether the space amplification of each file is greater
> than a
> > > preset threshold and collecting files that exceed the threshold for
> > > compaction.
> > >
> > > Should we add a configuration option about the compaction threshold?
> > > I didn't see it at "5. Public interfaces and User Cases" part.
> > >
> > > (5) How many physical files can a TM write at the same
> > > checkpoint at the same time?
> > >
> > > From the "5. Public interfaces and User Cases" part, I see:
> > >
> > > > A configuration option that sets a maximum size limit for physical
> files.
> > >
> > > I guess that each type of state(private or shared state) will only
> > > write one file at the same time at the same checkpoint.
> > > When the file reaches the maximum size, flink will start writing
> > > the next file, right?
> > >
> > > If yes, for shared state, will
> > > "state.backend.rocksdb.checkpoint.transfer.thread.num"
> > > be invalid?
> > >
> > > For private state, a TM may have many tasks (because of slot
> > > sharing, more than 20 tasks may run in a slot), and the
> > > performance of all tasks serially writing files may be poor,
> > > eventually resulting in longer checkpoint time.
> > >
> > > That's why FLINK-26803[2] introduced a configuration option:
> > >
> "execution.checkpointing.unaligned.max-subtasks-per-channel-state-file".
> > > Flink users can set the maximum number of subtasks that
> > > share the same channel state file.
> > >
> > > That's all my questions right now, please correct me if
> > > anything is wrong.
> > >
> > > Anyway, this FLIP is useful for the stability of large-scale
> > > flink production. Looking forward to its completion and
> > > eventual acceptance by the community.
> > >
> > > [1]
> > >
> https://github.com/apache/flink/blob/65710b437318364ec19c0369d038ac2222c10498/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L292
> > > [2] https://issues.apache.org/jira/browse/FLINK-26803
> > >
> > > Best,
> > > Rui Fan
> > >
> > > On Fri, Apr 7, 2023 at 8:42 PM Jing Ge <j...@ververica.com.invalid>
> wrote:
> > >
> > > > Hi,
> > > >
> > > > Jingsong, Yanfei, please check, if you can view the doc. Thanks.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Fri, Apr 7, 2023 at 2:19 PM Zakelly Lan <zakelly....@gmail.com>
> wrote:
> > > >
> > > > > Hi Yanfei,
> > > > >
> > > > > Thanks for your comments.
> > > > >
> > > > > > Does this result in a larger space amplification? Maybe a more
> > > > > suitable value can be determined through some experimental
> statistics
> > > > > after we implement this feature.
> > > > >
> > > > > Yes, it results in larger space amplification for shared states. I
> > > > > will do more tests and investigation.
> > > > >
> > > > >
> > > > > Thanks.
> > > > >
> > > > > Best regards,
> > > > > Zakelly
> > > > >
> > > > > On Fri, Apr 7, 2023 at 8:15 PM Zakelly Lan <zakelly....@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > Hi @Piotr and @Jingsong Li
> > > > > >
> > > > > > I have read access to the document, but I'm not sure whether the
> owner
> > > > > > of this document wants to make it public. Actually, the doc is
> for
> > > > > > FLINK-23342 and there is a candidate design very similar to this
> FLIP,
> > > > > > but only for the shared state. Like Yun said, the previous
> design is
> > > > > > not taken because of the code complexity, however I think it is
> > > > > > acceptable after implementing the POC[1]. I think we could focus
> on
> > > > > > the current plan, WDTY?
> > > > > >
> > > > > >
> > > > > > [1] POC of this FLIP:
> > > > > >
> > > > >
> > > >
> https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> > > > > >
> > > > > > On Fri, Apr 7, 2023 at 8:13 PM Zakelly Lan <
> zakelly....@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > Thanks for your comments!
> > > > > > >
> > > > > > > (1) Sorry for the misleading, let me make it more clear. It is
> a
> > > > > > > concurrent checkpoint scenario. Yes, the assumption you said
> needs to
> > > > > > > be followed, but the state handles here refer to the original
> SST
> > > > > > > files, not the underlying file. In this FLIP when checkpoint N
> and
> > > > N+1
> > > > > > > are running concurrently, they reuse files from checkpoint
> N-1, and
> > > > > > > some of the files may be deleted when checkpoint N completes
> while
> > > > > > > checkpoint N+1 is still writing on it. There is no such
> problem for
> > > > > > > original shared states without file merging because when a
> state
> > > > > > > handle (or sst file here) from checkpoint N-1 is not
> referenced by
> > > > > > > checkpoint N, it will not be referenced by checkpoint N+1. So
> the
> > > > > > > subsumption of sst files from checkpoint N-1 are safe.
> > > > > > > For above example, when reaching step "d.", File 1 reached the
> size
> > > > > > > threshold and will not be used. The Chk-2 and Chk-3 are running
> > > > > > > concurrently, and the File 3 is being written by Chk-2, so it
> can not
> > > > > > > be used by Chk-3 (As described in section 4.6). Here comes the
> > > > > > > problem.
> > > > > > >
> > > > > > > (2) Please correct me if I'm wrong. The purpose of the
> > > > > > > `RecoverableWriter` is to provide a reliable file writer even
> > > > > > > tolerable with job failure and recovery. The implementation
> varies
> > > > > > > among the file systems, some of which involves writing into
> temporary
> > > > > > > files (such as HDFS). As a result, it may produce more RPC
> requests
> > > > to
> > > > > > > the DFS.
> > > > > > > The goal of this FLIP is to reduce the pressure on DFS,
> especially
> > > > the
> > > > > > > number of files and RPC requests. Currently the TMs are NOT
> using the
> > > > > > > RecoverableWriter to persist/upload the state files, and a file
> > > > > > > closing is enough. The section 4.1.1 is trying to omit this
> file
> > > > > > > closing but ensure file visibility in some DFS, thus reducing
> > > > pressure
> > > > > > > on DFS. That's why I said the problems they want to solve are
> > > > > > > different. I'm not sure if I made myself clear.
> > > > > > >
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Zakelly
> > > > > > >
> > > > > > > On Fri, Apr 7, 2023 at 8:08 PM Zakelly Lan <
> zakelly....@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Yun,
> > > > > > > >
> > > > > > > > Thanks for your suggestions!
> > > > > > > >
> > > > > > > > I have read the FLINK-23342 and its design doc as you
> provided.
> > > > First
> > > > > > > > of all the goal of this FLIP and the doc are similar, and the
> > > > design
> > > > > > > > of this FLIP is pretty much like option 3. The main
> difference is
> > > > > that
> > > > > > > > we imply the concept of 'epoch' in the folder path for each
> > > > > > > > granularity. For shared state, the folder for each subtask
> is like
> > > > > > > > "${checkpointBaseDir}/shared/subtask-{index}-{parallelism}",
> so if
> > > > > the
> > > > > > > > ${checkpointBaseDir} changes (when user restart a job
> manually) or
> > > > > the
> > > > > > > > ${parallelism} changes (when rescaling), there will be a
> > > > > re-uploading,
> > > > > > > > and the JM takes care of the old artifacts. The folder path
> for
> > > > > > > > private state is in the form of
> > > > > > > > "${checkpointBaseDir}/tm-owned/${tmResourceId}" and the
> division of
> > > > > > > > responsibilities between JM and TM is similar. The design of
> this
> > > > > FLIP
> > > > > > > > inherits all the advantages of the design of option 3 in
> that doc,
> > > > > and
> > > > > > > > also avoids extra communication for epoch maintenance. As
> for the
> > > > > code
> > > > > > > > complexity, you may check the POC commit[1] and find that the
> > > > > > > > implementation is pretty clean and is a totally new code path
> > > > making
> > > > > > > > nearly no influence on the old one. Comparing the number of
> lines
> > > > of
> > > > > > > > code change with what's currently done for merging channel
> state[2]
> > > > > > > > (5200 vs. 2500 additions), I think it is acceptable
> considering we
> > > > > are
> > > > > > > > providing a unified file merging framework, which would save
> a lot
> > > > of
> > > > > > > > effort in future. WDYT?
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > > [1] POC of this FLIP:
> > > > > > > >
> > > > >
> > > >
> https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> > > > > > > > [2] Commit for FLINK-26803 (Merge the channel state files) :
> > > > > > > >
> > > > >
> > > >
> https://github.com/apache/flink/commit/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Apr 7, 2023 at 7:22 PM Yanfei Lei <
> fredia...@gmail.com>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Thanks for your explanation Zakelly.
> > > > > > > > > (1) Keeping these merging granularities for different
> types of
> > > > > files
> > > > > > > > > as presets that are not configurable is a good idea to
> prevent
> > > > > > > > > performance degradation.
> > > > > > > > >
> > > > > > > > > (2)
> > > > > > > > > > For the third option, 64MB is an acceptable target size.
> The
> > > > > RocksDB state backend in Flink also chooses 64MB as the default
> target
> > > > file
> > > > > size.
> > > > > > > > >
> > > > > > > > > Does this result in a larger space amplification? Maybe a
> more
> > > > > > > > > suitable value can be determined through some experimental
> > > > > statistics
> > > > > > > > > after we implement this feature.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Yanfei
> > > > > > > > >
> > > > > > > > > > Jingsong Li <jingsongl...@gmail.com> wrote on Fri, Apr 7, 2023, 17:09:
> > > > > > > > > >
> > > > > > > > > > Hi Yun,
> > > > > > > > > >
> > > > > > > > > > It looks like this doc needs permission to read? [1]
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > >
> > > >
> https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Jingsong
> > > > > > > > > >
> > > > > > > > > > On Fri, Apr 7, 2023 at 4:34 PM Piotr Nowojski <
> > > > > pnowoj...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > +1 To what Yun Tang wrote. We don't seem to have
> access to
> > > > the
> > > > > design doc.
> > > > > > > > > > > Could you make it publicly visible or copy out its
> content to
> > > > > another
> > > > > > > > > > > document?
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your answers Zakelly.
> > > > > > > > > > >
> > > > > > > > > > > (1)
> > > > > > > > > > > Yes, the current mechanism introduced in FLINK-24611
> allows
> > > > > for checkpoint
> > > > > > > > > > > N, to only re-use shared state handles that have been
> already
> > > > > referenced by
> > > > > > > > > > > checkpoint N-1. But why do we need to break this
> assumption?
> > > > > In your step,
> > > > > > > > > > > "d.", TM could adhere to that assumption, and instead
> of
> > > > > reusing File-2, it
> > > > > > > > > > > could either re-use File-1, File-3 or create a new
> file.
> > > > > > > > > > >
> > > > > > > > > > > (2)
> > > > > > > > > > > Can you elaborate a bit more on this? As far as I
> recall, the
> > > > > purpose of
> > > > > > > > > > > the `RecoverableWriter` is to support exactly the
> things
> > > > > described in this
> > > > > > > > > > > FLIP, so what's the difference? If you are saying that
> for
> > > > > this FLIP you
> > > > > > > > > > > can implement something more efficiently for a given
> > > > > FileSystem, then why
> > > > > > > > > > > can it not be done the same way for the
> `RecoverableWriter`?
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Piotrek
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Apr 6, 2023 at 17:24, Yun Tang <myas...@live.com>
> > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for driving this work!
> > > > > > > > > > > >
> > > > > > > > > > > > I'm not sure whether you ever read the discussion between
> > > > > Stephan, Roman,
> > > > > > > > > > > > Piotr, Yuan and me in the design doc [1] nearly
> two years
> > > > > ago.
> > > > > > > > > > > >
> > > > > > > > > > > > From my understanding, your proposal is also a mixed
> state
> > > > > ownership: some
> > > > > > > > > > > > states are owned by the TM while some are owned by
> the JM.
> > > > > If my memory is
> > > > > > > > > > > > correct, we did not take the option-3 or option-5 in
> the
> > > > > design doc [1] for
> > > > > > > > > > > > the code complexity when implementing the 1st version
> of
> > > > > changelog
> > > > > > > > > > > > state-backend.
> > > > > > > > > > > >
> > > > > > > > > > > > Could you also compare the current FLIP with the
> proposals
> > > > > in the design
> > > > > > > > > > > > doc[1]? From my understanding, we should at least
> consider
> > > > > to compare with
> > > > > > > > > > > > option-3 and option-5 as they are all mixed
> solutions.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > [1]
> > > > > > > > > > > >
> > > > >
> > > >
> https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > > > > > >
> > > > > > > > > > > > Best
> > > > > > > > > > > > Yun Tang
> > > > > > > > > > > >
> > > > > > > > > > > > ------------------------------
> > > > > > > > > > > > *From:* Zakelly Lan <zakelly....@gmail.com>
> > > > > > > > > > > > *Sent:* Thursday, April 6, 2023 16:38
> > > > > > > > > > > > *To:* dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > > > > *Subject:* Re: [DISCUSS] FLIP-306: Unified File
> Merging
> > > > > Mechanism for
> > > > > > > > > > > > Checkpoints
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Piotr,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for all the feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > (1) Thanks for the reminder. I have just seen the
> > > > > FLINK-24611, the delayed
> > > > > > > > > > > > deletion by JM resolves some sync problems between
> JM and
> > > > > TM, but I'm
> > > > > > > > > > > > afraid it is still not feasible for the file sharing
> in
> > > > this
> > > > > FLIP.
> > > > > > > > > > > > Considering a concurrent checkpoint scenario as
> follows:
> > > > > > > > > > > >    a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst
> are
> > > > > written in file 1,
> > > > > > > > > > > > and 4.sst is written in file 2.
> > > > > > > > > > > >    b. Checkpoint 2 starts based on checkpoint 1,
> including
> > > > > 1.sst, 2.sst
> > > > > > > > > > > > and 5.sst.
> > > > > > > > > > > >    c. Checkpoint 3 starts based on checkpoint 1,
> including
> > > > > 1.sst, 2.sst
> > > > > > > > > > > > and 5.sst as well.
> > > > > > > > > > > >    d. Checkpoint 3 reuses the file 2, TM writes
> 5.sst on
> > > > it.
> > > > > > > > > > > >    e. Checkpoint 2 creates a new file 3, TM writes
> 5.sst on
> > > > > it.
> > > > > > > > > > > >    f. Checkpoint 2 finishes, checkpoint 1 is
> subsumed and
> > > > > the file 2 is
> > > > > > > > > > > > deleted, while checkpoint 3 still needs file 2.
> > > > > > > > > > > >
> > > > > > > > > > > > I attached a diagram to describe the scenario.
> > > > > > > > > > > > [image: concurrent cp.jpg]
> > > > > > > > > > > > The core issue is that this FLIP introduces a
> mechanism
> > > > that
> > > > > allows
> > > > > > > > > > > > physical files to be potentially used by the next
> several
> > > > > checkpoints. JM
> > > > > > > > > > > > is uncertain whether there will be a TM continuing
> to write
> > > > > to a specific
> > > > > > > > > > > > file. So in this FLIP, TMs take the responsibility to
> > > > delete
> > > > > the physical
> > > > > > > > > > > > files.
> > > > > > > > > > > >
> > > > > > > > > > > > (2) IIUC, the RecoverableWriter is introduced to
> persist
> > > > > data in the "in
> > > > > > > > > > > > progress" files after each checkpoint, and the
> > > > > implementation may be based
> > > > > > > > > > > > on the file sync in some file systems. However,
> since the
> > > > > sync is a heavy
> > > > > > > > > > > > operation for DFS, this FLIP wants to use flush
> instead of
> > > > > the sync with
> > > > > > > > > > > > the best effort. This only fits the case that the
> DFS is
> > > > > considered
> > > > > > > > > > > > reliable. The problems they want to solve are
> different.
> > > > > > > > > > > >
> > > > > > > > > > > > (3) Yes, if files are managed by JM via the shared
> > > > registry,
> > > > > this problem
> > > > > > > > > > > > is solved. And as I mentioned in (1), there are some
> other
> > > > > corner cases
> > > > > > > > > > > > hard to resolve via the shared registry.
> > > > > > > > > > > >
> > > > > > > > > > > > The goal of this FLIP is to have a common way of
> merging
> > > > > files in all use
> > > > > > > > > > > > cases. For shared state it merges at subtask level,
> while
> > > > > for private state
> > > > > > > > > > > > (and changelog files, as I replied to Yanfei), files
> are
> > > > > merged at TM
> > > > > > > > > > > > level. So it is not contrary to the current plan for
> the
> > > > > unaligned
> > > > > > > > > > > > checkpoint state (FLINK-26803). You are right that
> the
> > > > > unaligned checkpoint
> > > > > > > > > > > > state would be merged with the operator's state
> file, so
> > > > > overall, it is
> > > > > > > > > > > > slightly better than what's currently done.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks again for the valuable comments!
> > > > > > > > > > > >
> > > > > > > > > > > > Best regards,
> > > > > > > > > > > > Zakelly
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski <
> > > > > pnowoj...@apache.org>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for coming up with the proposal, it's
> definitely
> > > > > valuable. I'm still
> > > > > > > > > > > > reading and trying to understand the proposal, but a
> couple
> > > > > of comments
> > > > > > > > > > > > from my side.
> > > > > > > > > > > >
> > > > > > > > > > > > (1)
> > > > > > > > > > > > > Ownership of a single checkpoint file is
> transferred to
> > > > > TM, while JM
> > > > > > > > > > > > manages the parent directory of these files.
> > > > > > > > > > > >
> > > > > > > > > > > > Have you seen
> > > > > https://issues.apache.org/jira/browse/FLINK-24611 before? I
> > > > > > > > > > > > don't fully remember why, but we have rejected the
> idea of
> > > > > moving the file
> > > > > > > > > > > > ownership to TM and instead reworked the shared file
> > > > > registry in a way that
> > > > > > > > > > > > I think should be sufficient for file sharing. Could
> you
> > > > > elaborate why we
> > > > > > > > > > > > need to move the file ownership to TM, and why is the
> > > > > current mechanism not
> > > > > > > > > > > > sufficient?
> > > > > > > > > > > >
> > > > > > > > > > > > (2)
> > > > > > > > > > > > > File visibility is needed when a Flink job
> recovers after
> > > > > a checkpoint is
> > > > > > > > > > > > materialized. In some DFS, such as most object
> storages, a
> > > > > file is only
> > > > > > > > > > > > visible after it is closed
> > > > > > > > > > > >
> > > > > > > > > > > > Is that really the case?
> > > > > > > > > > > >
> > > > > `org.apache.flink.core.fs.FileSystem#createRecoverableWriter`
> seems to be
> > > > > > > > > > > > addressing exactly this issue, and the most
> frequently used
> > > > > FileSystem (S3)
> > > > > > > > > > > > AFAIK supports it with no problems?
> > > > > > > > > > > >
> > > > > > > > > > > > (3)
> > > > > > > > > > > > > 4.1.2 Merge files within a subtask or a TM
> > > > > > > > > > > > > Given that TMs are reassigned after restoration,
> it is
> > > > > difficult to
> > > > > > > > > > > > manage physical files that contain data from multiple
> > > > > subtasks scattered
> > > > > > > > > > > > across different TMs (as depicted in Fig.3). There
> is no
> > > > > synchronization
> > > > > > > > > > > > mechanism between TMs, making file management in this
> > > > > scenario challenging.
> > > > > > > > > > > >
> > > > > > > > > > > > I think this is solved in many places already via the
> > > > shared
> > > > > state managed
> > > > > > > > > > > > by the JM, as I mentioned in (1).
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > If I understand it correctly you are proposing to
> have a
> > > > > common
> > > > > > > > > > > > interface/way of merging small files, in all use
> cases,
> > > > that
> > > > > would work
> > > > > > > > > > > > only across a single subtask? That's contrary to
> what's
> > > > > currently done for
> > > > > > > > > > > > unaligned checkpoints, right? But if this generic
> mechanism
> > > > > was to be used
> > > > > > > > > > > > for unaligned checkpoints, unaligned checkpoint
> state would
> > > > > have been
> > > > > > > > > > > > merged with the operators state file, so all in all
> there
> > > > > would be no
> > > > > > > > > > > > regression visible to a user? The limit is that we
> always
> > > > > have at least a
> > > > > > > > > > > > single file per subtask, but in exchange we are
> getting a
> > > > > simpler threading
> > > > > > > > > > > > model?
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Piotrek
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Apr 4, 2023 at 08:51, Zakelly Lan <
> zakelly....@gmail.com
> > > > >
> > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Yanfei,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you for your prompt response.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I agree that managing (deleting) only some folders
> with
> > > > JM
> > > > > can greatly
> > > > > > > > > > > > > relieve JM's burden. Thanks for pointing this out.
> > > > > > > > > > > > >
> > > > > > > > > > > > > In general, merging at the TM level is more
> effective
> > > > > since there are
> > > > > > > > > > > > > usually more files to merge. Therefore, I believe
> it is
> > > > > better to
> > > > > > > > > > > > > merge files per TM as much as possible.  However,
> for
> > > > > shared state,
> > > > > > > > > > > > > merging at the subtask level is the best choice to
> > > > prevent
> > > > > significant
> > > > > > > > > > > > > data transfer over the network after restoring. I
> think
> > > > it
> > > > > is better
> > > > > > > > > > > > > to keep these merging granularities for different
> types
> > > > of
> > > > > files as
> > > > > > > > > > > > > presets that are not configurable. WDYT?
> > > > > > > > > > > > >
> > > > > > > > > > > > > As for the DSTL files, they are merged per TM and
> placed
> > > > > in the
> > > > > > > > > > > > > task-owned folder. These files can be classified as
> > > > shared
> > > > > state since
> > > > > > > > > > > > > they are shared across checkpoints. However, the
> DSTL
> > > > file
> > > > > is a
> > > > > > > > > > > > > special case that will be subsumed by the first
> > > > checkpoint
> > > > > of the
> > > > > > > > > > > > > newly restored job. Therefore, there is no need
> for new
> > > > > TMs to keep
> > > > > > > > > > > > > these files after the old checkpoint is subsumed,
> just
> > > > > like the
> > > > > > > > > > > > > private state files. Thus, it is feasible to merge
> DSTL
> > > > > files per TM
> > > > > > > > > > > > > without introducing complex file management across
> job
> > > > > attempts. So
> > > > > > > > > > > > > the possible performance degradation is avoided.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The three newly introduced options have recommended
> > > > > defaults. For
> > > > > > > > > > > > > upcoming versions, this feature is turned off by
> default.
> > > > > For the
> > > > > > > > > > > > > second option, SEGMENTED_ACROSS_CP_BOUNDARY is the
> > > > > recommended default
> > > > > > > > > > > > > as it is more effective. Of course, if
> encountering some
> > > > > DFS that does
> > > > > > > > > > > > > not support file visibility until the file is
> closed, it
> > > > > is possible
> > > > > > > > > > > > > to fall back to another option automatically. For
> the
> > > > > third option,
> > > > > > > > > > > > > 64MB is an acceptable target size. The RocksDB
> state
> > > > > backend in Flink
> > > > > > > > > > > > > also chooses 64MB as the default target file size.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you again for your quick response.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <
> > > > > fredia...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for driving this,  this proposal enables
> the
> > > > > files merging of
> > > > > > > > > > > > > > different types of states to be grouped under a
> unified
> > > > > framework. I
> > > > > > > > > > > > > > think it has the added benefit of lightening the
> load
> > > > on
> > > > > JM. As
> > > > > > > > > > > > > > FLINK-26590[1] described,  triggered checkpoints
> can be
> > > > > delayed by
> > > > > > > > > > > > > > discarding shared state when JM manages a large
> number
> > > > > of files. After
> > > > > > > > > > > > > > this FLIP, JM only needs to manage some folders,
> which
> > > > > greatly reduces
> > > > > > > > > > > > > > the burden on JM.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In Section 4.1, two types of merging
> granularities(per
> > > > > subtask and per
> > > > > > > > > > > > > > task manager) are proposed, the shared state is
> managed
> > > > > by per subtask
> > > > > > > > > > > > > > granularity, but for the changelog state
> backend, its
> > > > > DSTL files are
> > > > > > > > > > > > > > shared between checkpoints, and are currently
> merged in
> > > > > batches at the
> > > > > > > > > > > > > > task manager level. When merging with the
> > > > > SEGMENTED_WITHIN_CP_BOUNDARY
> > > > > > > > > > > > > > mode, I'm concerned about the performance
> degradation
> > > > of
> > > > > its merging,
> > > > > > > > > > > > > > hence I wonder if the merge granularities are
> > > > > configurable? Further,
> > > > > > > > > > > > > > from a user perspective, three new options are
> > > > > introduced in this
> > > > > > > > > > > > > > FLIP, do they have recommended defaults?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [1]
> https://issues.apache.org/jira/browse/FLINK-26590
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Yanfei
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Zakelly Lan <zakelly....@gmail.com> wrote on Mon, Apr 3, 2023 at
> > > > > 18:36:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I would like to open a discussion on providing
> a
> > > > > unified file merging
> > > > > > > > > > > > > > > mechanism for checkpoints[1].
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Currently, many files are uploaded to the DFS
> during
> > > > > checkpoints,
> > > > > > > > > > > > > > > leading to the 'file flood' problem when
> running
> > > > > > > > > > > > > > > intensive workloads in a cluster.  To tackle
> this
> > > > > problem, various
> > > > > > > > > > > > > > > solutions have been proposed for different
> types
> > > > > > > > > > > > > > > of state files. Although these methods are
> similar,
> > > > > they lack a
> > > > > > > > > > > > > > > systematic view and approach. We believe that
> it is
> > > > > > > > > > > > > > > better to consider this problem as a whole and
> > > > > introduce a unified
> > > > > > > > > > > > > > > framework to address the file flood problem for
> > > > > > > > > > > > > > > all types of state files. A POC has been
> implemented
> > > > > based on current
> > > > > > > > > > > > > > > FLIP design, and the test results are
> promising.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Looking forward to your comments or feedback.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > >
> > > >
>
>
