Sophie Blee-Goldman created KAFKA-9467:
------------------------------------------
Summary: Multiple wallclock punctuators may be scheduled after a
rebalance
Key: KAFKA-9467
URL: https://issues.apache.org/jira/browse/KAFKA-9467
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Sophie Blee-Goldman
In the eager rebalancing protocol*, Streams will suspend all tasks at the
beginning of a rebalance and then resume those which have been reassigned to
the same StreamThread. Part of suspending and resuming a task involves closing
and reinitializing the topology, specifically calling Processor#close followed
by Processor#init. If a wallclock punctuator is scheduled as part of init, a
new one will be scheduled after every rebalance, on top of the existing one.
Streams does not cancel existing punctuators during suspension, and does not
tell users they must cancel punctuations themselves during Processor#close.
This can cause multiple punctuators to build up over time, which has the
apparent effect of increasing the net punctuation rate for wallclock
punctuators. (The same technically occurs with event-time punctuators, but the
punctuation times are anchored relative to a fixed point and only one will be
triggered at a time, so there is no increased punctuation rate).
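The buildup described above can be illustrated with a toy model. This is only a sketch: PunctuatorBuildup and countFired are hypothetical names, time is a simple millisecond counter, and nothing here is the actual Streams scheduling code. It assumes each Processor#init schedules a punctuator that first fires one interval after the call, and that old punctuators are never cancelled.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical names, not part of Kafka Streams.
final class PunctuatorBuildup {

    // Counts wallclock punctuations fired in (0, endMs] when Processor#init
    // schedules a punctuator at t=0 and again at each rebalance time, and
    // nothing ever cancels the old ones. rebalanceTimesMs must be ascending.
    static int countFired(long intervalMs, long[] rebalanceTimesMs, long endMs) {
        List<Long> nextFire = new ArrayList<>();
        nextFire.add(intervalMs);                 // scheduled by the first init at t=0
        int fired = 0;
        int r = 0;
        for (long now = 1; now <= endMs; now++) {
            if (r < rebalanceTimesMs.length && rebalanceTimesMs[r] == now) {
                nextFire.add(now + intervalMs);   // init runs again; old schedule survives
                r++;
            }
            for (int i = 0; i < nextFire.size(); i++) {
                if (now >= nextFire.get(i)) {
                    fired++;
                    nextFire.set(i, nextFire.get(i) + intervalMs);
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(countFired(10, new long[] {}, 30));     // prints 3
        System.out.println(countFired(10, new long[] {3, 6}, 30)); // prints 7
    }
}
```

With a 10 ms interval over 30 ms, two rebalances leave three live punctuators and the model reports 7 punctuations instead of 3.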
There are several options at this point:
A) Clear/cancel any existing punctuators during task suspension
B) Push it to the user to cancel their punctuators in Processor#close, and
update the documentation and examples to clarify this.
C) Leave existing punctuators alone during suspension, and instead block new
ones from being scheduled on top during re-initialization.
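As a rough sketch of what option B asks of users: keep the handle returned when the punctuator is scheduled, and cancel it in close. In Streams, ProcessorContext#schedule returns a Cancellable; the Cancellable, FakeScheduler and CancellingProcessor types below are local stand-ins made up for illustration so the example runs without Kafka on the classpath.

```java
// Stand-ins for illustration only; these are not the Kafka Streams types.
interface Cancellable {
    void cancel();
}

// Hypothetical scheduler that only tracks how many punctuators are live.
final class FakeScheduler {
    private int active = 0;

    Cancellable schedule(Runnable punctuator) {
        active++;
        boolean[] cancelled = {false};
        return () -> {
            if (!cancelled[0]) {       // make cancel() idempotent
                cancelled[0] = true;
                active--;
            }
        };
    }

    int activeCount() {
        return active;
    }
}

// Processor-like class that cleans up its punctuator on close (option B).
final class CancellingProcessor {
    private Cancellable handle;

    void init(FakeScheduler scheduler) {
        handle = scheduler.schedule(() -> { /* periodic work would go here */ });
    }

    void close() {
        if (handle != null) {
            handle.cancel();
            handle = null;
        }
    }
}
```

However many close/init cycles the task goes through, at most one punctuator stays registered.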
One drawback of options A and B is that cancelling/rescheduling punctuators can
mean a punctuation is never triggered if rebalances are more frequent than the
punctuation interval. Even if they are still triggered, the effective
punctuation interval will actually increase, as each rebalance delays the
punctuation.
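To make the drawback concrete, here is a small model of cancel-and-reschedule under periodic rebalances. It is a sketch under simplifying assumptions: RescheduleDelay is a hypothetical name, rebalances happen at exact multiples of rebalancePeriodMs (0 meaning never), and a rebalance that coincides with a due punctuation wins.

```java
// Toy model only: hypothetical names, not part of Kafka Streams.
final class RescheduleDelay {

    // Counts punctuations fired in (0, endMs] when every rebalance cancels the
    // punctuator and Processor#init reschedules it one full interval later.
    static int countFired(long intervalMs, long rebalancePeriodMs, long endMs) {
        long nextFire = intervalMs;
        int fired = 0;
        for (long now = 1; now <= endMs; now++) {
            if (rebalancePeriodMs > 0 && now % rebalancePeriodMs == 0) {
                nextFire = now + intervalMs;   // cancel + reschedule resets the timer
            } else if (now >= nextFire) {
                fired++;
                nextFire += intervalMs;
            }
        }
        return fired;
    }
}
```

With a 10 ms interval over 60 ms, countFired(10, 0, 60) gives 6, countFired(10, 15, 60) gives 4, and countFired(10, 4, 60) gives 0, since the timer is reset before it can ever expire.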
Of course, if the task _does_ get migrated to another thread/instance the
punctuation would be reset anyway with option C, since we do not currently
store/persist the punctuation information anywhere. The wallclock semantics are
somewhat loosely defined, but I think most users would not consider any of
options A, B or C a proper fix on its own, as it just pushes the issue in the
other direction.
Of course, if we were to anchor the wallclock punctuations to a fixed time then
this would not be a problem. At that point it seems reasonable to just leave it
up to the user to cancel the punctuation during Processor#close, similar to any
other kind of resource that must be cleaned up. Even if users forgot to do so,
it would not affect the actual behavior; it would just cause unused punctuators
to build up. See https://issues.apache.org/jira/browse/KAFKA-7699.
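One possible reading of "anchored" is sketched below; the exact semantics are for KAFKA-7699 to decide, so treat this as an assumption. AnchoredSchedule and both method names are hypothetical. The point is that on a fixed grid the next punctuation time does not depend on when the punctuator was (re)scheduled.

```java
// Toy model only: hypothetical names, not part of Kafka Streams.
final class AnchoredSchedule {

    // First punctuation strictly after nowMs on a grid anchored at anchorMs.
    // Assumes nowMs >= anchorMs and intervalMs > 0.
    static long nextAnchored(long anchorMs, long intervalMs, long nowMs) {
        return anchorMs + ((nowMs - anchorMs) / intervalMs + 1) * intervalMs;
    }

    // Unanchored model: the first punctuation is one interval after the
    // punctuator was scheduled, so rescheduling shifts it.
    static long nextUnanchored(long scheduledAtMs, long intervalMs) {
        return scheduledAtMs + intervalMs;
    }
}
```

With a 10 ms interval anchored at 0, punctuators rescheduled at t=3 and t=6 both fire next at t=10, whereas the unanchored ones would fire at t=13 and t=16.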
Given this, I think the options for a complete solution are:
1) Implement KAFKA-7699 and then do A or B
2) Persist the current punctuation schedule while migrating a task (presumably
in the Subscription userdata) and then do C
Choosing the best course of action here is probably blocked on a decision on
whether or not we want to anchor wallclock punctuations (KAFKA-7699). If we
can't get consensus on that, we could always:
3) Introduce a third type of punctuation, then do both 1 and 2 (for the new
"anchored-wall-clock" type and the existing "wall-clock" type, respectively).
*Another naive workaround for this issue is to turn on/upgrade to cooperative
rebalancing, which will not suspend and resume all active tasks during a
rebalance, and only suspend tasks that will be immediately closed and migrated
to another instance or StreamThread. Of course, this will still cause the
punctuation to be reset for tasks that _are_ actually closed/migrated, so
practically speaking it's identical to option C alone.