Hi Yi,

Thank you for raising these questions. Please find my answers inline below.

*a) how are states for the virtual tasks managed during split/merge?*
For this SEP, stateful job elasticity is future work; SEP-32 currently
deals only with stateless elasticity.
The idea for state-preserving elasticity is to support only jobs that can
guarantee a bijective mapping between state key and input key.
This requirement is needed so that when input keys move from one virtual
task to another, it is easy to identify which state keys should be present
in the store of the virtual task for correct operation.
Additionally, stateful elasticity is supported only for jobs that rely on
blob store for backup and restore.
Furthermore, for stateful jobs elasticity is increased or decreased only in
steps of 2.
With these restrictions in place, when a job starts with elasticity factor
2, the state blob of the original task is copied for both virtual tasks
during a split.
For a merge, when two virtual tasks merge into one (virtual or original)
task, the state blob of the new task will need to be stitched together from
the older blobs.
This will be done by leveraging the bijective mapping between state key and
input key, which helps determine, for each state key in the new blob, which
older blob its value should come from
(each older blob belonged to a virtual task that consumed an input key based
on the keyBucket of that virtual task).
That said, the design for stateful elasticity needs more work and is planned
for a follow-up SEP; the current SEP-32 focuses only on stateless jobs.
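
To make the split/merge idea a bit more concrete, here is a rough Python
sketch. It is not from the SEP and not Samza code: it assumes the state key
is identical to the input key (the simplest bijection), models a state blob
as a plain dict, and uses a hypothetical key_bucket() rule (stable hash
modulo the elasticity factor) to decide which virtual task owns a key.

```python
import zlib


def key_bucket(key, elasticity_factor):
    """Hypothetical bucketing rule: stable hash of the key modulo the factor."""
    return zlib.crc32(key.encode("utf-8")) % elasticity_factor


def split(blob, elasticity_factor=2):
    """Split: every virtual task starts from a full copy of the original blob."""
    return [dict(blob) for _ in range(elasticity_factor)]


def merge(blobs):
    """Merge: for each state key, take the value from the blob of the virtual
    task whose key bucket owned that key. Copies of the key held by the other
    virtual tasks are stale leftovers from the split and are ignored."""
    factor = len(blobs)
    merged = {}
    for key in set().union(*blobs):
        owner_blob = blobs[key_bucket(key, factor)]
        if key in owner_blob:  # the owner may have deleted the key
            merged[key] = owner_blob[key]
    return merged
```

The merge only works because of the bijection assumption: given a state
key, we can recover the input key, hence the bucket, hence the one older
blob whose value is authoritative.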

*b) what's perf impact when we have 2 virtual tasks on the same SSP in the
same container, while one virtual task is much faster than the other?*
SystemConsumer subscribes to the input system at a partition level.
Because of this, even if one v.task is much faster than the other, both
consume the same SSP, and the system consumer of a container will fetch
more messages only once the entire SSP buffer is empty.
This means that even though one v.task is much faster, the perf will be
determined by the slower v.task.
However, this is no worse than the pre-elastic job perf, and if the number
of containers is increased, the fast v.task can improve perf when the slow
and fast v.tasks are in different containers (different system consumers).
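
As a back-of-the-envelope illustration (a toy model, not a measurement), the
sketch below assumes the v.tasks in a container process their share of a
fetched batch in parallel and that the next fetch happens only when the
whole batch is drained. The per-message costs and batch sizes are made-up
numbers.

```python
def drain_time(batch, cost_per_message):
    """Time until the SSP buffer is empty and the consumer can fetch again.

    batch: {vtask_name: number of buffered messages for that v.task}
    cost_per_message: {vtask_name: processing time per message}
    The v.tasks run in parallel, so the slowest one decides the drain time.
    """
    return max(batch[v] * cost_per_message[v] for v in batch)


cost = {"fast": 1, "slow": 10}

# Both v.tasks share one container (one system consumer, one buffer).
same_container = drain_time({"fast": 50, "slow": 50}, cost)

# Fast v.task alone in its own container: it waits only on its own messages.
fast_alone = drain_time({"fast": 50}, cost)
```

In this toy model the shared container needs 500 time units per batch,
while the fast v.task in its own container is ready for the next fetch
after 50.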

*c) what's the reason that a virtual task can not filter older messages
from a previous offset, in case the container restarts from a smaller
offset from another virtual task consuming the same SSP?*
IIUC, this question is about a container that has two v.tasks that committed
checkpoints for an SSP, where the fast v.task committed a newer offset and
the slow v.task committed an older offset.
In this scenario, the SEP says there could be duplicate processing, as the
SystemConsumer will start consuming from the older offset for the SSP.
Yes, an improvement can be made so that the v.task that committed the newer
offset starts processing only from the offset after its checkpoint and
filters out older messages.
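
A minimal sketch of that filtering improvement, in Python for brevity. It
is only an illustration of the idea, not the SEP's design: it assumes a
checkpoint stores the last processed offset, that offsets are comparable
integers, and that a hypothetical bucket_of(key) function maps a message
key to the v.task that owns it.

```python
def replay_after_restart(messages, checkpoints, bucket_of):
    """Return the offsets each v.task processes after a container restart.

    messages: iterable of (offset, key) pairs in SSP order
    checkpoints: {vtask_id: last offset that v.task committed}
    bucket_of: hypothetical function mapping a key to its owning vtask_id
    """
    # The consumer has to resume from the oldest checkpoint for the SSP.
    start = min(checkpoints.values()) + 1
    processed = {vtask: [] for vtask in checkpoints}
    for offset, key in messages:
        if offset < start:
            continue
        vtask = bucket_of(key)
        # A v.task that is ahead drops messages at or below its own checkpoint
        # instead of processing them a second time.
        if offset <= checkpoints[vtask]:
            continue
        processed[vtask].append(offset)
    return processed
```

For example, with offsets 0..9, even keys owned by v.task 0 (checkpoint 7)
and odd keys owned by v.task 1 (checkpoint 3), consumption resumes at
offset 4, v.task 1 processes 5, 7 and 9, and v.task 0 skips 4 and 6 and
processes only 8.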

*d) how do we compare this w/ an alternative idea that implements a
KeyedOrderedExecutor w/ multiple parallel threads within the single task's
main event loop to increase the parallelism?*
Is this similar to the per-key parallelism option (in the rejected
solutions section), with the difference that the number of threads is fixed
for a single task (as opposed to one thread per key in the rejected
solution)?
This KeyedOrderedExecutor is better than the parallelism that
task.max.concurrency currently offers, as it gives in-order execution per
key.
However, with the KeyedOrderedExecutor solution, num containers will still
be <= num tasks.
This means:
(a) to increase throughput for a key, all other keys must also be processed
faster (this is partially present in elasticity as seen in question (b)
above, but it can be mitigated with an increased elasticity factor and more
containers);
(b) network, disk and I/O contention will be larger than with elasticity:
virtual tasks can be spread across hosts, whereas with a key-ordered
executor all keys of a task sit on the same host, so the increased
throughput increases the load on that host; and
(c) if one or more of the parallel units (threads here) need more
resources, the result is a large container, which makes scheduling harder
because finding large chunks of resources takes longer in a cluster,
whereas with virtual tasks we can have smaller containers.
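
For reference, this is how I understand the keyed ordered executor idea,
sketched in Python. It is my reading of the proposal, not an existing Samza
class: a fixed number of single-threaded lanes, with each key always hashed
to the same lane so that messages for a key run in submission order.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor


class KeyedOrderedExecutor:
    """Hypothetical executor: fixed thread count, in-order execution per key."""

    def __init__(self, num_threads):
        # One single-worker pool per lane; a single worker runs its queue FIFO.
        self._lanes = [ThreadPoolExecutor(max_workers=1) for _ in range(num_threads)]

    def submit(self, key, fn, *args):
        # The same key always maps to the same lane, which preserves its order.
        lane = self._lanes[zlib.crc32(key.encode("utf-8")) % len(self._lanes)]
        return lane.submit(fn, *args)

    def shutdown(self):
        # Wait for every lane to finish its queued work.
        for lane in self._lanes:
            lane.shutdown(wait=True)
```

Note that all lanes live in one process on one host, which is the source of
points (b) and (c): the threads compete for the same network, disk and
memory, and the container has to be sized for all of them together.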


Please let me know if the above answers make sense and if there are any
follow-ups for this SEP.

On Thu, Jan 19, 2023 at 10:33 PM Yi Pan <nickpa...@gmail.com> wrote:

> Hey, Manasa,
>
> Sorry to chime in late. A few questions:
> a) how are states for the virtual tasks managed during split/merge?
> b) what's perf impact when we have 2 virtual tasks on the same SSP in the
> same container, while one virtual task is much faster than the other?
> c) what's the reason that a virtual task can not filter older messages from
> a previous offset, in case the container restarts from a smaller offset
> from another virtual task consuming the same SSP?
> d) how do we compare this w/ an alternative idea that implements a
> KeyedOrderedExecutor w/ multiple parallel threads within the single task's
> main event loop to increase the parallelism?
>
> Best,
>
> -Yi
>
>
> On Thu, Jan 19, 2023 at 3:26 PM Lakshmi Manasa <lakshmimanas...@gmail.com>
> wrote:
>
> > hi all,
> >
> >  if there are no concerns or questions about this SEP, I shall start the
> > vote email thread tomorrow.
> >
> > thanks,
> > Manasa
> >
> > On Fri, Jan 6, 2023 at 8:08 AM Lakshmi Manasa <lakshmimanas...@gmail.com
> >
> > wrote:
> >
> > > Hi all,
> > >   We created SEP-32: Elasticity for Samza.
> > >
> > > Please find SEP here (
> > >
> >
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-32%3A+Elasticity+for+Samza
> > > )
> > >   Please take a look and provide feedback. thanks, Manasa
> > >
> >
>
