I have created FLINK-17865 [1] to follow this up. Thanks.

Best Regards,
Yu
[1] https://issues.apache.org/jira/browse/FLINK-17865

On Wed, 20 May 2020 at 23:03, Stephan Ewen <se...@apache.org> wrote:
> Sounds good to me.
>
> By the next release, we should also have phased out the old Kafka Source, which is one of the most common and most problematic users of Union State.
>
> On Wed, May 20, 2020 at 11:12 AM Yu Li <car...@gmail.com> wrote:
> > +1 on improving the Union State implementation.
> >
> > I think the concerns raised around union state are valid; meanwhile, jobs with a parallelism of 200 on the source operator could be regarded as "large jobs".
> >
> > As a compromise, I suggest we split the improvements into 3 steps:
> >
> > 1. Increase `state.backend.fs.memory-threshold` from 1K to 20K (which will increase the memory cost on the JM side by at most 200*200*20K = 800MB)
> > 2. Improve the union state implementation
> > 3. Further increase `state.backend.fs.memory-threshold`
> >
> > What do you think? Thanks.
> >
> > Best Regards,
> > Yu
> >
> > On Sat, 16 May 2020 at 23:15, Yun Tang <myas...@live.com> wrote:
> > > If we cannot get rid of union state, I think we should introduce memory control on the serialized TDDs when deploying tasks, rather than change how union state is assigned in StateAssignmentOperation. Duplicating the TaskStateSnapshot does not really increase memory much, because the ByteStreamStateHandles share the same references until they are serialized.
> > >
> > > As for the estimated memory footprint, I previously thought it depended on the pool size of the future executor (HardWare#getNumberCPUCores). However, with the simple program below, I found that the asynchronous task-submission logic makes the number of RemoteRpcInvocations alive in the JM at the same time larger than HardWare#getNumberCPUCores.
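Yu's step-1 bound (200*200*20K) and the 3.8GB figure in Yun's reproduction program rest on the same observation. With union state, each of the N source sub-tasks is restored with all N inline handles. The serialized TDDs therefore hold about N × N × S bytes in total, where S is the per-handle size and is at most the threshold. A minimal sketch of that arithmetic follows. It assumes exactly one inline handle per sub-task and ignores serialization overhead. `UnionStateEstimate` is a hypothetical helper, not Flink code.

```java
// Hypothetical helper: back-of-the-envelope JM memory for restoring union state.
final class UnionStateEstimate {

    private UnionStateEstimate() {}

    /**
     * Bytes held by all serialized TDDs when each of {@code parallelism} sub-tasks
     * is restored with every sub-task's inline handle of {@code handleBytes} bytes.
     * Assumes one handle per sub-task and no serialization overhead.
     */
    static long worstCaseBytes(int parallelism, long handleBytes) {
        // Each sub-task's TDD carries all `parallelism` handles...
        long perTdd = Math.multiplyExact((long) parallelism, handleBytes);
        // ...and there are `parallelism` such TDDs.
        return Math.multiplyExact((long) parallelism, perTdd);
    }

    public static void main(String[] args) {
        // Step 1 of the proposal: parallelism 200, threshold raised to 20K.
        System.out.println(worstCaseBytes(200, 20L * 1024));
        // Yun's program: parallelism 200, one new byte[100 * 1024] per sub-task.
        System.out.println(worstCaseBytes(200, 100L * 1024));
    }
}
```

Run as-is, it prints 819200000 (about 800MB, Yu's step-1 figure) and 4096000000 (about 3.8GiB, the figure for Yun's program). Both numbers grow with the square of the parallelism, not linearly.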
> > > Take the program below as an example: with a source parallelism of 200, the number of RemoteRpcInvocations alive in the JM at the same time could be nearly 200, while the pool size of our future executor is only 96. I think if we cleared the serialized data in RemoteRpcInvocation as soon as possible, we might mitigate this problem greatly.
> > >
> > > A simple program that uses union state to reproduce the memory footprint problem: each sub-task contributes a 100KB byte array to the union state, and 200 sub-tasks in total could lead to more than 100KB * 200 * 200 = 3.8GB of memory for all the union state.
> > >
> > > import java.util.List;
> > > import java.util.concurrent.ThreadLocalRandom;
> > > import java.util.stream.Collectors;
> > > import java.util.stream.StreamSupport;
> > >
> > > import org.apache.flink.api.common.state.ListState;
> > > import org.apache.flink.api.common.state.ListStateDescriptor;
> > > import org.apache.flink.runtime.state.FunctionInitializationContext;
> > > import org.apache.flink.runtime.state.FunctionSnapshotContext;
> > > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> > > import org.slf4j.Logger;
> > > import org.slf4j.LoggerFactory;
> > >
> > > public class Program {
> > >     private static final Logger LOG = LoggerFactory.getLogger(Program.class);
> > >
> > >     public static void main(String[] args) throws Exception {
> > >         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > >         env.enableCheckpointing(60 * 1000L); // checkpoint every minute
> > >         env.addSource(new MySource()).setParallelism(200).print();
> > >         env.execute("Mock program");
> > >     }
> > >
> > >     private static class MySource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction {
> > >         private static final ListStateDescriptor<byte[]> stateDescriptor =
> > >                 new ListStateDescriptor<>("list-1", byte[].class);
> > >         private ListState<byte[]> unionListState;
> > >         private volatile boolean running = true;
> > >
> > >         @Override
> > >         public void snapshotState(FunctionSnapshotContext context) throws Exception {
> > >             // Each sub-task contributes one fresh 100KB (100 * 1024 bytes) entry per checkpoint.
> > >             unionListState.clear();
> > >             byte[] array = new byte[100 * 1024];
> > >             ThreadLocalRandom.current().nextBytes(array);
> > >             unionListState.add(array);
> > >         }
> > >
> > >         @Override
> > >         public void initializeState(FunctionInitializationContext context) throws Exception {
> > >             // Union redistribution: on restore, every sub-task receives the entries of all sub-tasks.
> > >             unionListState = context.getOperatorStateStore().getUnionListState(stateDescriptor);
> > >             if (context.isRestored()) {
> > >                 List<byte[]> collect = StreamSupport.stream(unionListState.get().spliterator(), false)
> > >                         .collect(Collectors.toList());
> > >                 LOG.info("union state Collect size: {}.", collect.size());
> > >             }
> > >         }
> > >
> > >         @Override
> > >         public void run(SourceContext<Integer> ctx) throws Exception {
> > >             while (running) {
> > >                 // Emit under the checkpoint lock so records and snapshots do not interleave.
> > >                 synchronized (ctx.getCheckpointLock()) {
> > >                     ctx.collect(ThreadLocalRandom.current().nextInt());
> > >                 }
> > >                 Thread.sleep(100);
> > >             }
> > >         }
> > >
> > >         @Override
> > >         public void cancel() {
> > >             running = false;
> > >         }
> > >     }
> > > }
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Stephan Ewen <se...@apache.org>
> > > Sent: Saturday, May 16, 2020 18:56
> > > To: dev <dev@flink.apache.org>
> > > Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > >
> > > Okay, thank you for all the feedback.
> > >
> > > So we should definitely work on getting rid of the Union State, or at least change the way it is implemented (avoid duplicate serializer snapshots).
> > >
> > > Can you estimate at which cluster size the JM heap usage becomes critical (if we increased the threshold to 100K, or maybe 50K)?
> > >
> > > On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
> > > > Hi,
> > > >
> > > > Overall, I agree with increasing this value, but from my side a default of 100K may be somewhat too large.
> > > >
> > > > I want to share some more information from my side.
> > > >
> > > > The small files problem is indeed one that many users may encounter in production.
> > > > Both kinds of state (keyed state and operator state) can become small files in DFS, but increasing `state.backend.fs.memory-threshold` may run into the JM OOM problem, as Yun said previously. We tried increasing this value in our production environment, but some connectors that use UnionState prevented us from doing so. The memory consumed by these jobs can be very large: in our case, with a parallelism in the thousands, setting `state.backend.fs.memory-threshold` to 64K consumed 10G+ of JM memory. So in the end, we used the solution proposed in FLINK-11937 [1] for both keyed state and operator state.
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-11937
> > > >
> > > > Best,
> > > > Congxian
> > > >
> > > > On Fri, May 15, 2020 at 9:09 PM, Yun Tang <myas...@live.com> wrote:
> > > > > Please correct me if I am wrong: does "put the increased value into the default configuration" mean that we would update it in the default flink-conf.yaml but leave the built-in default of `state.backend.fs.memory-threshold` unchanged? I did not quite get why existing setups with existing configs would not be affected.
> > > > >
> > > > > I raised the concern because one of our large-scale jobs, which has a 1024-parallelism source using union state, ran into the JM OOM problem when we increased this value. I think if we introduce memory control when serializing TDDs asynchronously [1], we could be much more confident about increasing this configuration, since that is the point where the memory footprint grows considerably due to the many serialized TDDs.
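Yun's suggestion of memory control for asynchronously serialized TDDs could take the form of a byte budget. Each serialized payload reserves its size before being kept and releases it once it has been handed to the RPC layer. The sketch below is hypothetical: `SerializedBytesBudget` and its methods do not exist in Flink. It only shows how a `java.util.concurrent.Semaphore` can bound the bytes in flight, independent of how many submissions are pending. That matters because Yun observed close to 200 live RemoteRpcInvocations against a 96-thread pool.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch (not a Flink API): caps the total number of serialized
// bytes that may be held in memory at the same time.
final class SerializedBytesBudget {
    private final int capacity;
    private final Semaphore available;

    SerializedBytesBudget(int capacityBytes) {
        this.capacity = capacityBytes;
        // Fair semaphore: waiting deployments acquire in FIFO order.
        this.available = new Semaphore(capacityBytes, true);
    }

    /** Blocks until {@code size} bytes can be reserved. */
    Reservation reserve(int size) throws InterruptedException {
        if (size > capacity) {
            throw new IllegalArgumentException(
                    "payload of " + size + " bytes exceeds budget of " + capacity + " bytes");
        }
        available.acquire(size);
        return new Reservation(size);
    }

    /** Non-blocking variant; returns null if the bytes cannot be reserved right now. */
    Reservation tryReserve(int size) {
        return size <= capacity && available.tryAcquire(size) ? new Reservation(size) : null;
    }

    int availableBytes() {
        return available.availablePermits();
    }

    /** Returns its bytes to the budget exactly once, e.g. after the RPC has been sent. */
    final class Reservation implements AutoCloseable {
        private final int size;
        private boolean released;

        private Reservation(int size) {
            this.size = size;
        }

        @Override
        public synchronized void close() {
            if (!released) {
                released = true;
                available.release(size);
            }
        }
    }
}
```

A deployment path could serialize a TDD, call `reserve(bytes.length)`, send it, and close the reservation when the send completes. A 200-way deployment would then wait for budget instead of holding 200 serialized TDDs at once. How such a budget would be sized is left open here.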
> > > > >
> > > > > [1] https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > >
> > > > > ________________________________
> > > > > From: Stephan Ewen <se...@apache.org>
> > > > > Sent: Friday, May 15, 2020 16:53
> > > > > To: dev <dev@flink.apache.org>
> > > > > Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > > > >
> > > > > I see, thanks for all the input.
> > > > >
> > > > > I agree with Yun Tang that the use of UnionState is problematic and can cause issues in conjunction with this change. However, most of the large-scale users I know who also struggle with UnionState have increased this threshold as well. With such a low threshold, they get an excessive number of small files that overwhelm their HDFS / S3 / etc.
> > > > >
> > > > > An intermediate solution could be to put the increased value into the default configuration. That way, existing setups with existing configs will not be affected, but new users / installations will have a simpler time.
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > > On Thu, May 14, 2020 at 9:20 PM Yun Tang <myas...@live.com> wrote:
> > > > > > I tend not to be in favor of this proposal. Union state is somewhat abused in several popular source connectors (e.g. Kafka), and increasing this value could lead to a JM OOM when sending TDDs from the JM to the TMs for jobs with large parallelism.
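Union state is the problematic case here, and the reason becomes clear when comparing the two ways operator list state can be redistributed on restore. With even-split redistribution, each element goes to exactly one sub-task. With union redistribution, every sub-task receives the complete list. The sketch below is a simplified, hypothetical model (`Redistribution` is not Flink code). A plain modulo assignment stands in for Flink's actual even-split logic, which operates on state handles and offsets rather than on elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified, hypothetical model of operator list-state redistribution on restore.
final class Redistribution {

    private Redistribution() {}

    /** Even-split stand-in: element i goes to sub-task (i % parallelism), so each element is assigned once. */
    static <T> List<List<T>> evenSplit(List<T> elements, int parallelism) {
        List<List<T>> assignment = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            assignment.get(i % parallelism).add(elements.get(i));
        }
        return assignment;
    }

    /** Union: every sub-task receives the full list, all sharing the same references. */
    static <T> List<List<T>> union(List<T> elements, int parallelism) {
        List<T> shared = Collections.unmodifiableList(new ArrayList<>(elements));
        return Collections.nCopies(parallelism, shared);
    }

    /** Total number of element assignments across all sub-tasks. */
    static int totalAssignments(List<? extends List<?>> assignment) {
        int total = 0;
        for (List<?> perSubtask : assignment) {
            total += perSubtask.size();
        }
        return total;
    }
}
```

Suppose 200 sub-tasks each contributed one element. `evenSplit` yields 200 assignments in total, while `union` yields 200 × 200 = 40,000. In this model the union lists are shared references, so they stay cheap until something copies them per sub-task.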
> > > > > >
> > > > > > After we collect the union state and initialize the map list [1], the union state is ready to assign. At this point, the memory footprint has not increased much, because the union state shared across tasks holds the same ByteStreamStateHandle references. However, when we send the TDD with the taskRestore to the TMs, Akka serializes the ByteStreamStateHandles within each TDD, which increases the memory footprint. If the source has a parallelism of 1024, each sub-task would then receive 1024 * 100KB worth of state handles. The total memory footprint cannot be ignored.
> > > > > >
> > > > > > If we plan to increase the default value of state.backend.fs.memory-threshold, we should first resolve the above case. In other words, this proposal is a trade-off that benefits perhaps 99% of users but might harm the 1% of users with large-scale Flink jobs.
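Yun's point is that the shared ByteStreamStateHandles only multiply once they are serialized. This can be reproduced with plain Java serialization. In the sketch below, `FakeTdd` is a hypothetical stand-in for a TaskDeploymentDescriptor. It is not a claim about exactly how Flink's RPC layer serializes messages. Serializing each TDD on its own, as one message per task would, copies the shared handles every time. Writing all TDDs into a single stream writes them only once, because Java serialization back-references objects it has already written.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a TDD whose restore state carries inline byte handles.
final class FakeTdd implements Serializable {
    private static final long serialVersionUID = 1L;
    final List<byte[]> inlineHandles;

    FakeTdd(List<byte[]> inlineHandles) {
        this.inlineHandles = inlineHandles;
    }
}

final class SerializationCopies {

    private SerializationCopies() {}

    /** One union list shared by reference, as after state assignment on the JM. */
    static List<byte[]> unionList(int parallelism, int handleBytes) {
        List<byte[]> union = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            union.add(new byte[handleBytes]); // one inline handle per sub-task
        }
        return union;
    }

    static long serializedSize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Each TDD serialized separately: the shared handles are copied into every message. */
    static long perTaskBytes(int parallelism, int handleBytes) {
        List<byte[]> shared = unionList(parallelism, handleBytes);
        long total = 0;
        for (int i = 0; i < parallelism; i++) {
            total += serializedSize(new FakeTdd(shared));
        }
        return total;
    }

    /** All TDDs in one stream: each shared object is written once, then back-referenced. */
    static long singleStreamBytes(int parallelism, int handleBytes) {
        List<byte[]> shared = unionList(parallelism, handleBytes);
        List<FakeTdd> tdds = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tdds.add(new FakeTdd(shared));
        }
        return serializedSize(tdds);
    }
}
```

Take 50 tasks with 1 KiB handles. The per-task total exceeds 50 × 50 × 1 KiB, while the single stream stays just above 50 KiB, the quadratic-versus-linear gap described above.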
> > > > > >
> > > > > > [1] https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > >
> > > > > > ________________________________
> > > > > > From: Yu Li <car...@gmail.com>
> > > > > > Sent: Thursday, May 14, 2020 23:51
> > > > > > To: Till Rohrmann <trohrm...@apache.org>
> > > > > > Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > > > > >
> > > > > > TL;DR: I have some reservations but tend to be +1 on the proposal, while suggesting that we pursue a more thorough solution in the long run.
> > > > > >
> > > > > > Please correct me if I'm wrong, but it seems the root cause of the issue is that too many small files are generated.
> > > > > >
> > > > > > I have some concerns about the session cluster case [1], as well as possible issues for users at large scale. Otherwise, I think increasing `state.backend.fs.memory-threshold` to 100K is a good choice, based on the assumption that a large portion of our users run small jobs with small state.
> > > > > >
> > > > > > OTOH, could extending the solution [2] for the RocksDB small-file problem (as proposed by FLINK-11937 [3]) to also support operator state be an alternative? We have already applied this solution in production for operator state, and it solved the HDFS NameNode RPC bottleneck during last year's Singles' Day.
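As discussed in this thread, the idea behind FLINK-11937 is to avoid both one-file-per-small-state and large inline payloads. Many small state chunks are appended to one shared file, and each chunk is referred to by an (offset, length) pair. The toy sketch below illustrates only that idea, in memory, with hypothetical names (`SharedStateFile`, `SegmentHandle`). It makes no claim to match the linked design document or any Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Toy model: many small state chunks appended into one shared "file" (here an in-memory buffer).
final class SharedStateFile {

    /** Hypothetical handle: where a chunk lives inside the shared file. */
    static final class SegmentHandle {
        final long offset;
        final int length;

        SegmentHandle(long offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    private final ByteArrayOutputStream file = new ByteArrayOutputStream();

    /** Appends a chunk and returns a small handle to it instead of creating a new file. */
    synchronized SegmentHandle append(byte[] chunk) {
        long offset = file.size();
        file.write(chunk, 0, chunk.length);
        return new SegmentHandle(offset, chunk.length);
    }

    /** Reads a chunk back through its handle. */
    synchronized byte[] read(SegmentHandle handle) {
        byte[] all = file.toByteArray();
        int from = Math.toIntExact(handle.offset);
        return Arrays.copyOfRange(all, from, from + handle.length);
    }

    synchronized int size() {
        return file.size();
    }
}
```

In this model the checkpoint metadata would carry only the small handles, so neither the file count nor the JM-side inline bytes grow with the number of small states.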
> > > > > >
> > > > > > Best Regards,
> > > > > > Yu
> > > > > >
> > > > > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > > > > [2] https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > > > > >
> > > > > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > I cannot say much about the concrete value, but if our users have problems with the existing default values, then it makes sense to me to change it.
> > > > > > >
> > > > > > > One thing to check could be whether it is possible to provide a meaningful exception when the state size exceeds the frame size. At the moment, Flink should fail with a message saying that an RPC message exceeds the maximum frame size. Maybe it is also possible to point the user towards "state.backend.fs.memory-threshold" if the message exceeds the frame size because of too much state.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> > > > > > >> The parameter "state.backend.fs.memory-threshold" decides when a state becomes a file and when it is stored inline with the metadata (to avoid excessive amounts of small files).
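The rule Stephan describes can be modelled as a single size check on each state: small results travel inline with the checkpoint metadata, and larger ones become a file. The sketch below is a hypothetical model. `ThresholdPolicy` is not a Flink class, and treating a state exactly at the threshold as inline (`<=`) is an assumption. Flink's own logic lives in its filesystem checkpoint stream classes and may differ in detail.

```java
// Hypothetical model of the inline-vs-file decision governed by
// state.backend.fs.memory-threshold (names are illustrative, not Flink's API).
final class ThresholdPolicy {

    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    private final long thresholdBytes;

    ThresholdPolicy(long thresholdBytes) {
        if (thresholdBytes < 0) {
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        this.thresholdBytes = thresholdBytes;
    }

    /** Assumed rule: states up to the threshold stay inline; anything larger is written to a file. */
    Placement place(long stateBytes) {
        return stateBytes <= thresholdBytes ? Placement.INLINE_IN_METADATA : Placement.SEPARATE_FILE;
    }

    /** Counts how many of the given states would each create their own file. */
    long filesCreated(long[] stateSizes) {
        long files = 0;
        for (long size : stateSizes) {
            if (place(size) == Placement.SEPARATE_FILE) {
                files++;
            }
        }
        return files;
    }
}
```

Consider 1,000 states of 5 KiB each. Under this model they produce 1,000 files with the 1K default and none with a 100K threshold. The same 5 MiB would then travel inline through the JM instead, which is the trade-off debated in this thread.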
> > > > > > >>
> > > > > > >> By default, this threshold is 1K, so every state above that size becomes a file. For many cases, this threshold seems to be too low. There is an interesting talk with background on this from Scott Kidder: https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > > > > >>
> > > > > > >> I wanted to discuss increasing this to 100K by default.
> > > > > > >>
> > > > > > >> Advantage:
> > > > > > >>   - This should help many users out of the box who would otherwise see checkpointing problems on systems like S3, GCS, etc.
> > > > > > >>
> > > > > > >> Disadvantages:
> > > > > > >>   - For very large jobs, this increases the required heap memory on the JM side, because more state needs to be kept inline when gathering the acks for a pending checkpoint.
> > > > > > >>   - If tasks have a lot of state and each state is roughly at this threshold, we increase the chance of exceeding the RPC frame size and failing the job.
> > > > > > >>
> > > > > > >> What do you think?
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Stephan
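Till suggested a more helpful failure, which ties in with Stephan's second disadvantage. It could look like a size check before an RPC payload is sent, with the error pointing at the threshold when inline state is the likely cause. This is a hypothetical sketch: the class, method, and exception names are invented. The frame limit is passed in as a parameter rather than read from Flink's `akka.framesize` setting. The "at least half the payload is inline state" heuristic is likewise an assumption.

```java
// Hypothetical pre-send check; names are illustrative, not Flink APIs.
final class FrameSizeCheck {

    private FrameSizeCheck() {}

    static final class OversizedRpcException extends RuntimeException {
        OversizedRpcException(String message) {
            super(message);
        }
    }

    /**
     * Throws if the payload exceeds the frame size. When inline state makes up a
     * large share of the payload, the message points at the memory threshold.
     */
    static void checkPayload(long payloadBytes, long inlineStateBytes, long maxFrameBytes) {
        if (payloadBytes <= maxFrameBytes) {
            return;
        }
        StringBuilder msg = new StringBuilder()
                .append("RPC message of ").append(payloadBytes)
                .append(" bytes exceeds the maximum frame size of ").append(maxFrameBytes).append(" bytes.");
        // Heuristic (assumption): blame inline state if it is at least half the payload.
        if (inlineStateBytes * 2 >= payloadBytes) {
            msg.append(' ').append(inlineStateBytes)
                    .append(" bytes of it are state stored inline; consider lowering")
                    .append(" 'state.backend.fs.memory-threshold' so that more state is written to files.");
        }
        throw new OversizedRpcException(msg.toString());
    }
}
```

The check itself is cheap. The hard part would be attributing how much of a serialized message is inline state, which this sketch simply takes as an input.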