Hi,

Thanks for coming up with the proposal, it's definitely valuable. I'm still
reading and trying to understand it, but I have a couple of comments
from my side.

(1)
> Ownership of a single checkpoint file is transferred to TM, while JM
> manages the parent directory of these files.

Have you seen https://issues.apache.org/jira/browse/FLINK-24611 before? I
don't fully remember why, but we rejected the idea of moving the file
ownership to TM and instead reworked the shared file registry in a way that
I think should be sufficient for file sharing. Could you elaborate on why we
need to move the file ownership to TM, and why the current mechanism is not
sufficient?
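
To illustrate what I mean by JM-side tracking being enough, here is a toy
model. It is only a sketch under my own assumptions, not Flink's actual
registry code: the class and method names are made up, and it assumes the
registry remembers, per shared file, the last checkpoint that used it.

```python
class SharedFileRegistry:
    """Toy JM-side registry (hypothetical; not Flink's SharedStateRegistry)."""

    def __init__(self):
        # file key -> id of the latest checkpoint that referenced the file
        self._last_used = {}

    def register(self, key: str, checkpoint_id: int) -> None:
        """Record that `checkpoint_id` uses the shared file `key`."""
        previous = self._last_used.get(key, checkpoint_id)
        self._last_used[key] = max(previous, checkpoint_id)

    def unregister_unused(self, lowest_retained_checkpoint_id: int) -> list:
        """Drop and return files that no retained checkpoint can reference."""
        stale = sorted(
            key
            for key, last_used in self._last_used.items()
            if last_used < lowest_retained_checkpoint_id
        )
        for key in stale:
            del self._last_used[key]
        return stale


registry = SharedFileRegistry()
registry.register("sst-1", 1)
registry.register("sst-2", 1)
registry.register("sst-2", 2)  # checkpoint 2 reuses sst-2
registry.register("sst-3", 2)
discarded = registry.unregister_unused(2)  # checkpoint 1 has been subsumed
```

In this model only "sst-1" is discarded once checkpoint 1 is subsumed, and no
TM has to own or delete any file.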

(2)
> File visibility is needed when a Flink job recovers after a checkpoint is
> materialized. In some DFS, such as most object storages, a file is only
> visible after it is closed

Is that really the case?
`org.apache.flink.core.fs.FileSystem#createRecoverableWriter` seems to be
addressing exactly this issue, and the most frequently used FileSystem (S3)
AFAIK supports it with no problems?

(3)
> 4.1.2 Merge files within a subtask or a TM
> Given that TMs are reassigned after restoration, it is difficult to
> manage physical files that contain data from multiple subtasks scattered
> across different TMs (as depicted in Fig.3). There is no synchronization
> mechanism between TMs, making file management in this scenario challenging.

I think this is solved in many places already via the shared state managed
by the JM, as I mentioned in (1).


If I understand it correctly, you are proposing a common
interface/way of merging small files, for all use cases, that would work
only within a single subtask? That's contrary to what's currently done for
unaligned checkpoints, right? But if this generic mechanism were used
for unaligned checkpoints, the unaligned checkpoint state would be
merged with the operator's state file, so all in all there would be no
regression visible to a user? The limitation is that we always have at least
one file per subtask, but in exchange we get a simpler threading
model?
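
To check that we mean the same thing by per-subtask merging, this is roughly
how I picture it. It is a minimal sketch, not the FLIP's design:
`SegmentHandle` and `SubtaskFileMerger` are names I made up, and an in-memory
buffer stands in for the DFS output stream.

```python
import io
from dataclasses import dataclass


@dataclass(frozen=True)
class SegmentHandle:
    """Hypothetical handle: where one logical state file lives physically."""
    physical_file: str
    offset: int
    length: int


class SubtaskFileMerger:
    """Toy merger: one subtask appends all its small files to one physical file."""

    def __init__(self, physical_file: str):
        self.physical_file = physical_file
        self._buffer = io.BytesIO()  # stands in for an open DFS output stream

    def append(self, data: bytes) -> SegmentHandle:
        """Append one logical file and return the segment that locates it."""
        offset = self._buffer.tell()
        self._buffer.write(data)
        return SegmentHandle(self.physical_file, offset, len(data))

    def read(self, handle: SegmentHandle) -> bytes:
        """Read a logical file back through its segment handle."""
        content = self._buffer.getvalue()
        return content[handle.offset:handle.offset + handle.length]


merger = SubtaskFileMerger("subtask-0/merged-1")
operator_state = merger.append(b"operator-state")
channel_state = merger.append(b"in-flight-buffers")
```

Here the operator state and the unaligned checkpoint (channel) state of one
subtask end up as two segments of the same physical file, and only that
subtask ever writes to it, so no cross-thread or cross-TM coordination is
needed.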

Best,
Piotrek

On Tue, Apr 4, 2023 at 08:51 Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Yanfei,
>
> Thank you for your prompt response.
>
> I agree that managing (deleting) only some folders with JM can greatly
> relieve JM's burden. Thanks for pointing this out.
>
> In general, merging at the TM level is more effective since there are
> usually more files to merge. Therefore, I believe it is better to
> merge files per TM as much as possible.  However, for shared state,
> merging at the subtask level is the best choice to prevent significant
> data transfer over the network after restoring. I think it is better
> to keep these merging granularities for different types of files as
> presets that are not configurable. WDYT?
>
> As for the DSTL files, they are merged per TM and placed in the
> task-owned folder. These files can be classified as shared state since
> they are shared across checkpoints. However, the DSTL file is a
> special case that will be subsumed by the first checkpoint of the
> newly restored job. Therefore, there is no need for new TMs to keep
> these files after the old checkpoint is subsumed, just like the
> private state files. Thus, it is feasible to merge DSTL files per TM
> without introducing complex file management across job attempts. So
> the possible performance degradation is avoided.
>
> The three newly introduced options have recommended defaults. For
> upcoming versions, this feature is turned off by default. For the
> second option, SEGMENTED_ACROSS_CP_BOUNDARY is the recommended default
> as it is more effective. Of course, if the DFS does not make a file
> visible until it is closed, it is possible
> to fall back to another option automatically. For the third option,
> 64MB is an acceptable target size. The RocksDB state backend in Flink
> also chooses 64MB as the default target file size.
>
>
> Thank you again for your quick response.
>
>
> Best regards,
> Zakelly
>
>
> On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <fredia...@gmail.com> wrote:
> >
> > Hi Zakelly,
> >
> > Thanks for driving this. This proposal brings the file merging of
> > different types of state under a unified framework. I
> > think it has the added benefit of lightening the load on the JM. As
> > FLINK-26590[1] describes, triggering checkpoints can be delayed by
> > discarding shared state when the JM manages a large number of files. After
> > this FLIP, the JM only needs to manage some folders, which greatly reduces
> > its burden.
> >
> > Section 4.1 proposes two merging granularities (per subtask and per
> > task manager). Shared state is managed at per-subtask
> > granularity, but the changelog state backend's DSTL files are
> > shared between checkpoints and are currently merged in batches at the
> > task manager level. When merging with the SEGMENTED_WITHIN_CP_BOUNDARY
> > mode, I'm concerned about performance degradation of that merging,
> > so I wonder whether the merge granularities are configurable. Further,
> > from a user perspective, three new options are introduced in this
> > FLIP. Do they have recommended defaults?
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-26590
> >
> > Best,
> > Yanfei
> >
> > Zakelly Lan <zakelly....@gmail.com> wrote on Mon, Apr 3, 2023 at 18:36:
> >
> > >
> > > Hi everyone,
> > >
> > > I would like to open a discussion on providing a unified file merging
> > > mechanism for checkpoints[1].
> > >
> > > Currently, many files are uploaded to the DFS during checkpoints,
> > > leading to the 'file flood' problem when running
> > > intensive workloads in a cluster.  To tackle this problem, various
> > > solutions have been proposed for different types
> > > of state files. Although these methods are similar, they lack a
> > > systematic view and approach. We believe that it is
> > > better to consider this problem as a whole and introduce a unified
> > > framework to address the file flood problem for
> > > all types of state files. A POC has been implemented based on the current
> > > FLIP design, and the test results are promising.
> > >
> > >
> > > Looking forward to your comments or feedback.
> > >
> > > Best regards,
> > > Zakelly
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
>
