Sophie Blee-Goldman created KAFKA-9467:
------------------------------------------

             Summary: Multiple wallclock punctuators may be scheduled after a rebalance
                 Key: KAFKA-9467
                 URL: https://issues.apache.org/jira/browse/KAFKA-9467
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Sophie Blee-Goldman


In the eager rebalancing protocol*, Streams will suspend all tasks at the 
beginning of a rebalance and then resume those which have been reassigned to 
the same StreamThread. Part of suspending and resuming a task involves closing 
and reinitializing the topology, specifically calling Processor#close followed 
by Processor#init. If a wallclock punctuator is scheduled as part of init, a 
new copy of it will be scheduled after every rebalance, because Streams does 
not cancel existing punctuators during suspension and does not tell users that 
they must cancel punctuations themselves in Processor#close.

This can cause multiple punctuators to build up over time, which has the 
apparent effect of increasing the net punctuation rate for wallclock 
punctuators. (The same technically occurs with event-time punctuators, but the 
punctuation times are anchored relative to a fixed point and only one will be 
triggered at a time, so there is no increased punctuation rate).
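To make the build-up concrete, here is a small back-of-the-envelope model. It is plain Java, not Kafka Streams code, and the class and method names are made up for illustration. It assumes each rebalance runs Processor#close and then Processor#init, that nothing ever cancels the older punctuator, and that each copy fires every intervalMs after the moment it was scheduled.

```java
import java.util.Arrays;

// Hypothetical back-of-the-envelope model, not Kafka Streams code.
// Assumes: init() schedules a punctuator at t=0, every rebalance calls
// close() then init() again, and nothing ever cancels the older copies.
public class PunctuatorBuildUp {

    /** Punctuator copies alive after the given rebalances: the original plus one per rebalance. */
    public static int livePunctuators(long[] rebalanceTimesMs) {
        return 1 + rebalanceTimesMs.length;
    }

    /**
     * Total wall-clock punctuations fired in (0, endMs]. Each copy scheduled at
     * time s fires at s + k * intervalMs for k >= 1, independently of the others.
     * Rebalance times are assumed to lie within (0, endMs].
     */
    public static long punctuations(long intervalMs, long[] rebalanceTimesMs, long endMs) {
        long count = endMs / intervalMs;             // copy scheduled by the first init()
        for (long r : rebalanceTimesMs) {
            count += (endMs - r) / intervalMs;       // extra copy scheduled by the init() after this rebalance
        }
        return count;
    }

    public static void main(String[] args) {
        long[] rebalances = {25, 55};
        long fired = punctuations(10, rebalances, 100);
        System.out.println("rebalances at " + Arrays.toString(rebalances)
                + ": " + livePunctuators(rebalances) + " live punctuators, "
                + fired + " punctuations in 100 ms");
    }
}
```

With a 10 ms interval and rebalances at t=25 and t=55, the model counts 21 punctuations in the first 100 ms instead of 10, i.e. an apparent interval of under 5 ms.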

There are several options at this point:

A) Clear/cancel any existing punctuators during task suspension

B) Push it to the user to cancel their punctuators in Processor#close, and 
update the documentation and examples to clarify this.

C) Leave existing punctuators alone during suspension, and instead block new 
ones from being scheduled on top during re-initialization.
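A rough sketch of how options A and C differ, using a hypothetical per-task registry rather than the real Streams internals. It assumes a punctuator can be recognized across close/init by its processor name plus interval; that identity is an assumption of this sketch (two identical schedule calls in the same init would be merged). Option B has the same effect as A, except the cancel happens in user code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-task punctuator registry (not the real Streams internals),
// used only to contrast option A (cancel on suspend) with option C (keep on
// suspend and ignore the duplicate schedule() call made by the next init()).
public class PunctuatorRegistry {

    public enum Policy { CANCEL_ON_SUSPEND, KEEP_ON_SUSPEND }

    private static final class Slot {
        final long intervalMs;
        long nextFireMs;

        Slot(long intervalMs, long nextFireMs) {
            this.intervalMs = intervalMs;
            this.nextFireMs = nextFireMs;
        }
    }

    private final Policy policy;
    // Assumed identity of a punctuator across close()/init(): processor name + interval.
    private final Map<String, Slot> slots = new HashMap<>();
    private int fired = 0;

    public PunctuatorRegistry(Policy policy) {
        this.policy = policy;
    }

    /** Called from Processor#init; returns when this punctuator will next fire. */
    public long schedule(String processorName, long intervalMs, long nowMs) {
        String key = processorName + ":" + intervalMs;
        Slot existing = slots.get(key);
        if (existing != null) {
            // Under KEEP_ON_SUSPEND this is the punctuator that survived suspension:
            // keep its original schedule rather than stacking a second copy on top.
            return existing.nextFireMs;
        }
        Slot slot = new Slot(intervalMs, nowMs + intervalMs);
        slots.put(key, slot);
        return slot.nextFireMs;
    }

    /** Called when the task is suspended at the start of an eager rebalance. */
    public void suspend() {
        if (policy == Policy.CANCEL_ON_SUSPEND) {
            slots.clear();
        }
    }

    /** Fires every punctuation that is due at or before nowMs. */
    public void advanceTo(long nowMs) {
        for (Slot slot : slots.values()) {
            while (slot.nextFireMs <= nowMs) {
                fired++;
                slot.nextFireMs += slot.intervalMs;
            }
        }
    }

    public int fired() {
        return fired;
    }

    public int liveCount() {
        return slots.size();
    }

    public static void main(String[] args) {
        for (Policy policy : Policy.values()) {
            PunctuatorRegistry task = new PunctuatorRegistry(policy);
            task.schedule("my-processor", 10, 0);              // initial init()
            task.advanceTo(25);                                 // fires at 10 and 20
            task.suspend();                                     // rebalance begins
            long next = task.schedule("my-processor", 10, 25); // init() after resume
            System.out.println(policy + ": next punctuation at " + next
                    + ", live punctuators = " + task.liveCount());
        }
    }
}
```

Both policies stop the build-up (one live punctuator), but after a rebalance at t=25 option A restarts the schedule at 35 while option C keeps the original 30.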

One drawback of options A and B is that cancelling and rescheduling punctuators 
can mean a punctuation is never triggered if rebalances are more frequent than 
the punctuation interval. Even if punctuations are still triggered, the 
effective punctuation interval will actually increase, since each rebalance 
delays the next punctuation.
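A quick way to see this effect, under simplifying assumptions: the helper below is hypothetical, the wall-clock schedule is restarted relative to each rebalance, and a punctuation that is due exactly at a rebalance fires before the cancel. Each gap between rebalances then contributes floor(gap / interval) punctuations, so any gap shorter than the interval contributes nothing.

```java
// Hypothetical model of options A/B: cancel at every rebalance, then
// reschedule relative to the rebalance time (current wall-clock semantics).
public class CancelRescheduleLoss {

    /**
     * Punctuations fired in (0, endMs]. rebalanceTimesMs must be sorted and lie
     * within (0, endMs]. Assumes a punctuation due exactly at a rebalance fires
     * before the punctuator is cancelled.
     */
    public static long punctuations(long intervalMs, long[] rebalanceTimesMs, long endMs) {
        long count = 0;
        long segmentStart = 0;                               // first init() at t=0
        for (long r : rebalanceTimesMs) {
            count += (r - segmentStart) / intervalMs;        // fires before this rebalance
            segmentStart = r;                                // schedule restarts here
        }
        count += (endMs - segmentStart) / intervalMs;        // tail after the last rebalance
        return count;
    }

    public static void main(String[] args) {
        long[] everyFiveMs = new long[19];
        for (int i = 0; i < everyFiveMs.length; i++) {
            everyFiveMs[i] = 5L * (i + 1);                   // rebalances at 5, 10, ..., 95
        }
        System.out.println("no rebalances:        " + punctuations(10, new long[0], 100));
        System.out.println("rebalances at 25, 55: " + punctuations(10, new long[]{25, 55}, 100));
        System.out.println("rebalance every 5 ms: " + punctuations(10, everyFiveMs, 100));
    }
}
```

With a 10 ms interval over 100 ms this gives 10 punctuations with no rebalances, 9 with rebalances at t=25 and t=55 (an effective interval of about 11 ms), and 0 if a rebalance happens every 5 ms.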

Of course, if the task _does_ get migrated to another thread/instance, the 
punctuation would be reset anyway with option C, since we do not currently 
store/persist the punctuation information anywhere. The wallclock semantics are 
somewhat loosely defined, but I think most users would not consider any of 
these options a proper fix on its own, as each one just pushes the issue in the 
other direction.

If, on the other hand, we were to anchor the wallclock punctuations to a fixed 
time, this would not be a problem. At that point it seems reasonable to just 
leave it up to the user to cancel the punctuation during Processor#close, 
similar to any other kind of resource that must be cleaned up. Even if users 
forgot to do so, it would not affect the actual behavior; it would just cause 
unused punctuators to build up. See 
https://issues.apache.org/jira/browse/KAFKA-7699.
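A sketch of what anchoring could look like, assuming the anchor is a fixed wall-clock timestamp (anchorMs below is hypothetical; the actual semantics would be decided in KAFKA-7699). Rescheduling snaps back onto the grid anchorMs + k * intervalMs instead of restarting from the moment init runs, so cancelling in close and rescheduling in init no longer delays or skips punctuations.

```java
// Hypothetical sketch of anchored wall-clock punctuation; anchorMs is an
// assumed fixed timestamp, not an existing Kafka Streams parameter.
public class AnchoredWallClock {

    /** First time strictly after nowMs on the grid anchorMs + k * intervalMs (k >= 0). */
    public static long nextAnchoredFire(long anchorMs, long intervalMs, long nowMs) {
        if (nowMs < anchorMs) {
            return anchorMs;
        }
        return anchorMs + ((nowMs - anchorMs) / intervalMs + 1) * intervalMs;
    }

    /** Current (unanchored) behavior: the schedule restarts from whenever init() runs. */
    public static long nextRelativeFire(long intervalMs, long nowMs) {
        return nowMs + intervalMs;
    }

    /**
     * Punctuations fired in (0, endMs] when the punctuator is cancelled at every
     * rebalance and rescheduled on the anchored grid right after it.
     * Assumes anchorMs >= 0 and sorted rebalance times within (0, endMs].
     */
    public static long punctuationsWithAnchoredReschedule(long anchorMs, long intervalMs,
                                                          long[] rebalanceTimesMs, long endMs) {
        long count = 0;
        long next = nextAnchoredFire(anchorMs, intervalMs, 0);   // first init() at t=0
        for (long r : rebalanceTimesMs) {
            while (next <= r) {                                  // due before the task is suspended
                count++;
                next += intervalMs;
            }
            next = nextAnchoredFire(anchorMs, intervalMs, r);    // cancel in close(), reschedule in init()
        }
        while (next <= endMs) {
            count++;
            next += intervalMs;
        }
        return count;
    }

    public static void main(String[] args) {
        long[] everyFiveMs = new long[19];
        for (int i = 0; i < everyFiveMs.length; i++) {
            everyFiveMs[i] = 5L * (i + 1);                       // rebalances at 5, 10, ..., 95
        }
        System.out.println("after a rebalance at t=25: anchored " + nextAnchoredFire(0, 10, 25)
                + ", relative " + nextRelativeFire(10, 25));
        System.out.println("punctuations in 100 ms with a rebalance every 5 ms: "
                + punctuationsWithAnchoredReschedule(0, 10, everyFiveMs, 100));
    }
}
```

With anchorMs = 0 and a 10 ms interval, rescheduling at t=25 yields a next punctuation at 30 rather than 35, and the model still counts 10 punctuations in 100 ms even with a rebalance every 5 ms.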

Given this, I think the options for a complete solution are:

1) Implement KAFKA-7699 and then do A or B

2) Persist the current punctuation schedule while migrating a task (presumably 
in the Subscription userdata) and then do C
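One possible shape for option 2, as a sketch only: the entry fields, the byte layout, and where exactly this would sit inside the Streams subscription userdata are all assumptions, not an existing format. It also assumes the wall clocks of the old and new owner are roughly in sync, since nextFireMs is an absolute timestamp.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical encoding of a task's wall-clock punctuation schedule so that it
// could travel with a migrating task. Not an existing Kafka Streams format.
public class PunctuationScheduleCodec {

    /** One punctuator to carry over: owning processor, interval, and next due time. */
    public static final class Entry {
        public final String processorName;
        public final long intervalMs;
        public final long nextFireMs;

        public Entry(String processorName, long intervalMs, long nextFireMs) {
            this.processorName = processorName;
            this.intervalMs = intervalMs;
            this.nextFireMs = nextFireMs;
        }
    }

    /** Layout: [count:int], then per entry [nameLen:int][name:UTF-8][intervalMs:long][nextFireMs:long]. */
    public static byte[] encode(List<Entry> entries) {
        List<byte[]> names = new ArrayList<>(entries.size());
        int size = Integer.BYTES;
        for (Entry e : entries) {
            byte[] name = e.processorName.getBytes(StandardCharsets.UTF_8);
            names.add(name);
            size += Integer.BYTES + name.length + 2 * Long.BYTES;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(entries.size());
        for (int i = 0; i < entries.size(); i++) {
            byte[] name = names.get(i);
            buf.putInt(name.length);
            buf.put(name);
            buf.putLong(entries.get(i).intervalMs);
            buf.putLong(entries.get(i).nextFireMs);
        }
        return buf.array();
    }

    public static List<Entry> decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        List<Entry> entries = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.getInt()];
            buf.get(name);
            long intervalMs = buf.getLong();
            long nextFireMs = buf.getLong();
            entries.add(new Entry(new String(name, StandardCharsets.UTF_8), intervalMs, nextFireMs));
        }
        return entries;
    }

    /** On the new owner: how long to wait before the first punctuation (never negative). */
    public static long initialDelayMs(Entry entry, long nowMs) {
        return Math.max(0L, entry.nextFireMs - nowMs);
    }

    public static void main(String[] args) {
        byte[] userdata = encode(List.of(new Entry("my-processor", 10_000L, 1_700_000_005_000L)));
        Entry restored = decode(userdata).get(0);
        System.out.println(restored.processorName + " resumes in "
                + initialDelayMs(restored, 1_700_000_001_000L) + " ms");
    }
}
```

The new owner would then schedule the first punctuation after initialDelayMs instead of a full interval, which keeps the punctuation cadence across the migration.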

Choosing the best course of action here is probably blocked on a decision about 
whether or not we want to anchor wallclock punctuations (KAFKA-7699). If we 
can't get consensus on that, we could always:

3) Introduce a third type of punctuation, then do both 1 and 2 (for the new 
"anchored-wall-clock" type and the existing "wall-clock" type, respectively).


*Another naive workaround for this issue is to turn on/upgrade to cooperative 
rebalancing, which does not suspend and resume all active tasks during a 
rebalance; it only suspends tasks that will be immediately closed and migrated 
to another instance or StreamThread. Of course, this will still cause the 
punctuation to be reset for tasks that _are_ actually closed/migrated, so 
practically speaking it's identical to option C alone.
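For a single StreamThread, the difference between the two protocols can be sketched as a set difference. The task-id strings and helper names here are illustrative, and the real assignor logic is considerably more involved: eager suspends everything the thread owns, cooperative suspends only the tasks it is about to lose. Those revoked tasks are exactly the ones whose punctuation schedule would still be reset.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: task ids are plain strings and the helper names are
// hypothetical; the real assignment logic in Kafka Streams is far more involved.
public class SuspendedTasks {

    /** Eager: every task the thread owns is suspended, even the ones it gets back. */
    public static Set<String> suspendedUnderEager(Set<String> owned, Set<String> newlyAssigned) {
        return new TreeSet<>(owned);   // newlyAssigned does not matter under eager
    }

    /** Cooperative: only the tasks the thread is about to lose are suspended. */
    public static Set<String> suspendedUnderCooperative(Set<String> owned, Set<String> newlyAssigned) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(newlyAssigned);
        return revoked;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("0_0", "0_1", "0_2");
        Set<String> assigned = Set.of("0_0", "0_1", "1_0");
        System.out.println("eager:       " + suspendedUnderEager(owned, assigned));
        System.out.println("cooperative: " + suspendedUnderCooperative(owned, assigned));
    }
}
```

Here eager suspends all three owned tasks (and re-runs close/init on 0_0 and 0_1), while cooperative only suspends 0_2, the task that is actually migrating.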


