Hi,

Overall, I agree with increasing this value, but a default of 100K may be
too large in my view.

I want to share some more information from my side.

The small files problem is indeed one that many users may encounter in
production environments. The states (keyed state and operator state) can
become small files in DFS, but increasing the value of
`state.backend.fs.memory-threshold` may lead to the JM OOM problem, as Yun
said previously.
We tried increasing this value in our production environment, but some
connectors that use UnionState prevented us from doing so: the memory
consumed by these jobs can be very large (in our case, with a parallelism in
the thousands, setting `state.backend.fs.memory-threshold` to 64K consumes
10G+ of memory on the JM). In the end, we used the solution proposed in
FLINK-11937 [1] for both keyed state and operator state.
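
To put rough numbers on the union state case Yun described (each sub-task's
TDD carries the inline state of all sub-tasks), here is a minimal
back-of-the-envelope sketch. The class and method names are hypothetical and
are not part of Flink. It assumes every sub-task contributes union state
right at the threshold and that all TDDs are serialized at the same time, so
it gives an upper bound rather than a measurement.

```java
public class UnionStateFootprint {

    /**
     * Inline union state carried by ONE sub-task's deployment descriptor:
     * with union redistribution, each sub-task receives the handles of all
     * sub-tasks, each assumed to be exactly thresholdBytes large.
     */
    public static long perSubtaskBytes(int parallelism, long thresholdBytes) {
        return Math.multiplyExact((long) parallelism, thresholdBytes);
    }

    /**
     * Upper bound on the JM side if the descriptors of all sub-tasks are
     * serialized at once (assumption: no sharing after serialization).
     */
    public static long worstCaseTotalBytes(int parallelism, long thresholdBytes) {
        return Math.multiplyExact((long) parallelism,
                perSubtaskBytes(parallelism, thresholdBytes));
    }

    public static void main(String[] args) {
        int parallelism = 1024;           // source parallelism from Yun's example
        long thresholdBytes = 100L * 1024; // proposed 100K threshold

        long perSubtask = perSubtaskBytes(parallelism, thresholdBytes);
        long total = worstCaseTotalBytes(parallelism, thresholdBytes);

        System.out.println("per sub-task (MiB): " + perSubtask / (1024L * 1024));
        System.out.println("worst case total (GiB): " + total / (1024L * 1024 * 1024));
    }
}
```

With a parallelism of 1024 and a 100K threshold this works out to 100 MiB per
descriptor and 100 GiB in the worst case. Real jobs will sit well below this
bound, since not every state reaches the threshold and not every TDD is
serialized at the same moment, but it shows why the footprint grows with the
square of the parallelism.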


[1] https://issues.apache.org/jira/browse/FLINK-11937
Best,
Congxian


On Fri, May 15, 2020 at 9:09 PM Yun Tang <myas...@live.com> wrote:

> Please correct me if I am wrong: "put the increased value into the default
> configuration" means
> we will update it in the default flink-conf.yaml but still leave the default
> value of `state.backend.fs.memory-threshold` as it was?
> I do not see why existing setups with existing configs
> will not be affected.
>
> The concern I raised comes from one of our large-scale jobs, whose source
> has a parallelism of 1024 and uses union state: it hit the JM OOM problem
> when we increased this value.
> I think if we introduce memory control when serializing TDDs asynchronously
> [1], we could be much more confident about increasing this configuration,
> since the memory footprint expands at that time because of the many
> serialized TDDs.
>
>
> [1]
> https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
>
> Best
> Yun Tang
>
> ________________________________
> From: Stephan Ewen <se...@apache.org>
> Sent: Friday, May 15, 2020 16:53
> To: dev <dev@flink.apache.org>
> Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <
> pi...@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> I see, thanks for all the input.
>
> I agree with Yun Tang that the use of UnionState is problematic and can
> cause issues in conjunction with this.
> However, most of the large-scale users I know that also struggle with
> UnionState have also increased this threshold, because with this low
> threshold, they get an excess number of small files and overwhelm their
> HDFS / S3 / etc.
>
> An intermediate solution could be to put the increased value into the
> default configuration. That way, existing setups with existing configs will
> not be affected, but new users / installations will have a simpler time.
>
> Best,
> Stephan
>
>
> On Thu, May 14, 2020 at 9:20 PM Yun Tang <myas...@live.com> wrote:
>
> > I tend not to be in favor of this proposal, as union state is somewhat
> > abused in several popular source connectors (e.g. Kafka), and increasing
> > this value could lead to JM OOM when sending the TDD from the JM to TMs
> > with large parallelism.
> >
> > After we collect union state and initialize the map list [1], we already
> > have union state ready to assign. At this time, the memory footprint has
> > not increased much, as the union state shared across tasks references the
> > same ByteStreamStateHandle. However, when we send the TDD with the
> > taskRestore to the TMs, Akka serializes those ByteStreamStateHandles
> > within the TDD, which increases the memory footprint. If the source has a
> > parallelism of 1024, each sub-task would then have 1024*100KB of state
> > handles. The total memory footprint cannot be ignored.
> >
> > If we plan to increase the default value of
> > state.backend.fs.memory-threshold, we should first resolve the above case.
> > In other words, this proposal is a trade-off, which benefits perhaps
> > 99% of users, but might harm the 1% of users with large-scale
> > Flink jobs.
> >
> >
> > [1]
> > https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> >
> > Best
> > Yun Tang
> >
> >
> > ________________________________
> > From: Yu Li <car...@gmail.com>
> > Sent: Thursday, May 14, 2020 23:51
> > To: Till Rohrmann <trohrm...@apache.org>
> > Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> > 1K to 100K
> >
> > TL;DR: I have some reservations but tend to be +1 for the proposal,
> > meanwhile suggest we have a more thorough solution in the long run.
> >
> > Please correct me if I'm wrong, but it seems the root cause of the issue
> > is that too many small files are generated.
> >
> > I have some concerns about the session cluster case [1], as well as
> > possible issues for users at large scale. Otherwise, I think increasing
> > `state.backend.fs.memory-threshold` to 100K is a good choice, based on the
> > assumption that a large portion of our users are running small jobs with
> > small states.
> >
> > OTOH, maybe extending the solution [2] for the RocksDB small file
> > problem (as proposed in FLINK-11937 [3]) to also support operator state
> > could be an alternative? We have already applied the solution in
> > production for operator state, and it solved the HDFS NN RPC bottleneck
> > problem on last year's Singles' Day.
> >
> > Best Regards,
> > Yu
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > [2]
> > https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > <https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h>
> > [3] https://issues.apache.org/jira/browse/FLINK-11937
> >
> >
> > On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > > I cannot say much about the concrete value, but if our users have
> > > problems with the existing default values, then it makes sense to me to
> > > change it.
> > >
> > > One thing to check could be whether it is possible to provide a
> > > meaningful exception in case the state size exceeds the frame size. At
> > > the moment, Flink should fail with a message saying that an RPC message
> > > exceeds the maximum frame size. Maybe it is also possible to point the
> > > user towards "state.backend.fs.memory-threshold" if the message exceeds
> > > the frame size because of too much state.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> > >
> > >> The parameter "state.backend.fs.memory-threshold" decides when a state
> > >> will become a file and when it will be stored inline with the metadata
> > >> (to avoid excessive amounts of small files).
> > >>
> > >> By default, this threshold is 1K - so every state above that size
> > >> becomes a file. For many cases, this threshold seems to be too low.
> > >> There is an interesting talk with background on this from Scott Kidder:
> > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > >>
> > >> I wanted to discuss increasing this to 100K by default.
> > >>
> > >> Advantage:
> > >>   - This should help many users out of the box, which otherwise see
> > >> checkpointing problems on systems like S3, GCS, etc.
> > >>
> > >> Disadvantage:
> > >>   - For very large jobs, this increases the required heap memory on the
> > >> JM side, because more state needs to be kept in-line when gathering the
> > >> acks for a pending checkpoint.
> > >>   - If tasks have a lot of states and each state is roughly at this
> > >> threshold, we increase the chance of exceeding the RPC frame size and
> > >> failing the job.
> > >>
> > >> What do you think?
> > >>
> > >> Best,
> > >> Stephan
> > >>
> > >
> >
>
