Hey Max, thanks for the pointer to UnboundedSourceWrapper. I have created BEAM-6077 and will try to come up with a patch.
On Fri, Nov 16, 2018 at 12:41 PM Maximilian Michels wrote:
> Hi Jozef,
>
> The main prerequisite for rescaling Beam pipelines on Flink was the use of
> Key Group state. This splits each operator state additionally into N
> partitions, such that N * P = MAX_PARALLELISM, where P is the
> parallelism of the operator.
>
> This has largely been done. However, it is not complete. If you look at
> the way the UnboundedSourceWrapper snapshots its state, you will see
> that it does not support Key Groups. Thus, if you increase the
> parallelism, one of the new parallel instances of the operator will
> _not_ receive state and thus behave differently.
>
> I think we could migrate UnboundedSourceWrapper to Key Groups and then
> also use them to spread the Kafka partitions across instances.
>
> Thanks,
> Max
>
> On 16.11.18 10:57, Jozef Vilcek wrote:
> > Hi,
> >
> > I want to collect some feedback on rescaling a streaming Beam pipeline on
> > the Flink runner. Flink seems to be able to rescale jobs, which in Beam
> > terms means changing the parallelism. However, one has to make sure
> > that state can rescale as well, up to the predefined max parallelism.
> > Max parallelism must be set for the job on the FlinkRunner.
> >
> > Flink supports setting max parallelism at the global, environment and
> > operator level. Changes at the operator level are not possible with Beam. I
> > found this JIRA, which seems inconclusive about whether per-operator
> > parallelism changes make sense to adopt in Beam in some form:
> > https://issues.apache.org/jira/browse/BEAM-68
> >
> > I tried setting max parallelism on the environment via a local patch. My
> > job launched and no longer crashed, as it had before, when I increased the
> > parallelism by 1. But there was one drawback, as far as I know. My test
> > job reads from Kafka, and after launching the job from a savepoint, one
> > partition does not continue from the offset stored in the savepoint but
> > from the position defined by auto.offset.reset (in my case 'latest'),
> > which is not great.
> >
> > My questions:
> >
> > 1. Should rescaling work for Beam if the runner supports it, or can there
> > be incompatibilities in general depending on how a particular runner
> > works?
> >
> > 2. Has anyone had success rescaling with Flink? Honestly, I am not sure
> > how well it behaves in native Flink; I have never tried it.
> >
> > 3. Why does Kafka not redistribute stored partition offsets after
> > changing parallelism?
> >
> > 4. Is BEAM-68 still relevant?
> >
> > Many thanks,
> > Jozef
>
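A minimal Java sketch of the Key Group arithmetic Max describes, and of why state that is not key-grouped leaves a newly added instance empty after rescaling. `keyGroupRange` and `operatorForKeyGroup` follow the range arithmetic used by Flink's `KeyGroupRangeAssignment` as we understand it; `keyGroupForPartition` (spreading partitions evenly over the key-group space) and `restoreByIndex` (a simplified model of index-based restore) are hypothetical illustrations, not Beam's or Flink's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyGroupRescaleSketch {

    // Inclusive [start, end] range of key groups owned by one operator instance.
    // Ranges are contiguous, cover 0..maxParallelism-1, and may differ in size by one.
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        if (parallelism <= 0 || maxParallelism < parallelism) {
            throw new IllegalArgumentException("require 0 < parallelism <= maxParallelism");
        }
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Inverse lookup: which operator instance owns a given key group.
    static int operatorForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // HYPOTHETICAL mapping: spread a topic's partitions evenly over the key-group space.
    // Assumes the partition count is unchanged between savepoint and restore, so a
    // partition's key group never changes when only the parallelism changes.
    static int keyGroupForPartition(int partition, int numPartitions, int maxParallelism) {
        return partition * maxParallelism / numPartitions;
    }

    // Which operator instance would restore the stored offset of each partition.
    static Map<Integer, List<Integer>> assignByKeyGroup(int numPartitions, int maxParallelism, int parallelism) {
        Map<Integer, List<Integer>> owner = new TreeMap<>();
        for (int op = 0; op < parallelism; op++) {
            owner.put(op, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            int kg = keyGroupForPartition(p, numPartitions, maxParallelism);
            owner.get(operatorForKeyGroup(maxParallelism, parallelism, kg)).add(p);
        }
        return owner;
    }

    // HYPOTHETICAL model of state that is NOT key-grouped: subtask i gets back exactly
    // what subtask i snapshotted, so subtasks added by rescaling start with nothing.
    static Map<Integer, List<Integer>> restoreByIndex(Map<Integer, List<Integer>> snapshot, int newParallelism) {
        Map<Integer, List<Integer>> restored = new TreeMap<>();
        for (int op = 0; op < newParallelism; op++) {
            restored.put(op, new ArrayList<>(snapshot.getOrDefault(op, Collections.emptyList())));
        }
        return restored;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int partitions = 4;
        for (int op = 0; op < 3; op++) {
            int[] r = keyGroupRange(maxParallelism, 3, op);
            System.out.println("subtask " + op + " owns key groups " + r[0] + ".." + r[1]);
        }
        Map<Integer, List<Integer>> before = assignByKeyGroup(partitions, maxParallelism, 2);
        System.out.println("parallelism 2:             " + before);
        System.out.println("parallelism 3, key groups: " + assignByKeyGroup(partitions, maxParallelism, 3));
        System.out.println("parallelism 3, by index:   " + restoreByIndex(before, 3));
    }
}
```

With max parallelism 128 and parallelism 3, the three subtasks own key groups 0..42, 43..85 and 86..127. Scaling 4 partitions from parallelism 2 to 3, the key-group assignment yields `{0=[0, 1], 1=[2], 2=[3]}`, so every partition's offset reaches some subtask. The index-based model yields `{0=[0, 1], 1=[2, 3], 2=[]}`, leaving the new subtask 2 without restored state, which matches the behavior Max describes.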
