Hi Yun,

It looks like this doc requires permission to read [1].

[1] 
https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#

Best,
Jingsong

On Fri, Apr 7, 2023 at 4:34 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi,
>
> +1 To what Yun Tang wrote. We don't seem to have access to the design doc.
> Could you make it publicly visible or copy out its content to another
> document?
>
> Thanks for your answers Zakelly.
>
> (1)
> Yes, the current mechanism introduced in FLINK-24611 allows checkpoint N
> to reuse only shared state handles that have already been referenced by
> checkpoint N-1. But why do we need to break this assumption? In your step
> "d.", the TM could adhere to that assumption: instead of reusing File-2, it
> could either reuse File-1 or File-3, or create a new file.
>
> (2)
> Can you elaborate a bit more on this? As far as I recall, the purpose of
> the `RecoverableWriter` is to support exactly the things described in this
> FLIP, so what's the difference? If you are saying that for this FLIP you
> can implement something more efficiently for a given FileSystem, then why
> can it not be done the same way for the `RecoverableWriter`?
>
> Best,
> Piotrek
>
> On Thu, Apr 6, 2023 at 5:24 PM Yun Tang <myas...@live.com> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for driving this work!
> >
> > I'm not sure whether you have read the discussion among Stephan, Roman,
> > Piotr, Yuan, and me in the design doc [1] from nearly two years ago.
> >
> > From my understanding, your proposal is also a mixed state ownership model:
> > some states are owned by the TM while others are owned by the JM. If my
> > memory is correct, we did not take option-3 or option-5 in the design doc [1]
> > because of the code complexity when implementing the first version of the
> > changelog state backend.
> >
> > Could you also compare the current FLIP with the proposals in the design
> > doc [1]? From my understanding, we should at least compare it with
> > option-3 and option-5, as they are both mixed solutions.
> >
> >
> > [1]
> > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> >
> > Best
> > Yun Tang
> >
> > ------------------------------
> > *From:* Zakelly Lan <zakelly....@gmail.com>
> > *Sent:* Thursday, April 6, 2023 16:38
> > *To:* dev@flink.apache.org <dev@flink.apache.org>
> > *Subject:* Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for
> > Checkpoints
> >
> > Hi Piotr,
> >
> > Thanks for all the feedback.
> >
> > (1) Thanks for the reminder. I have just looked at FLINK-24611. The delayed
> > deletion by JM resolves some sync problems between JM and TM, but I'm
> > afraid it is still not feasible for the file sharing in this FLIP.
> > Consider a concurrent checkpoint scenario as follows:
> >    a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are written in file 1,
> > and 4.sst is written in file 2.
> >    b. Checkpoint 2 starts based on checkpoint 1, including 1.sst, 2.sst
> > and 5.sst.
> >    c. Checkpoint 3 starts based on checkpoint 1, including 1.sst, 2.sst
> > and 5.sst as well.
> >    d. Checkpoint 3 reuses file 2, and the TM writes 5.sst to it.
> >    e. Checkpoint 2 creates a new file 3, and the TM writes 5.sst to it.
> >    f. Checkpoint 2 finishes, checkpoint 1 is subsumed, and file 2 is
> > deleted, while checkpoint 3 still needs file 2.
> >
> > I attached a diagram to describe the scenario.
> > [image: concurrent cp.jpg]
> > The core issue is that this FLIP introduces a mechanism that allows
> > physical files to be potentially used by the next several checkpoints. JM
> > is uncertain whether there will be a TM continuing to write to a specific
> > file. So in this FLIP, TMs take the responsibility to delete the physical
> > files.
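
A minimal sketch of steps a. to f. above, assuming a deliberately simplified model in which the JM deletes every file that the subsumed checkpoint references and no completed, retained checkpoint references. The function name and the `file-N` labels are hypothetical; this is not Flink code.

```python
def files_deleted_on_subsume(subsumed_files, retained_files):
    """Files referenced by the subsumed checkpoint but by no retained, completed one."""
    return set(subsumed_files) - set(retained_files)

# Physical files referenced by each checkpoint.
cp1 = {"file-1", "file-2"}          # step a: 1-3.sst in file 1, 4.sst in file 2
cp2 = {"file-1", "file-3"}          # steps b, e: 5.sst goes to a new file 3
cp3_pending = {"file-1", "file-2"}  # steps c, d: 5.sst appended to file 2, not yet complete

# Step f: checkpoint 2 completes and checkpoint 1 is subsumed.
# In this model the JM only sees completed checkpoints, so it cannot know
# that a TM is still writing to file 2 on behalf of checkpoint 3.
deleted = files_deleted_on_subsume(cp1, cp2)
broken_for_cp3 = deleted & cp3_pending

print("deleted:", deleted)
print("still needed by checkpoint 3:", broken_for_cp3)
```

Under these assumptions, file 2 is deleted while checkpoint 3 still depends on it, which is the conflict described in step f.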
> >
> > (2) IIUC, the RecoverableWriter was introduced to persist data in the "in
> > progress" files after each checkpoint, and the implementation may rely
> > on file sync in some file systems. However, since sync is a heavy
> > operation for a DFS, this FLIP wants to use flush instead of sync on a
> > best-effort basis. This only fits the case where the DFS is considered
> > reliable. The problems the two want to solve are different.
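
The flush-versus-sync trade-off can be illustrated with a local-file analogy, sketched here with Python's `file.flush()` and `os.fsync()`. This is only an analogy: a DFS client exposes its own flush and sync calls with different guarantees, and the helper `write_segment` is hypothetical.

```python
import os
import tempfile

def write_segment(f, data: bytes, durable: bool) -> None:
    """Append one segment; optionally force it to stable storage."""
    f.write(data)
    f.flush()                 # cheap: hand buffered bytes to the OS
    if durable:
        os.fsync(f.fileno())  # heavy: block until the device has the data

path = os.path.join(tempfile.mkdtemp(), "segment.bin")
with open(path, "wb") as f:
    write_segment(f, b"abc", durable=False)  # best-effort, flush only
    write_segment(f, b"def", durable=True)   # flush plus sync
    # After a flush the bytes are readable by another reader,
    # even though only the second write was synced.
    with open(path, "rb") as reader:
        visible = reader.read()

print(visible)
```

Flush alone makes data readable but leaves durability to the storage layer, which is why the approach only fits a DFS that is considered reliable.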
> >
> > (3) Yes, if files are managed by JM via the shared registry, this problem
> > is solved. And as I mentioned in (1), there are some other corner cases
> > that are hard to resolve via the shared registry.
> >
> > The goal of this FLIP is to have a common way of merging files in all use
> > cases. For shared state it merges at subtask level, while for private state
> > (and changelog files, as I replied to Yanfei), files are merged at TM
> > level. So it is not contrary to the current plan for the unaligned
> > checkpoint state (FLINK-26803). You are right that the unaligned checkpoint
> > state would be merged with the operator's state file, so overall, it is
> > slightly better than what's currently done.
> >
> >
> > Thanks again for the valuable comments!
> >
> > Best regards,
> > Zakelly
> >
> >
> >
> > On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > Hi,
> >
> > Thanks for coming up with the proposal, it's definitely valuable. I'm still
> > reading and trying to understand the proposal, but a couple of comments
> > from my side.
> >
> > (1)
> > > Ownership of a single checkpoint file is transferred to TM, while JM
> > > manages the parent directory of these files.
> >
> > Have you seen https://issues.apache.org/jira/browse/FLINK-24611 before? I
> > don't fully remember why, but we have rejected the idea of moving the file
> > ownership to TM and instead reworked the shared file registry in a way that
> > I think should be sufficient for file sharing. Could you elaborate why we
> > need to move the file ownership to TM, and why the current mechanism is not
> > sufficient?
> >
> > (2)
> > > File visibility is needed when a Flink job recovers after a checkpoint is
> > > materialized. In some DFS, such as most object storages, a file is only
> > > visible after it is closed
> >
> > Is that really the case?
> > `org.apache.flink.core.fs.FileSystem#createRecoverableWriter` seems to be
> > addressing exactly this issue, and the most frequently used FileSystem (S3)
> > AFAIK supports it with no problems?
> >
> > (3)
> > > 4.1.2 Merge files within a subtask or a TM
> > > Given that TMs are reassigned after restoration, it is difficult to
> > > manage physical files that contain data from multiple subtasks scattered
> > > across different TMs (as depicted in Fig.3). There is no synchronization
> > > mechanism between TMs, making file management in this scenario challenging.
> >
> > I think this is solved in many places already via the shared state managed
> > by the JM, as I mentioned in (1).
> >
> >
> > If I understand it correctly you are proposing to have a common
> > interface/way of merging small files, in all use cases, that would work
> > only across a single subtask? That's contrary to what's currently done for
> > unaligned checkpoints, right? But if this generic mechanism were to be used
> > for unaligned checkpoints, unaligned checkpoint state would be
> > merged with the operator's state file, so all in all there would be no
> > regression visible to a user? The limit is that we always have at least a
> > single file per subtask, but in exchange we are getting a simpler threading
> > model?
> >
> > Best,
> > Piotrek
> >
> > On Tue, Apr 4, 2023 at 8:51 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Yanfei,
> > >
> > > Thank you for your prompt response.
> > >
> > > I agree that managing (deleting) only some folders with JM can greatly
> > > relieve JM's burden. Thanks for pointing this out.
> > >
> > > In general, merging at the TM level is more effective since there are
> > > usually more files to merge. Therefore, I believe it is better to
> > > merge files per TM as much as possible.  However, for shared state,
> > > merging at the subtask level is the best choice to prevent significant
> > > data transfer over the network after restoring. I think it is better
> > > to keep these merging granularities for different types of files as
> > > presets that are not configurable. WDYT?
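
A minimal sketch of the preset granularities described above, assuming shared state is grouped per subtask and everything else per TM. The function `merge_group`, the state-kind labels, and the choice to keep private and DSTL files in separate per-TM groups are assumptions made for illustration.

```python
from collections import defaultdict

def merge_group(state_kind, tm_id, subtask_id):
    """Return the key of the group whose files may share a physical file."""
    if state_kind == "shared":
        # Per subtask: a restored subtask reads only files holding its own data.
        return ("subtask", subtask_id)
    # Private state and DSTL (changelog) files: merged per TM.
    return ("tm", tm_id, state_kind)

state_files = [
    ("shared", "tm-1", "op-A#0", "1.sst"),
    ("shared", "tm-1", "op-A#1", "7.sst"),
    ("private", "tm-1", "op-A#0", "meta-0"),
    ("private", "tm-1", "op-A#1", "meta-1"),
    ("dstl", "tm-1", "op-A#0", "changelog-0"),
    ("dstl", "tm-1", "op-A#1", "changelog-1"),
]

groups = defaultdict(list)
for kind, tm, subtask, name in state_files:
    groups[merge_group(kind, tm, subtask)].append(name)

for key, names in groups.items():
    print(key, names)
```

With two subtasks on one TM, the shared files stay in two separate groups, while the private and DSTL files each collapse into a single per-TM group.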
> > >
> > > As for the DSTL files, they are merged per TM and placed in the
> > > task-owned folder. These files can be classified as shared state since
> > > they are shared across checkpoints. However, the DSTL file is a
> > > special case that will be subsumed by the first checkpoint of the
> > > newly restored job. Therefore, there is no need for new TMs to keep
> > > these files after the old checkpoint is subsumed, just like the
> > > private state files. Thus, it is feasible to merge DSTL files per TM
> > > without introducing complex file management across job attempts. So
> > > the possible performance degradation is avoided.
> > >
> > > The three newly introduced options have recommended defaults. For
> > > upcoming versions, this feature is turned off by default. For the
> > > second option, SEGMENTED_ACROSS_CP_BOUNDARY is the recommended default
> > > as it is more effective. Of course, when running on a DFS that does
> > > not make a file visible until it is closed, it is possible
> > > to fall back to another option automatically. For the third option,
> > > 64MB is an acceptable target size. The RocksDB state backend in Flink
> > > also chooses 64MB as the default target file size.
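
A minimal sketch of how a 64MB target size could drive merging, assuming small state files are appended as segments to a physical file and a new physical file is started once the next segment would exceed the target. The rolling policy and the names `pack` and `TARGET_SIZE` are assumptions, not the FLIP's actual implementation.

```python
TARGET_SIZE = 64 * 1024 * 1024  # 64MB, the target size mentioned above
MB = 1024 * 1024

def pack(segments, target=TARGET_SIZE):
    """Pack (name, size) segments into physical files.

    Returns a list of physical files, each a list of (name, offset, length).
    """
    files = [[]]
    used = 0
    for name, size in segments:
        # Roll over to a new physical file if this segment would not fit.
        if files[-1] and used + size > target:
            files.append([])
            used = 0
        files[-1].append((name, used, size))
        used += size
    return files

layout = pack([("1.sst", 40 * MB), ("2.sst", 30 * MB), ("3.sst", 10 * MB)])
for index, physical in enumerate(layout):
    print("physical file", index, physical)
```

Here three state files end up in two physical files, and each logical file is addressed by an offset and length inside its physical file.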
> > >
> > >
> > > Thank you again for your quick response.
> > >
> > >
> > > Best regards,
> > > Zakelly
> > >
> > >
> > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <fredia...@gmail.com> wrote:
> > > >
> > > > Hi Zakelly,
> > > >
> > > > Thanks for driving this. This proposal brings the file merging of
> > > > different types of state under a unified framework. I
> > > > think it has the added benefit of lightening the load on JM. As
> > > > described in FLINK-26590 [1], triggered checkpoints can be delayed by
> > > > discarding shared state when JM manages a large number of files. After
> > > > this FLIP, JM only needs to manage some folders, which greatly reduces
> > > > the burden on JM.
> > > >
> > > > In Section 4.1, two merging granularities (per subtask and per
> > > > task manager) are proposed. The shared state is managed at per-subtask
> > > > granularity, but for the changelog state backend, its DSTL files are
> > > > shared between checkpoints and are currently merged in batches at the
> > > > task manager level. When merging with the SEGMENTED_WITHIN_CP_BOUNDARY
> > > > mode, I'm concerned about the performance degradation of its merging,
> > > > so I wonder whether the merge granularities are configurable. Further,
> > > > from a user perspective, three new options are introduced in this
> > > > FLIP. Do they have recommended defaults?
> > > >
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-26590
> > > >
> > > > Best,
> > > > Yanfei
> > > >
> > > > On Mon, Apr 3, 2023 at 6:36 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > I would like to open a discussion on providing a unified file merging
> > > > > mechanism for checkpoints[1].
> > > > >
> > > > > Currently, many files are uploaded to the DFS during checkpoints,
> > > > > leading to the 'file flood' problem when running
> > > > > intensive workloads in a cluster.  To tackle this problem, various
> > > > > solutions have been proposed for different types
> > > > > of state files. Although these methods are similar, they lack a
> > > > > systematic view and approach. We believe that it is
> > > > > better to consider this problem as a whole and introduce a unified
> > > > > framework to address the file flood problem for
> > > > > all types of state files. A POC has been implemented based on current
> > > > > FLIP design, and the test results are promising.
> > > > >
> > > > >
> > > > > Looking forward to your comments or feedback.
> > > > >
> > > > > Best regards,
> > > > > Zakelly
> > > > >
> > > > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> > >
> >
> >
