John Roesler created KAFKA-10121:
------------------------------------

             Summary: Streams Task Assignment optimization design
                 Key: KAFKA-10121
                 URL: https://issues.apache.org/jira/browse/KAFKA-10121
             Project: Kafka
          Issue Type: Task
          Components: streams
    Affects Versions: 2.6.0
            Reporter: John Roesler


Beginning in Kafka 2.6.0, Streams has a new task assignment algorithm that 
reacts to cluster membership changes by starting out 100% sticky and warming up 
tasks in the background to eventually migrate to a 100% balanced assignment. 
See KIP-441 for the details.

However, in computing the final, 100% balanced, assignment, the assignor 
doesn't take into account the current ownership of the tasks. Thus, when 
instances are added or removed, the assignor is likely to migrate large numbers 
of tasks. This is mitigated by the fact that the migrations happen at a trickle 
over time in the background, but it's still better to avoid unnecessary 
migrations if possible. See the example below for details.

The solution seems to be to use some kind of optimization algorithm to find a 
100% balanced assignment that also has maximum overlap with the current 
assignment.

 

Example, with additional detail:

The main focus of the KIP-441 work was the migration mechanism that allows 
Streams to warm up state for new instances in the background while continuing 
to process tasks on the instances that previously owned them. Accordingly, the 
assignment algorithm itself focuses on simplicity and guaranteed balance, not 
optimality.

There are three kinds of balance that all have to be met for Streams to be 100% 
balanced:
 # Active task balance: no member should have more active processing workload 
than any other
 # Stateful task balance: no member should have more stateful tasks (either 
active and stateful or standby) than any other
 # Task parallel balance: no member should have more tasks (partitions) for a 
single subtopology than another

(Note: in all these cases, an instance may actually have one more task than 
another, if the number of members doesn't evenly divide the number of tasks. 
For a simple case, consider if you have two members and only one task. It can 
only be assigned to one of the members, and the assignment is still as balanced 
as it could be.)
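
The "at most one more" tolerance in the note above can be written as a small 
check. This is a minimal sketch in Python, not code from Streams; the function 
name is_balanced and the list-of-counts input are assumptions made for 
illustration, and it assumes at least one member.

```python
def is_balanced(task_counts):
    """Return True if no member holds more than one task above any other.

    task_counts is a list with one entry per member: the number of tasks
    (of whichever kind is being checked) assigned to that member.
    """
    return max(task_counts) - min(task_counts) <= 1


# Two members, one task: one member gets it, and that is still balanced.
print(is_balanced([1, 0]))  # True
# A two-task gap means one task could be moved to even things out.
print(is_balanced([3, 1]))  # False
```

The same check would be applied separately to each of the three kinds of 
balance listed above.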

The current algorithm ensures all three kinds of balance as follows:
 # sort all members by name (to ensure assignment stability)
 # sort all tasks by subtopology first, then by partition. E.g., sorted like 
this: 0_0, 0_1, 0_2, 1_0, 1_1
 # for all tasks that are stateful, iterate over both tasks and members in 
sorted order, assigning each task t[i] to the member m[i % num_members]
 # for each standby replica we need to assign, continue looping over the sorted 
members, assigning each replica to the next member (assuming the member doesn't 
already have a replica of the task)
 # for each stateless task, assign an active replica to the member with the 
fewest tasks. After step 3, the member with the fewest active tasks has at most 
one task fewer than any other member, so the assignment after step 5 is still 
balanced.
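
Steps 1 to 3 above can be sketched roughly as follows. This is an illustrative 
Python sketch, not the actual Streams assignor code; the names 
round_robin_assign and task_sort_key are hypothetical, task IDs are assumed to 
be strings of the form subtopology_partition, and the member index is taken 
modulo the number of members. Standby replicas (step 4) and stateless tasks 
(step 5) are left out.

```python
def task_sort_key(task_id):
    """Sort key for IDs like "1_0": by subtopology first, then by partition."""
    subtopology, partition = task_id.split("_")
    return int(subtopology), int(partition)


def round_robin_assign(members, stateful_tasks):
    """Assign sorted stateful tasks to sorted members in round-robin order."""
    sorted_members = sorted(members)                          # step 1
    sorted_tasks = sorted(stateful_tasks, key=task_sort_key)  # step 2
    assignment = {member: [] for member in sorted_members}
    for i, task in enumerate(sorted_tasks):                   # step 3
        owner = sorted_members[i % len(sorted_members)]
        assignment[owner].append(task)
    return assignment


tasks = ["0_0", "0_1", "0_2", "0_3", "0_4", "0_5"]
print(round_robin_assign(["m1", "m2"], tasks))
# {'m1': ['0_0', '0_2', '0_4'], 'm2': ['0_1', '0_3', '0_5']}
```

Because the result depends only on the sorted member and task lists, the same 
inputs always give the same assignment, but the previous owner of a task plays 
no part in the outcome.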

To demonstrate how a more sophisticated algorithm could minimize migrations, 
consider the following simple assignment with two instances and six tasks:

m1: [0_0, 0_2, 0_4]

m2: [0_1, 0_3, 0_5]

Adding a new member causes four of the tasks to migrate:

m1: [0_0, 0_3]

m2: [0_1, 0_4]

m3: [0_2, 0_5]

However, the following assignment is equally balanced, and only two of the 
tasks need to migrate:

m1: [0_0, 0_2]

m2: [0_1, 0_3]

m3: [0_4, 0_5]
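
To make the comparison concrete, the sketch below counts migrations between 
two assignments and shows one possible greedy way to keep tasks where they 
already are. It is a hypothetical Python illustration, not the algorithm 
proposed for this ticket: count_migrations and sticky_balanced_assign are 
made-up names, and the sketch only balances the total task count per member, 
ignoring the stateful and per-subtopology kinds of balance as well as standbys.

```python
def count_migrations(old, new):
    """Count tasks whose owner in `new` differs from their owner in `old`."""
    old_owner = {t: m for m, owned in old.items() for t in owned}
    return sum(1 for m, owned in new.items() for t in owned
               if old_owner.get(t) != m)


def sticky_balanced_assign(members, tasks, previous):
    """Balance task counts while letting members keep tasks they already own."""
    members = sorted(members)
    base, extra = divmod(len(tasks), len(members))
    live = set(tasks)
    # Tasks each member owned before that still exist (none for new members).
    owned = {m: sorted(t for t in previous.get(m, []) if t in live)
             for m in members}
    # The `extra` larger quotas go to the members that own the most tasks.
    by_load = sorted(members, key=lambda m: -len(owned[m]))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(by_load)}
    # Each member keeps as many of its own tasks as its quota allows.
    assignment = {m: owned[m][:quota[m]] for m in members}
    kept = {t for ts in assignment.values() for t in ts}
    leftovers = [t for t in sorted(tasks) if t not in kept]
    # Remaining tasks fill whatever capacity is left, in member order.
    for m in members:
        while len(assignment[m]) < quota[m]:
            assignment[m].append(leftovers.pop(0))
    return assignment


old = {"m1": ["0_0", "0_2", "0_4"], "m2": ["0_1", "0_3", "0_5"]}
round_robin = {"m1": ["0_0", "0_3"], "m2": ["0_1", "0_4"], "m3": ["0_2", "0_5"]}
tasks = ["0_0", "0_1", "0_2", "0_3", "0_4", "0_5"]

print(count_migrations(old, round_robin))  # 4
sticky = sticky_balanced_assign(["m1", "m2", "m3"], tasks, old)
print(sticky)
# {'m1': ['0_0', '0_2'], 'm2': ['0_1', '0_3'], 'm3': ['0_4', '0_5']}
print(count_migrations(old, sticky))  # 2
```

A greedy pass like this is enough when only one kind of balance matters. 
Satisfying all three kinds at once while maximizing overlap is the harder 
problem, which is why the ticket suggests some kind of optimization algorithm.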



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
