1) you are right, a thread's restoration phase will not interfere will any
other threads' normal processing collocated within the same JVM / machine
etc at all. So you may have a Streams instance which contains some threads
already finished restoring and started processing tasks, while other
threads contained are still restoring.


Guozhang

On Mon, Feb 25, 2019 at 1:53 PM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi Guozhang -
>
> Thanks for the replies, and directing me to the existing JIRAs. I think
> that a two-phase rebalance will be quite useful.
>
> 1) For clarity's sake, I should have just asked: When a new thread / node
> is created and tasks are rebalanced, are the state stores on the new
> threads/nodes fully restored during rebalancing, thereby blocking *any and
> all *threads from proceeding with processing until restoration is complete?
> I do not believe that this is the case, and in the case of rebalanced tasks
> only the threads assigned the new tasks will be paused until state store
> restoration is complete.
>
>
> Thanks for your help - I appreciate you taking the time to reply.
>
> Adam
>
>
>
> On Wed, Feb 20, 2019 at 8:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Adam,
> >
> > Sorry for being late replying on this thread, I've put my comments
> inlined
> > below.
> >
> > On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> > > Hey Folks
> > >
> > > I have a few questions around the operations of stateful processing
> while
> > > scaling nodes up/down, and a possible KIP in question #4. Most of them
> > have
> > > to do with task processing during rebuilding of state stores after
> > scaling
> > > nodes up.
> > >
> > > Scenario:
> > > Single node/thread, processing 2 topics (10 partitions each):
> > > User event topic (events) - ie: key:userId, value: ProductId
> > > Product topic (entity) - ie: key: ProductId, value: productData
> > >
> > > My topology looks like this:
> > >
> > > KTable productTable = ... //materialize from product topic
> > >
> > > KStream output = userStream
> > >     .map(x => (x.value, x.key) ) //Swap the key and value around
> > >     .join(productTable, ... ) //Joiner is not relevant here
> > >     .to(...)  //Send it to some output topic
> > >
> > >
> > > Here are my questions:
> > > 1) If I scale the processing node count up, partitions will be
> rebalanced
> > > to the new node. Does processing continue as normal on the original
> node,
> > > while the new node's processing is paused as the internal state stores
> > are
> > > rebuilt/reloaded? From my reading of the code (and own experience) I
> > > believe this to be the case, but I am just curious in case I missed
> > > something.
> > >
> > >
> > With 2 topics and 10 partitions each, assuming the default
> PartitionGrouper
> > is used, there should be a total of 20 tasks (10 tasks for map which will
> > send to an internal repartition topic, and 10 tasks for doing the join)
> > created since these two topics are co-partitioned for joins.
> >
> > For example, task-0 would be processing the join from
> > user-topic-partition-0 and product-topic-partition-0, and so on.
> >
> > With a single thread, all of these 20 tasks will be allocated to this
> > thread, which would process them in an iterative manner. Note that since
> > each task has its own state store (e.g. product-state-store-0 for task-0,
> > etc), it means this thread will host all the 10 sets of state stores as
> > well (note for the 10 mapping tasks there's no state stores at all).
> >
> > When you add new threads either within the same node, or on a different
> > node, after rebalance each thread should be processing 10 tasks, and
> hence
> > owning corresponding set of state stores due to rebalance. The new thread
> > will first restore the state stores it gets assigned before start
> > processing.
> >
> >
> > > 2) What happens to the userStream map task? Will the new node be able
> to
> > > process this task while the state store is rebuilding/reloading? My
> > reading
> > > of the code suggests that this map process will be paused on the new
> node
> > > while the state store is rebuilt. The effect of this is that it will
> lead
> > > to a delay in events reaching the original node's partitions, which
> will
> > be
> > > seen as late-arriving events. Am I right in this assessment?
> > >
> > >
> > Currently the thread will NOT start processing any tasks until ALL
> stateful
> > tasks completes restoring (stateless tasks, like the map tasks in your
> > example never needs restoration at all). There's an open JIRA for making
> it
> > customizable but I cannot find it currently.
> >
> >
> > > 3) How does scaling up work with standby state-store replicas? From my
> > > reading of the code, it appears that scaling a node up will result in a
> > > reabalance, with the state assigned to the new node being rebuilt first
> > > (leading to a pause in processing). Following this, the standy replicas
> > are
> > > populated. Am I correct in this reading?
> > >
> > > Standby tasks are running in parallel with active stream tasks, and it
> > simply reads from the changelog topic in read time and populate the
> standby
> > store replica; when scaling out, the instances with standby tasks will be
> > preferred over those who do not have any standby for the task, and hence
> > when restoring only a very small amount of data needs to be restored
> > (think: the standby replica of the store is already populated up to
> offset
> > 90 at the rebalance, while the active task is writing to the changelog
> > topic with log end offset 100, so you only need to restore 90 - 100
> instead
> > of 0 - 100).
> >
> >
> > > 4) If my reading in #3 is correct, would it be possible to pre-populate
> > the
> > > standby stores on scale-up before initiating active-task transfer? This
> > > would allow seamless scale-up and scale-down without requiring any
> pauses
> > > for rebuilding state. I am interested in kicking this off as a KIP if
> so,
> > > but would appreciate any JIRAs or related KIPs to read up on prior to
> > > digging into this.
> > >
> > > Yes, there is some discussions about this here:
> > https://issues.apache.org/jira/browse/KAFKA-6145
> >
> >
> > >
> > > Thanks
> > >
> > > Adam Bellemare
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to