This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7ec10ce19a HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585) 7ec10ce19a is described below commit 7ec10ce19a1c5869a4713f9c9b6b12da5c63e42b Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Fri Sep 2 03:14:34 2022 -0700 HOTFIX: fix PriorityQueue iteration to assign warmups in priority order (#12585) Based on a patch submitted to the confluentinc fork & then abandoned. Needed some updates and minor expansion but more or less just re-applied the changes proposed in confluentinc#697. Original PR has a very detailed justification for these changes but the tl;dr of it is that apparently the PriorityQueue's iterator does not actually guarantee to return elements in priority order. Reviewer: Luke Chen <show...@gmail.com> --- .../streams/processor/internals/assignment/TaskMovement.java | 6 ++++-- .../processor/internals/assignment/TaskMovementTest.java | 11 +++-------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java index 38e64276ba..ec0fa5e11e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java @@ -107,7 +107,8 @@ final class TaskMovement { final int movementsNeeded = taskMovements.size(); - for (final TaskMovement movement : taskMovements) { + while (!taskMovements.isEmpty()) { + final TaskMovement movement = taskMovements.poll(); // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most // caught up client. final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) || @@ -157,7 +158,8 @@ final class TaskMovement { int movementsNeeded = 0; - for (final TaskMovement movement : taskMovements) { + while (!taskMovements.isEmpty()) { + final TaskMovement movement = taskMovements.poll(); final Function<UUID, Boolean> eligibleClientPredicate = clientId -> !clientStates.get(clientId).hasAssignedTask(movement.task); UUID sourceClient = caughtUpClientsByTaskLoad.poll( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java index baf6d18496..a337deb801 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java @@ -244,15 +244,10 @@ public class TaskMovementTest { assertThat(client2, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_2))); assertThat(client3, hasProperty("activeTasks", ClientState::activeTasks, mkSet(TASK_0_1))); - // we should only assign one warmup, but it could be either one that needs to be migrated. + // we should only assign one warmup, and the task movement should have the highest priority assertThat(client1, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet())); - try { - assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_1))); - assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet())); - } catch (final AssertionError ignored) { - assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet())); - assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_2))); - } + assertThat(client2, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet(TASK_0_1))); + assertThat(client3, hasProperty("standbyTasks", ClientState::standbyTasks, mkSet())); } @Test