Hey Max, thanks for the pointer to UnboundedSourceWrapper.
I have created BEAM-6077 and will try to come up with a patch.

On Fri, Nov 16, 2018 at 12:41 PM Maximilian Michels <[email protected]> wrote:

> Hi Jozef,
>
> The main blocker for rescaling Beam pipelines on Flink was the use of
> Key Group state. This splits each operator state additionally into N
> partitions, such that N * P = MAX_PARALLELISM, where P is the
> parallelism of the operator.
>
> This has largely been done. However, it is not complete. If you look at
> the way the UnboundedSourceWrapper snapshots its state, you will see
> that it does not support Key Groups. Thus, if you increase the
> parallelism, one of the new parallel instances of the operator will
> _not_ receive state and thus behave differently.
>
> I think we could migrate UnboundedSourceWrapper to KeyGroups and then
> also leverage spread of the Kafka partitions.
>
> Thanks,
> Max
>
> On 16.11.18 10:57, Jozef Vilcek wrote:
> > Hi,
> >
> > I want to collect some feedback on rescaling a streaming Beam pipeline on
> > the Flink runner. Flink seems to be able to rescale jobs, which in Beam
> > terms means changing the parallelism. However, one has to make sure
> > that state can rescale as well, up to the predefined max parallelism.
> > Max parallelism must be set for the job on FlinkRunner.
> >
> > Flink supports setting max parallelism at the global, environment, and
> > operator level. Changes at the operator level are not possible with Beam.
> > I found this JIRA, which seems inconclusive on whether changes in
> > operator parallelism make sense to adopt in Beam:
> > https://issues.apache.org/jira/browse/BEAM-68
> >
> > I tried setting max parallelism on the environment via a local patch. My
> > job launched and did not crash like before when I bumped parallelism by 1.
> > But there was one drawback that I know of. My test job reads from
> > Kafka, and after launching the job from a savepoint, one partition does
> > not continue from the offset in the savepoint but from whatever is
> > defined by auto.offset.reset (in my case 'latest'), which is not great.
> >
> > My questions:
> >
> > 1. Should rescaling work for Beam if the runner supports it, or can
> > there be incompatibilities in general, depending on how a particular
> > runner works?
> >
> > 2. Has anyone had success with Flink and rescaling? Honestly, I am not
> > sure how well it behaves in native Flink. I have never tried it.
> >
> > 3. Why does Kafka not redistribute stored partition offsets after
> > changing parallelism?
> >
> > 4. Is BEAM-68 still relevant?
> >
> > Many thanks,
> > Jozef
>
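
For readers following the thread, the key-group split Max describes (N * P = MAX_PARALLELISM) can be illustrated with a small standalone sketch. It is not Flink or Beam code: the class and method names are hypothetical, the key hashing is simplified to a plain hashCode (Flink applies an additional hash), and the key-group-to-instance mapping is only modeled on the contiguous-range scheme Flink uses, as far as I understand it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Flink or Beam.
public class KeyGroupSketch {

    // Maps a key to a key group in [0, maxParallelism).
    // Simplification: plain hashCode instead of the hash Flink applies.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the parallel operator instance that owns it.
    // Each instance ends up with a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Lists all key groups owned by one operator instance.
    static List<Integer> keyGroupsOwnedBy(int operatorIndex, int maxParallelism, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (operatorIndexFor(kg, maxParallelism, parallelism) == operatorIndex) {
                owned.add(kg);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int maxParallelism = 12; // fixed for the lifetime of the job's state
        for (int parallelism : new int[] {3, 4}) {
            System.out.println("parallelism = " + parallelism);
            for (int i = 0; i < parallelism; i++) {
                System.out.println("  instance " + i + " owns key groups "
                        + keyGroupsOwnedBy(i, maxParallelism, parallelism));
            }
        }
    }
}
```

The point of the sketch is that the key-to-key-group mapping depends only on the max parallelism, so state stored per key group can be handed to a different instance when the parallelism changes. State that is not organized this way has no such unit of redistribution, which is consistent with the missing Kafka offset reported above.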
