I have created FLINK-17865 [1] to follow up on this. Thanks.

Best Regards,
Yu

[1] https://issues.apache.org/jira/browse/FLINK-17865


On Wed, 20 May 2020 at 23:03, Stephan Ewen <se...@apache.org> wrote:

> Sounds good to me.
>
> By the next release, we should also have phased out the old Kafka Source,
> which is one of the most common and most problematic users of Union State.
>
> On Wed, May 20, 2020 at 11:12 AM Yu Li <car...@gmail.com> wrote:
>
> > +1 on improving the Union State implementation.
> >
> > I think the concerns raised around union state are valid; meanwhile, a job
> > with a parallelism of 200 on the source operator could already be regarded
> > as a "large job".
> >
> > As a compromise, I suggest we split the improvement into 3 steps:
> >
> > 1. Increase `state.backend.fs.memory-threshold` from 1K to 20K (which will
> >    at most increase the memory cost on the JM side by 200*200*20K=800MB)
> > 2. Improve the union state implementation
> > 3. Increase `state.backend.fs.memory-threshold` further
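
The 800MB bound in step 1 comes from every one of the 200 sub-tasks being handed the union of all 200 sub-task states, each up to the threshold in size. A minimal sketch of that arithmetic, assuming the worst case in which all serialized copies are held on the JM at once; the class and method names are hypothetical and not part of Flink:

```java
/** Back-of-the-envelope estimate; hypothetical helper, not a Flink API. */
final class UnionStateFootprint {

    /** Worst-case bytes on the JM when every sub-task receives the full union state. */
    static long worstCaseBytes(int parallelism, long bytesPerSubtaskState) {
        // Each of the `parallelism` deployment messages carries `parallelism` state handles.
        return (long) parallelism * parallelism * bytesPerSubtaskState;
    }

    public static void main(String[] args) {
        // Threshold of 20K in decimal units, as used in the estimate above.
        long at20K = worstCaseBytes(200, 20_000L);
        System.out.println(at20K / 1_000_000L + " MB"); // prints "800 MB"
    }
}
```

The same formula with a 100KB state per sub-task (100 * 1024 bytes) gives roughly 3.8GB, which matches the figure quoted further down in the thread.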
> >
> > What do you think? Thanks.
> >
> > Best Regards,
> > Yu
> >
> >
> > On Sat, 16 May 2020 at 23:15, Yun Tang <myas...@live.com> wrote:
> >
> > > If we cannot get rid of union state, I think we should introduce memory
> > > control on the serialized TDDs (TaskDeploymentDescriptors) when deploying
> > > tasks, instead of changing how union state is implemented when assigning
> > > state in StateAssignmentOperation.
> > > The duplicated TaskStateSnapshot does not really increase memory much, as
> > > the ByteStreamStateHandles actually share the same reference until they
> > > are serialized.
> > >
> > > Regarding the estimated memory footprint, I previously thought that it
> > > depends on the pool size of the future executor
> > > (Hardware#getNumberCPUCores). However, with the simple program below, I
> > > found that the async task submission logic lets the number of
> > > RemoteRpcInvocation instances that exist in the JM at the same time grow
> > > larger than Hardware#getNumberCPUCores.
> > > In the program below, the source has a parallelism of 200, and the number
> > > of RemoteRpcInvocation instances existing in the JM at the same time could
> > > be nearly 200, while the pool size of our future executor is only 96. I
> > > think if we could clear the serialized data in RemoteRpcInvocation as soon
> > > as possible, we might mitigate this problem greatly.
> > >
> > > A simple program that uses union state to reproduce the memory footprint
> > > problem: each sub-task contributes a 100KB byte array to the union state,
> > > and with 200 sub-tasks in total this could lead to more than
> > > 100KB * 200 * 200 = 3.8GB of memory for all union state.
> > >
> > > import java.util.List;
> > > import java.util.concurrent.ThreadLocalRandom;
> > > import java.util.stream.Collectors;
> > > import java.util.stream.StreamSupport;
> > >
> > > import org.apache.flink.api.common.state.ListState;
> > > import org.apache.flink.api.common.state.ListStateDescriptor;
> > > import org.apache.flink.runtime.state.FunctionInitializationContext;
> > > import org.apache.flink.runtime.state.FunctionSnapshotContext;
> > > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> > > import org.slf4j.Logger;
> > > import org.slf4j.LoggerFactory;
> > >
> > > public class Program {
> > >    private static final Logger LOG = LoggerFactory.getLogger(Program.class);
> > >
> > >    public static void main(String[] args) throws Exception {
> > >       final StreamExecutionEnvironment env =
> > >             StreamExecutionEnvironment.getExecutionEnvironment();
> > >       env.enableCheckpointing(60 * 1000L);
> > >       env.addSource(new MySource()).setParallelism(200).print();
> > >       env.execute("Mock program");
> > >    }
> > >
> > >    private static class MySource extends RichParallelSourceFunction<Integer>
> > >          implements CheckpointedFunction {
> > >       private static final ListStateDescriptor<byte[]> stateDescriptor =
> > >             new ListStateDescriptor<>("list-1", byte[].class);
> > >       private ListState<byte[]> unionListState;
> > >       private volatile boolean running = true;
> > >
> > >       @Override
> > >       public void snapshotState(FunctionSnapshotContext context) throws Exception {
> > >          // Each sub-task snapshots a single random 100KB array.
> > >          unionListState.clear();
> > >          byte[] array = new byte[100 * 1024];
> > >          ThreadLocalRandom.current().nextBytes(array);
> > >          unionListState.add(array);
> > >       }
> > >
> > >       @Override
> > >       public void initializeState(FunctionInitializationContext context)
> > >             throws Exception {
> > >          unionListState =
> > >                context.getOperatorStateStore().getUnionListState(stateDescriptor);
> > >          if (context.isRestored()) {
> > >             // On restore, every sub-task sees the arrays of all sub-tasks.
> > >             List<byte[]> collect =
> > >                   StreamSupport.stream(unionListState.get().spliterator(), false)
> > >                         .collect(Collectors.toList());
> > >             LOG.info("union state Collect size: {}.", collect.size());
> > >          }
> > >       }
> > >
> > >       @Override
> > >       public void run(SourceContext<Integer> ctx) throws Exception {
> > >          while (running) {
> > >             synchronized (ctx.getCheckpointLock()) {
> > >                ctx.collect(ThreadLocalRandom.current().nextInt());
> > >             }
> > >             Thread.sleep(100);
> > >          }
> > >       }
> > >
> > >       @Override
> > >       public void cancel() {
> > >          running = false;
> > >       }
> > >    }
> > > }
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Stephan Ewen <se...@apache.org>
> > > Sent: Saturday, May 16, 2020 18:56
> > > To: dev <dev@flink.apache.org>
> > > Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > >
> > > Okay, thank you for all the feedback.
> > >
> > > So we should definitely work on getting rid of the Union State, or at
> > > least change the way it is implemented (avoid duplicate serializer
> > > snapshots).
> > >
> > > Can you estimate from which cluster size on the JM heap usage becomes
> > > critical (if we increased the threshold to 100k, or maybe 50k)?
> > >
> > >
> > > On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qcx978132...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Overall, I agree with increasing this value, but a default value of
> > > > 100K may be too large from my point of view.
> > > >
> > > > I want to share some more information from my side.
> > > >
> > > > The small files problem is indeed one that many users may encounter in
> > > > production. The states (keyed state and operator state) can become
> > > > small files in DFS, but increasing the value of
> > > > `state.backend.fs.memory-threshold` may cause the JM OOM problem, as
> > > > Yun said previously.
> > > > We've tried increasing this value in our production environment, but
> > > > some connectors which use UnionState prevent us from doing so: the
> > > > memory consumed by these jobs can be very large (in our case, with a
> > > > parallelism in the thousands, setting
> > > > `state.backend.fs.memory-threshold` to 64K consumes 10G+ of JM memory).
> > > > So in the end, we use the solution proposed in FLINK-11937 [1] for both
> > > > keyed state and operator state.
> > > >
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-11937
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > On Fri, May 15, 2020 at 9:09 PM Yun Tang <myas...@live.com> wrote:
> > > >
> > > > > Please correct me if I am wrong: does "put the increased value into
> > > > > the default configuration" mean that we will update the value in the
> > > > > default flink-conf.yaml but still leave the default value of
> > > > > `state.backend.fs.memory-threshold` as before?
> > > > > It seems I did not get the point of why existing setups with existing
> > > > > configs would not be affected.
> > > > >
> > > > > I raised this concern because one of our large-scale jobs, whose
> > > > > source uses union state with a parallelism of 1024, hit the JM OOM
> > > > > problem when we increased this value.
> > > > > I think if we introduce memory control when serializing TDDs
> > > > > asynchronously [1], we could be much more confident about increasing
> > > > > this configuration, as it is the many serialized TDDs that expand the
> > > > > memory footprint at that time.
> > > > >
> > > > >
> > > > > [1] https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > >
> > > > > ________________________________
> > > > > From: Stephan Ewen <se...@apache.org>
> > > > > Sent: Friday, May 15, 2020 16:53
> > > > > To: dev <dev@flink.apache.org>
> > > > > Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > > > >
> > > > > I see, thanks for all the input.
> > > > >
> > > > > I agree with Yun Tang that the use of UnionState is problematic and
> > > > > can cause issues in conjunction with this.
> > > > > However, most of the large-scale users I know who also struggle with
> > > > > UnionState have increased this threshold as well, because with the
> > > > > low threshold they get an excessive number of small files and
> > > > > overwhelm their HDFS / S3 / etc.
> > > > >
> > > > > An intermediate solution could be to put the increased value into the
> > > > > default configuration. That way, existing setups with existing
> > > > > configs will not be affected, but new users / installations will have
> > > > > a simpler time.
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Thu, May 14, 2020 at 9:20 PM Yun Tang <myas...@live.com> wrote:
> > > > >
> > > > > > I tend not to be in favor of this proposal, as union state is
> > > > > > somewhat abused in several popular source connectors (e.g. Kafka),
> > > > > > and increasing this value could lead to JM OOM when sending TDDs
> > > > > > from the JM to the TMs with large parallelism.
> > > > > >
> > > > > > After we collect the union state and initialize the map list [1],
> > > > > > we already have the union state ready to assign. At this point, the
> > > > > > memory footprint has not increased much, as the union state shared
> > > > > > across tasks references the same ByteStreamStateHandle instances.
> > > > > > However, when we send the TDDs with the taskRestore to the TMs,
> > > > > > Akka serializes the ByteStreamStateHandles within each TDD, which
> > > > > > increases the memory footprint. If the source has a parallelism of
> > > > > > 1024, each sub-task would then have state handles of 1024*100KB in
> > > > > > size. The total memory footprint cannot be ignored.
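
The effect described here, cheap shared references on the heap that turn into full copies once each message is serialized separately, can be illustrated with plain Java serialization. This is only a sketch: `Descriptor` is a hypothetical stand-in for a TDD, plain `byte[]` arrays stand in for ByteStreamStateHandle, and Java serialization is assumed in place of the actual RPC serialization path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Shared references on the heap versus per-message serialized copies. */
final class SharedHandleDemo {

    /** Hypothetical stand-in for a deployment descriptor carrying the full union state. */
    static final class Descriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        final int subtaskIndex;
        final List<byte[]> unionState;

        Descriptor(int subtaskIndex, List<byte[]> unionState) {
            this.subtaskIndex = subtaskIndex;
            this.unionState = unionState;
        }
    }

    /** Serializes one value on its own, as a separate message would be. */
    static long serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Sum of the serialized sizes of one descriptor per sub-task, all sharing one list. */
    static long totalSerializedSize(int parallelism, int bytesPerHandle) {
        List<byte[]> shared = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            shared.add(new byte[bytesPerHandle]);
        }
        long total = 0;
        for (int i = 0; i < parallelism; i++) {
            // On the heap every descriptor points at the same list; serialization copies it.
            total += serializedSize(new Descriptor(i, shared));
        }
        return total;
    }

    public static void main(String[] args) {
        int parallelism = 50;
        int bytesPerHandle = 1024;
        System.out.println("payload held once on the heap: " + (long) parallelism * bytesPerHandle);
        System.out.println("sum of serialized messages:    "
                + totalSerializedSize(parallelism, bytesPerHandle));
    }
}
```

The serialized total grows with the square of the parallelism, while the heap payload before serialization grows only linearly.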
> > > > > >
> > > > > > If we plan to increase the default value of
> > > > > > state.backend.fs.memory-threshold, we should first resolve the
> > > > > > above case.
> > > > > > In other words, this proposal is a trade-off, which benefits
> > > > > > perhaps 99% of users but might harm the 1% of users with
> > > > > > large-scale Flink jobs.
> > > > > >
> > > > > >
> > > > > > [1] https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > >
> > > > > >
> > > > > > ________________________________
> > > > > > From: Yu Li <car...@gmail.com>
> > > > > > Sent: Thursday, May 14, 2020 23:51
> > > > > > To: Till Rohrmann <trohrm...@apache.org>
> > > > > > Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > > > > >
> > > > > > TL;DR: I have some reservations but tend to be +1 for the proposal;
> > > > > > meanwhile, I suggest we work on a more thorough solution in the
> > > > > > long run.
> > > > > >
> > > > > > Please correct me if I'm wrong, but it seems the root cause of the
> > > > > > issue is that too many small files are generated.
> > > > > >
> > > > > > I have some concerns about the session cluster case [1], as well as
> > > > > > possible issues for users at large scale. Otherwise, I think
> > > > > > increasing `state.backend.fs.memory-threshold` to 100K is a good
> > > > > > choice, based on the assumption that a large portion of our users
> > > > > > are running small jobs with small states.
> > > > > >
> > > > > > OTOH, maybe extending the solution [2] to the RocksDB small file
> > > > > > problem (as proposed by FLINK-11937 [3]) to also support operator
> > > > > > state could be an alternative? We have already applied this
> > > > > > solution in production for operator state, and it solved the HDFS
> > > > > > NN RPC bottleneck problem on last year's Singles' Day.
> > > > > >
> > > > > > Best Regards,
> > > > > > Yu
> > > > > >
> > > > > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > > > > [2] https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> > > > > >     <https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h>
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > > > > >
> > > > > >
> > > > > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > >
> > > > > > > I cannot say much about the concrete value, but if our users have
> > > > > > > problems with the existing default value, then it makes sense to
> > > > > > > me to change it.
> > > > > > >
> > > > > > > One thing to check is whether it is possible to provide a
> > > > > > > meaningful exception in case the state size exceeds the frame
> > > > > > > size. At the moment, Flink should fail with a message saying that
> > > > > > > an RPC message exceeds the maximum frame size. Maybe it is also
> > > > > > > possible to point the user towards
> > > > > > > "state.backend.fs.memory-threshold" if the message exceeds the
> > > > > > > frame size because of too much state.
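
A minimal sketch of what such a hint could look like. The class, method names, and message wording are hypothetical and are not Flink's RPC code; how the caller would know that a message carries inlined state is left open here and passed in as a flag.

```java
/** Sketch of a frame-size check with an actionable hint; hypothetical, not Flink's RPC code. */
final class FrameSizeCheck {

    /** Returns null when the message fits, otherwise a description of the problem. */
    static String describeViolation(long messageBytes, long maxFrameBytes, boolean carriesInlineState) {
        if (messageBytes <= maxFrameBytes) {
            return null;
        }
        StringBuilder message = new StringBuilder()
                .append("RPC message of ").append(messageBytes)
                .append(" bytes exceeds the maximum frame size of ")
                .append(maxFrameBytes).append(" bytes.");
        if (carriesInlineState) {
            // Point the user at the option that controls how much state is stored inline.
            message.append(" The message carries inlined state; consider lowering")
                    .append(" 'state.backend.fs.memory-threshold'.");
        }
        return message.toString();
    }

    /** Throws with the descriptive message if the frame size is exceeded. */
    static void check(long messageBytes, long maxFrameBytes, boolean carriesInlineState) {
        String violation = describeViolation(messageBytes, maxFrameBytes, carriesInlineState);
        if (violation != null) {
            throw new IllegalStateException(violation);
        }
    }

    public static void main(String[] args) {
        try {
            check(12_000_000L, 10_000_000L, true); // made-up sizes for illustration
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```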
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> > > > > > >
> > > > > > > >> The parameter "state.backend.fs.memory-threshold" decides when
> > > > > > > >> a state will become a file and when it will be stored inline
> > > > > > > >> with the metadata (to avoid excessive amounts of small files).
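
The decision this parameter controls can be sketched as follows. The names are hypothetical and this is not Flink's implementation; treating a state exactly at the threshold as inline is an assumption, based on the wording that states "above that size" become files.

```java
import java.util.Arrays;

/** Sketch of the inline-versus-file decision; hypothetical names, not Flink's implementation. */
final class MemoryThresholdSketch {

    enum Placement { INLINE_WITH_METADATA, SEPARATE_FILE }

    /** Assumption: state at or below the threshold stays inline, anything above becomes a file. */
    static Placement placementFor(long stateBytes, long thresholdBytes) {
        return stateBytes <= thresholdBytes
                ? Placement.INLINE_WITH_METADATA
                : Placement.SEPARATE_FILE;
    }

    /** Counts how many of the given states would be written as separate files. */
    static long countFiles(long[] stateSizes, long thresholdBytes) {
        return Arrays.stream(stateSizes)
                .filter(size -> placementFor(size, thresholdBytes) == Placement.SEPARATE_FILE)
                .count();
    }

    public static void main(String[] args) {
        long[] stateSizes = {500, 5_000, 50_000, 500_000}; // made-up sizes in bytes
        System.out.println("files at 1K threshold:   " + countFiles(stateSizes, 1024));       // 3
        System.out.println("files at 100K threshold: " + countFiles(stateSizes, 100 * 1024)); // 1
    }
}
```

A higher threshold trades fewer files on the checkpoint storage for more bytes kept inline in the checkpoint metadata.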
> > > > > > >>
> > > > > > > >> By default, this threshold is 1K - so every state above that
> > > > > > > >> size becomes a file. For many cases, this threshold seems to
> > > > > > > >> be too low.
> > > > > > > >> There is an interesting talk with background on this from
> > > > > > > >> Scott Kidder:
> > > > > > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > > > > >>
> > > > > > >> I wanted to discuss increasing this to 100K by default.
> > > > > > >>
> > > > > > > >> Advantage:
> > > > > > > >>   - This should help many users out of the box, which
> > > > > > > >>     otherwise see checkpointing problems on systems like S3,
> > > > > > > >>     GCS, etc.
> > > > > > > >>
> > > > > > > >> Disadvantage:
> > > > > > > >>   - For very large jobs, this increases the required heap
> > > > > > > >>     memory on the JM side, because more state needs to be kept
> > > > > > > >>     in-line when gathering the acks for a pending checkpoint.
> > > > > > > >>   - If tasks have a lot of states and each state is roughly at
> > > > > > > >>     this threshold, we increase the chance of exceeding the
> > > > > > > >>     RPC frame size and failing the job.
> > > > > > >>
> > > > > > >> What do you think?
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Stephan
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
