This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f8dc1f  KAFKA-6145: KIP-441: Add test scenarios to ensure rebalance 
convergence (#8475)
0f8dc1f is described below

commit 0f8dc1fcd720fbb7eb6152ef40ecaafdd6b04e15
Author: John Roesler <[email protected]>
AuthorDate: Fri Apr 17 16:03:39 2020 -0500

    KAFKA-6145: KIP-441: Add test scenarios to ensure rebalance convergence 
(#8475)
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
 .../java/org/apache/kafka/common/utils/Utils.java  |  53 +++
 .../internals/assignment/AssignmentTestUtils.java  |  20 +-
 .../assignment/TaskAssignorConvergenceTest.java    | 440 +++++++++++++++++++++
 3 files changed, 494 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 682ccf5..ee627c9 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.util.EnumSet;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.common.KafkaException;
@@ -60,9 +61,13 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -1093,4 +1098,52 @@ public final class Utils {
             )
         );
     }
+
+    /**
+     * A Collector that offers two kinds of convenience:
+     * 1. You can specify the concrete type of the returned Map
+     * 2. You can turn a stream of Entries directly into a Map without having 
to mess with a key function
+     *    and a value function. In particular, this is handy if all you need 
to do is apply a filter to a Map's entries.
+     *
+     *
+     * One thing to be wary of: These types are too "distant" for IDE type 
checkers to warn you if you
+     * try to do something like build a TreeMap of non-Comparable elements. 
You'd get a runtime exception for that.
+     *
+     * @param mapSupplier The constructor for your concrete map type.
+     * @param <K> The Map key type
+     * @param <V> The Map value type
+     * @param <M> The type of the Map itself.
+     * @return new Collector<Map.Entry<K, V>, M, M>
+     */
+    public static <K, V, M extends Map<K, V>> Collector<Map.Entry<K, V>, M, M> 
entriesToMap(final Supplier<M> mapSupplier) {
+        return new Collector<Map.Entry<K, V>, M, M>() {
+            @Override
+            public Supplier<M> supplier() {
+                return mapSupplier;
+            }
+
+            @Override
+            public BiConsumer<M, Map.Entry<K, V>> accumulator() {
+                return (map, entry) -> map.put(entry.getKey(), 
entry.getValue());
+            }
+
+            @Override
+            public BinaryOperator<M> combiner() {
+                return (map, map2) -> {
+                    map.putAll(map2);
+                    return map;
+                };
+            }
+
+            @Override
+            public Function<M, M> finisher() {
+                return map -> map;
+            }
+
+            @Override
+            public Set<Characteristics> characteristics() {
+                return EnumSet.of(Characteristics.UNORDERED, 
Characteristics.IDENTITY_FINISH);
+            }
+        };
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index e5c5348..085af0e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -63,24 +63,6 @@ public class AssignmentTestUtils {
      * @return the UUID created by repeating the digit n in the UUID format
      */
     static UUID uuidForInt(final Integer n) {
-        if (n < 1 || n > 7) {
-            throw new IllegalArgumentException("Must pass in a single digit 
number to the uuid builder, got n = " + n);
-        }
-        final StringBuilder builder = new StringBuilder(36);
-        for (int i = 0; i < 8; ++i) {
-            builder.append(n);
-        }
-        builder.append('-');
-
-        for (int i = 0; i < 3; ++i) {
-            for (int j = 0; j < 4; ++j) {
-                builder.append(n);
-            }
-            builder.append('-');
-        }
-        for (int i = 0; i < 12; ++i) {
-            builder.append(n);
-        }
-        return UUID.fromString(builder.toString());
+        return new UUID(0, n);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
new file mode 100644
index 0000000..c47fc37
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.hamcrest.MatcherAssert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.entriesToMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+public class TaskAssignorConvergenceTest {
+    private static final class Harness {
+        private final Set<TaskId> statelessTasks;
+        private final Map<TaskId, Long> statefulTaskEndOffsetSums;
+        private final Map<UUID, ClientState> clientStates;
+        private final Map<UUID, ClientState> droppedClientStates;
+        private final StringBuilder history = new StringBuilder();
+
+        private static Harness initializeCluster(final int numStatelessTasks,
+                                                 final int numStatefulTasks,
+                                                 final int numNodes) {
+            int subtopology = 0;
+            final Set<TaskId> statelessTasks = new TreeSet<>();
+            {
+                int partition = 0;
+                for (int i = 0; i < numStatelessTasks; i++) {
+                    statelessTasks.add(new TaskId(subtopology, partition));
+                    if (partition == 4) {
+                        subtopology++;
+                        partition = 0;
+                    } else {
+                        partition++;
+                    }
+                }
+            }
+
+            final Map<TaskId, Long> statefulTaskEndOffsetSums = new 
TreeMap<>();
+            {
+                subtopology++;
+                int partition = 0;
+                for (int i = 0; i < numStatefulTasks; i++) {
+                    statefulTaskEndOffsetSums.put(new TaskId(subtopology, 
partition), 150000L);
+
+                    if (partition == 4) {
+                        subtopology++;
+                        partition = 0;
+                    } else {
+                        partition++;
+                    }
+                }
+            }
+
+            final Map<UUID, ClientState> clientStates = new TreeMap<>();
+            for (int i = 0; i < numNodes; i++) {
+                final UUID uuid = uuidForInt(i);
+                clientStates.put(uuid, emptyInstance(uuid, 
statefulTaskEndOffsetSums));
+            }
+
+            return new Harness(statelessTasks, statefulTaskEndOffsetSums, 
clientStates);
+        }
+
+        private Harness(final Set<TaskId> statelessTasks,
+                        final Map<TaskId, Long> statefulTaskEndOffsetSums,
+                        final Map<UUID, ClientState> clientStates) {
+            this.statelessTasks = statelessTasks;
+            this.statefulTaskEndOffsetSums = statefulTaskEndOffsetSums;
+            this.clientStates = clientStates;
+            droppedClientStates = new TreeMap<>();
+            history.append('\n');
+            history.append("Cluster and application initial state: \n");
+            history.append("Stateless tasks: 
").append(statelessTasks).append('\n');
+            history.append("Stateful tasks:  
").append(statefulTaskEndOffsetSums.keySet()).append('\n');
+            formatClientStates(true);
+            history.append("History of the cluster: \n");
+        }
+
+        private void addNode() {
+            final UUID uuid = uuidForInt(clientStates.size() + 
droppedClientStates.size());
+            history.append("Adding new node ").append(uuid).append('\n');
+            clientStates.put(uuid, emptyInstance(uuid, 
statefulTaskEndOffsetSums));
+        }
+
+        private static ClientState emptyInstance(final UUID uuid, final 
Map<TaskId, Long> allTaskEndOffsetSums) {
+            final ClientState clientState = new ClientState(1);
+            clientState.computeTaskLags(uuid, allTaskEndOffsetSums);
+            return clientState;
+        }
+
+        private void addOrResurrectNodesRandomly(final Random prng, final int 
limit) {
+            final int numberToAdd = prng.nextInt(limit);
+            for (int i = 0; i < numberToAdd; i++) {
+                final boolean addNew = prng.nextBoolean();
+                if (addNew || droppedClientStates.isEmpty()) {
+                    addNode();
+                } else {
+                    final UUID uuid = selectRandomElement(prng, 
droppedClientStates);
+                    history.append("Resurrecting node 
").append(uuid).append('\n');
+                    clientStates.put(uuid, droppedClientStates.get(uuid));
+                    droppedClientStates.remove(uuid);
+                }
+            }
+        }
+
+        private void dropNode() {
+            if (clientStates.isEmpty()) {
+                throw new NoSuchElementException("There are no nodes to drop");
+            } else {
+                final UUID toDrop = clientStates.keySet().iterator().next();
+                dropNode(toDrop);
+            }
+        }
+
+        private void dropRandomNodes(final int numNode, final Random prng) {
+            int dropped = 0;
+            while (!clientStates.isEmpty() && dropped < numNode) {
+                final UUID toDrop = selectRandomElement(prng, clientStates);
+                dropNode(toDrop);
+                dropped++;
+            }
+            history.append("Stateless tasks: 
").append(statelessTasks).append('\n');
+            history.append("Stateful tasks:  
").append(statefulTaskEndOffsetSums.keySet()).append('\n');
+            formatClientStates(true);
+        }
+
+        private void dropNode(final UUID toDrop) {
+            final ClientState clientState = clientStates.remove(toDrop);
+            history.append("Dropping node ").append(toDrop).append(": 
").append(clientState).append('\n');
+            droppedClientStates.put(toDrop, clientState);
+        }
+
+        private static UUID selectRandomElement(final Random prng, final 
Map<UUID, ClientState> clients) {
+            int dropIndex = prng.nextInt(clients.size());
+            UUID toDrop = null;
+            for (final UUID uuid : clients.keySet()) {
+                if (dropIndex == 0) {
+                    toDrop = uuid;
+                    break;
+                } else {
+                    dropIndex--;
+                }
+            }
+            return toDrop;
+        }
+
+        /**
+         * Flip the cluster states from "assigned" to "subscribed" so they can 
be used for another round of assignments.
+         */
+        private void prepareForNextRebalance() {
+            final Map<UUID, ClientState> newClientStates = new TreeMap<>();
+            for (final Map.Entry<UUID, ClientState> entry : 
clientStates.entrySet()) {
+                final UUID uuid = entry.getKey();
+                final ClientState newClientState = new ClientState(1);
+                final ClientState clientState = entry.getValue();
+                final Map<TaskId, Long> taskOffsetSums = new TreeMap<>();
+                for (final TaskId taskId : clientState.activeTasks()) {
+                    if (statefulTaskEndOffsetSums.containsKey(taskId)) {
+                        taskOffsetSums.put(taskId, 
statefulTaskEndOffsetSums.get(taskId));
+                    }
+                }
+                for (final TaskId taskId : clientState.standbyTasks()) {
+                    if (statefulTaskEndOffsetSums.containsKey(taskId)) {
+                        taskOffsetSums.put(taskId, 
statefulTaskEndOffsetSums.get(taskId));
+                    }
+                }
+                
newClientState.addPreviousActiveTasks(clientState.activeTasks());
+                
newClientState.addPreviousStandbyTasks(clientState.standbyTasks());
+                newClientState.addPreviousTasksAndOffsetSums(taskOffsetSums);
+                newClientState.computeTaskLags(uuid, 
statefulTaskEndOffsetSums);
+                newClientStates.put(uuid, newClientState);
+            }
+
+            clientStates.clear();
+            clientStates.putAll(newClientStates);
+        }
+
+        private void recordBefore(final int iteration) {
+            history.append("Starting Iteration: 
").append(iteration).append('\n');
+            formatClientStates(false);
+        }
+
+        private void recordAfter(final int iteration, final boolean 
rebalancePending) {
+            history.append("After assignment:  
").append(iteration).append('\n');
+            history.append("Rebalance pending: 
").append(rebalancePending).append('\n');
+            formatClientStates(true);
+            history.append('\n');
+        }
+
+        private void formatClientStates(final boolean printUnassigned) {
+            final Set<TaskId> unassignedTasks = new TreeSet<>();
+            unassignedTasks.addAll(statefulTaskEndOffsetSums.keySet());
+            unassignedTasks.addAll(statelessTasks);
+            history.append('{').append('\n');
+            for (final Map.Entry<UUID, ClientState> entry : 
clientStates.entrySet()) {
+                history.append("  ").append(entry.getKey()).append(": 
").append(entry.getValue()).append('\n');
+                unassignedTasks.removeAll(entry.getValue().assignedTasks());
+            }
+            history.append('}').append('\n');
+            if (printUnassigned) {
+                history.append("Unassigned Tasks: 
").append(unassignedTasks).append('\n');
+            }
+        }
+
+    }
+
+    @Test
+    public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                1,
+                                                                2,
+                                                                0,
+                                                                1000L);
+
+        final Harness harness = Harness.initializeCluster(1, 1, 1);
+
+        testForConvergence(harness, configs, 1);
+        verifyValidAssignment(0, harness);
+    }
+
+    @Test
+    public void assignmentShouldConvergeAfterAddingNode() {
+        final int numStatelessTasks = 15;
+        final int numStatefulTasks = 13;
+        final int maxWarmupReplicas = 2;
+        final int numStandbyReplicas = 0;
+
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                1,
+                                                                
maxWarmupReplicas,
+                                                                
numStandbyReplicas,
+                                                                1000L);
+
+        final Harness harness = Harness.initializeCluster(numStatelessTasks, 
numStatefulTasks, 1);
+        testForConvergence(harness, configs, 1);
+        harness.addNode();
+        // we expect convergence to involve moving each task at most once, and 
we can move "maxWarmupReplicas" number
+        // of tasks at once, hence the iteration limit
+        testForConvergence(harness, configs, numStatefulTasks / 
maxWarmupReplicas + 1);
+        verifyValidAssignment(numStandbyReplicas, harness);
+    }
+
+    @Ignore // Adding this failing test before adding the code that fixes it
+    @Test
+    public void droppingNodesShouldConverge() {
+        final int numStatelessTasks = 15;
+        final int numStatefulTasks = 13;
+        final int maxWarmupReplicas = 2;
+        final int numStandbyReplicas = 0;
+
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                1,
+                                                                
maxWarmupReplicas,
+                                                                
numStandbyReplicas,
+                                                                1000L);
+
+        final Harness harness = Harness.initializeCluster(numStatelessTasks, 
numStatefulTasks, 7);
+        testForConvergence(harness, configs, 1);
+        harness.dropNode();
+        // This time, we allow one extra iteration because the
+        // first stateful task needs to get shuffled back to the first node
+        testForConvergence(harness, configs, numStatefulTasks / 
maxWarmupReplicas + 2);
+
+        verifyValidAssignment(numStandbyReplicas, harness);
+    }
+
+    @Ignore // Adding this failing test before adding the code that fixes it
+    @Test
+    public void randomClusterPerturbationsShouldConverge() {
+        // do as many tests as we can in 10 seconds
+        final long deadline = System.currentTimeMillis() + 10_000L;
+        do {
+            final long seed = new Random().nextLong();
+            runRandomizedScenario(seed);
+        } while (System.currentTimeMillis() < deadline);
+    }
+
+    private static void runRandomizedScenario(final long seed) {
+        Harness harness = null;
+        try {
+            final Random prng = new Random(seed);
+
+            // These are all rand(limit)+1 because we need them to be at least 
1 and the upper bound is exclusive
+            final int initialClusterSize = prng.nextInt(10) + 1;
+            final int numStatelessTasks = prng.nextInt(10) + 1;
+            final int numStatefulTasks = prng.nextInt(10) + 1;
+            final int maxWarmupReplicas = prng.nextInt(numStatefulTasks) + 1;
+            // This one is rand(limit+1) because we _want_ to test zero and 
the upper bound is exclusive
+            final int numStandbyReplicas = prng.nextInt(initialClusterSize + 
1);
+
+            final int numberOfEvents = prng.nextInt(10) + 1;
+
+            final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                    1,
+                                                                    
maxWarmupReplicas,
+                                                                    
numStandbyReplicas,
+                                                                    1000L);
+
+            harness = Harness.initializeCluster(numStatelessTasks, 
numStatefulTasks, initialClusterSize);
+            testForConvergence(harness, configs, 1);
+            verifyValidAssignment(numStandbyReplicas, harness);
+
+            for (int i = 0; i < numberOfEvents; i++) {
+                final int event = prng.nextInt(2);
+                switch (event) {
+                    case 0:
+                        
harness.dropRandomNodes(prng.nextInt(initialClusterSize), prng);
+                        break;
+                    case 1:
+                        harness.addOrResurrectNodesRandomly(prng, 
initialClusterSize);
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected event: " + 
event);
+                }
+                if (!harness.clientStates.isEmpty()) {
+                    testForConvergence(harness, configs, numStatefulTasks * 2);
+                    verifyValidAssignment(numStandbyReplicas, harness);
+                }
+            }
+        } catch (final AssertionError t) {
+            throw new AssertionError(
+                "Assertion failed in randomized test. Reproduce with: 
`runRandomizedScenario(" + seed + ")`.",
+                t
+            );
+        } catch (final Throwable t) {
+            final StringBuilder builder =
+                new StringBuilder()
+                    .append("Exception in randomized scenario. Reproduce with: 
`runRandomizedScenario(")
+                    .append(seed)
+                    .append(")`. ");
+            if (harness != null) {
+                builder.append(harness.history);
+            }
+            throw new AssertionError(builder.toString(), t);
+        }
+    }
+
+    private static void verifyValidAssignment(final int numStandbyReplicas, 
final Harness harness) {
+        final Map<TaskId, Set<UUID>> assignments = new TreeMap<>();
+        for (final TaskId taskId : harness.statefulTaskEndOffsetSums.keySet()) 
{
+            assignments.put(taskId, new TreeSet<>());
+        }
+        for (final TaskId taskId : harness.statelessTasks) {
+            assignments.put(taskId, new TreeSet<>());
+        }
+        for (final Map.Entry<UUID, ClientState> entry : 
harness.clientStates.entrySet()) {
+            for (final TaskId activeTask : entry.getValue().activeTasks()) {
+                if (assignments.containsKey(activeTask)) {
+                    assignments.get(activeTask).add(entry.getKey());
+                }
+            }
+            for (final TaskId standbyTask : entry.getValue().standbyTasks()) {
+                assignments.get(standbyTask).add(entry.getKey());
+            }
+        }
+        final TreeMap<TaskId, Set<UUID>> misassigned =
+            assignments
+                .entrySet()
+                .stream()
+                .filter(entry -> {
+                    final int expectedActives = 1;
+                    final boolean isStateless = 
harness.statelessTasks.contains(entry.getKey());
+                    final int expectedStandbys = isStateless ? 0 : 
numStandbyReplicas;
+                    // We'll never assign even the expected number of standbys 
if they don't actually fit in the cluster
+                    final int expectedAssignments = Math.min(
+                        harness.clientStates.size(),
+                        expectedActives + expectedStandbys
+                    );
+                    return entry.getValue().size() != expectedAssignments;
+                })
+                .collect(entriesToMap(TreeMap::new));
+
+        MatcherAssert.assertThat(
+            new StringBuilder().append("Found some over- or under-assigned 
tasks in the final assignment with ")
+                               .append(numStandbyReplicas)
+                               .append(" standby replicas.")
+                               .append(harness.history)
+                               .toString(),
+            misassigned,
+            is(emptyMap()));
+    }
+
+    private static void testForConvergence(final Harness harness,
+                                           final AssignmentConfigs configs,
+                                           final int iterationLimit) {
+        final Set<TaskId> allTasks = new TreeSet<>();
+        allTasks.addAll(harness.statelessTasks);
+        allTasks.addAll(harness.statefulTaskEndOffsetSums.keySet());
+
+        boolean rebalancePending = true;
+        int iteration = 0;
+        while (rebalancePending && iteration < iterationLimit) {
+            iteration++;
+            harness.prepareForNextRebalance();
+            harness.recordBefore(iteration);
+            rebalancePending = new HighAvailabilityTaskAssignor(
+                harness.clientStates, allTasks,
+                harness.statefulTaskEndOffsetSums.keySet(),
+                configs
+            ).assign();
+            harness.recordAfter(iteration, rebalancePending);
+        }
+
+        if (rebalancePending) {
+            final StringBuilder message =
+                new StringBuilder().append("Rebalances have not converged 
after iteration cutoff: ")
+                                   .append(iterationLimit)
+                                   .append(harness.history);
+            fail(message.toString());
+        }
+    }
+
+
+}

Reply via email to