This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new cec5ddc  KAFKA-10413: Allow for even distribution of lost/new tasks 
when multiple Connect workers join at the same time (#9319)
cec5ddc is described below

commit cec5ddcd4eb2bc13207ff28b2830af12550b0008
Author: Ramesh Krishnan M <[email protected]>
AuthorDate: Wed Feb 3 01:34:06 2021 +0530

    KAFKA-10413: Allow for even distribution of lost/new tasks when multiple 
Connect workers join at the same time (#9319)
    
    First issue: When more than one worker joins the Connect group, the 
incremental cooperative assignor revokes and reassigns at most the average 
number of tasks per worker.
    Side-effect: The additional workers joining the group stay idle, and more 
future rebalances are required to reach an even distribution of tasks.
    Fix: As part of task assignment calculation following a deployment, the 
reassignment of tasks is calculated by revoking all the tasks above the 
rounded up (ceil) average number of tasks.
    
    Second issue: When more than one worker is lost and rejoins the group, at 
most one worker will be reassigned the lost tasks from all the workers 
that left the group.
    Side-effect: In scenarios where more than one worker is lost and rejoins 
the group, only one among them gets assigned all the partitions that were lost 
in the past. The additional workers that have joined would not get any task 
assigned to them until a future rebalance happens.
    Fix: As part of lost task reassignment, all the new workers that have 
joined the group would be considered for task assignment and would be assigned 
the new tasks in a round-robin fashion.
    
    Testing strategy :
    * System testing in a Kubernetes environment completed
    * New integration tests to test for balanced tasks
    * Updated unit tests.
    
    Co-authored-by: Rameshkrishnan Muthusamy 
<[email protected]>
    Co-authored-by: Randall Hauch <[email protected]>
    Co-authored-by: Konstantine Karantasis <[email protected]>
    
    Reviewers: Randall Hauch <[email protected]>, Konstantine Karantasis 
<[email protected]>
---
 .../IncrementalCooperativeAssignor.java            | 54 ++++++++++------
 .../RebalanceSourceConnectorsIntegrationTest.java  | 71 ++++++++++++++++++++--
 .../IncrementalCooperativeAssignorTest.java        | 24 +++++---
 .../WorkerCoordinatorIncrementalTest.java          | 19 +++---
 4 files changed, 127 insertions(+), 41 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index 744a099..c3f2f4b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -34,7 +34,6 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.Function;
@@ -445,16 +444,34 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
-            Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
             if (!candidateWorkersForReassignment.isEmpty()) {
                 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
             }
 
-            if (candidateWorkerLoad.isPresent()) {
-                WorkerLoad workerLoad = candidateWorkerLoad.get();
-                log.debug("A candidate worker has been found to assign lost 
tasks: {}", workerLoad.worker());
-                lostAssignments.connectors().forEach(workerLoad::assign);
-                lostAssignments.tasks().forEach(workerLoad::assign);
+            if (!candidateWorkerLoad.isEmpty()) {
+                log.debug("Assigning lost tasks to {} candidate workers: {}", 
+                        candidateWorkerLoad.size(),
+                        
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+                Iterator<WorkerLoad> candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                for (String connector : lostAssignments.connectors()) {
+                    // Loop over the the candidate workers as many times as it 
takes
+                    if (!candidateWorkerIterator.hasNext()) {
+                        candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                    }
+                    WorkerLoad worker = candidateWorkerIterator.next();
+                    log.debug("Assigning connector id {} to member {}", 
connector, worker.worker());
+                    worker.assign(connector);
+                }
+                candidateWorkerIterator = candidateWorkerLoad.iterator();
+                for (ConnectorTaskId task : lostAssignments.tasks()) {
+                    if (!candidateWorkerIterator.hasNext()) {
+                        candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+                    }
+                    WorkerLoad worker = candidateWorkerIterator.next();
+                    log.debug("Assigning task id {} to member {}", task, 
worker.worker());
+                    worker.assign(task);
+                }
             } else {
                 log.debug("No single candidate worker was found to assign lost 
tasks. Treating lost tasks as new tasks");
                 
newSubmissions.connectors().addAll(lostAssignments.connectors());
@@ -498,13 +515,13 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
                 .collect(Collectors.toSet());
     }
 
-    private Optional<WorkerLoad> 
pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
+    private List<WorkerLoad> 
pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
         Map<String, WorkerLoad> activeWorkers = 
completeWorkerAssignment.stream()
                 .collect(Collectors.toMap(WorkerLoad::worker, 
Function.identity()));
         return candidateWorkersForReassignment.stream()
                 .map(activeWorkers::get)
                 .filter(Objects::nonNull)
-                .findFirst();
+                .collect(Collectors.toList());
     }
 
     /**
@@ -554,38 +571,37 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         // We have at least one worker assignment (the leader itself) so 
totalWorkersNum can't be 0
         log.debug("Previous rounded down (floor) average number of connectors 
per worker {}", totalActiveConnectorsNum / existingWorkersNum);
         int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of connectors per 
worker {}", floorConnectors);
+        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % 
totalWorkersNum == 0) ? 0 : 1);
+        log.debug("New average number of connectors per worker rounded down 
(floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
+
 
         log.debug("Previous rounded down (floor) average number of tasks per 
worker {}", totalActiveTasksNum / existingWorkersNum);
         int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        log.debug("New rounded down (floor) average number of tasks per worker 
{}", floorTasks);
+        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum 
== 0) ? 0 : 1);
+        log.debug("New average number of tasks per worker rounded down (floor) 
{} and rounded up (ceil) {}", floorTasks, ceilTasks);
+        int numToRevoke;
 
-        int numToRevoke = floorConnectors;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<String> connectors = existing.connectors().iterator();
+            numToRevoke = existing.connectorsSize() - ceilConnectors;
             for (int i = existing.connectorsSize(); i > floorConnectors && 
numToRevoke > 0; --i, --numToRevoke) {
                 ConnectorsAndTasks resources = revoking.computeIfAbsent(
                     existing.worker(),
                     w -> new ConnectorsAndTasks.Builder().build());
                 resources.connectors().add(connectors.next());
             }
-            if (numToRevoke == 0) {
-                break;
-            }
         }
 
-        numToRevoke = floorTasks;
         for (WorkerLoad existing : existingWorkers) {
             Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+            numToRevoke = existing.tasksSize() - ceilTasks;
+            log.debug("Tasks on worker {} is higher than ceiling, so revoking 
{} tasks", existing, numToRevoke);
             for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 
0; --i, --numToRevoke) {
                 ConnectorsAndTasks resources = revoking.computeIfAbsent(
                     existing.worker(),
                     w -> new ConnectorsAndTasks.Builder().build());
                 resources.tasks().add(tasks.next());
             }
-            if (numToRevoke == 0) {
-                break;
-            }
         }
 
         return revoking;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 2914434..535a803 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -208,7 +209,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME 
+ 3,
                 "Connector tasks did not stop in time.");
 
-        waitForCondition(this::assertConnectorAndTasksAreUnique,
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced 
between the workers.");
     }
 
@@ -237,7 +238,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME 
+ 3, NUM_TASKS,
                 "Connector tasks did not start in time.");
 
-        waitForCondition(this::assertConnectorAndTasksAreUnique,
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced 
between the workers.");
     }
 
@@ -263,7 +264,53 @@ public class RebalanceSourceConnectorsIntegrationTest {
         connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
                 "Connect workers did not start in time.");
 
-        waitForCondition(this::assertConnectorAndTasksAreUnique,
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
+                WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced 
between the workers.");
+    }
+
+    @Test
+    public void testMultipleWorkersRejoining() throws Exception {
+        // create test topic
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the source connector
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
+
+        // start a source connector
+        IntStream.range(0, 4).forEachOrdered(i -> 
connect.configureConnector(CONNECTOR_NAME + i, props));
+
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME 
+ 3, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
+                WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced 
between the workers.");
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.removeWorker();
+        connect.removeWorker();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 2,
+                "Connect workers did not stop in time.");
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.addWorker();
+        connect.addWorker();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
+
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        for (int i = 0; i < 4; ++i) {
+            
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME 
+ i, NUM_TASKS, "Connector tasks did not start in time.");
+        }
+
+        waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced 
between the workers.");
     }
 
@@ -282,7 +329,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         return props;
     }
 
-    private boolean assertConnectorAndTasksAreUnique() {
+    private boolean assertConnectorAndTasksAreUniqueAndBalanced() {
         try {
             Map<String, Collection<String>> connectors = new HashMap<>();
             Map<String, Collection<String>> tasks = new HashMap<>();
@@ -296,7 +343,12 @@ public class RebalanceSourceConnectorsIntegrationTest {
             }
 
             int maxConnectors = 
connectors.values().stream().mapToInt(Collection::size).max().orElse(0);
+            int minConnectors = 
connectors.values().stream().mapToInt(Collection::size).min().orElse(0);
             int maxTasks = 
tasks.values().stream().mapToInt(Collection::size).max().orElse(0);
+            int minTasks = 
tasks.values().stream().mapToInt(Collection::size).min().orElse(0);
+
+            log.debug("Connector balance: {}", formatAssignment(connectors));
+            log.debug("Task balance: {}", formatAssignment(tasks));
 
             assertNotEquals("Found no connectors running!", maxConnectors, 0);
             assertNotEquals("Found no tasks running!", maxTasks, 0);
@@ -306,6 +358,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
             assertEquals("Task assignments are not unique: " + tasks,
                     tasks.values().size(),
                     
tasks.values().stream().distinct().collect(Collectors.toList()).size());
+            assertTrue("Connectors are imbalanced: " + 
formatAssignment(connectors), maxConnectors - minConnectors < 2);
+            assertTrue("Tasks are imbalanced: " + formatAssignment(tasks), 
maxTasks - minTasks < 2);
             return true;
         } catch (Exception e) {
             log.error("Could not check connector state info.", e);
@@ -313,4 +367,13 @@ public class RebalanceSourceConnectorsIntegrationTest {
         }
     }
 
+    private static String formatAssignment(Map<String, Collection<String>> 
assignment) {
+        StringBuilder result = new StringBuilder();
+        for (String worker : 
assignment.keySet().stream().sorted().collect(Collectors.toList())) {
+            result.append(String.format("\n%s=%s", worker, 
assignment.getOrDefault(worker,
+                    Collections.emptyList())));
+        }
+        return result.toString();
+    }
+
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 684da46..0fe1531 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -989,18 +989,24 @@ public class IncrementalCooperativeAssignorTest {
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
                 new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
-        // newWorker joined first, so should be picked up first as a candidate 
for reassignment
+        // both the newWorkers would need to be considered for re assignment 
of connectors and tasks
+        List<String> listOfConnectorsInLast2Workers = new ArrayList<>();
+        
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker,
 new WorkerLoad.Builder(flakyWorker).build())
+            .connectors());
+        
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker,
 new WorkerLoad.Builder(flakyWorker).build())
+            .connectors());
+        List<ConnectorTaskId> listOfTasksInLast2Workers = new ArrayList<>();
+        
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, 
new WorkerLoad.Builder(flakyWorker).build())
+            .tasks());
+        
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, 
new WorkerLoad.Builder(flakyWorker).build())
+            .tasks());
         assertTrue("Wrong assignment of lost connectors",
-                configuredAssignment.getOrDefault(newWorker, new 
WorkerLoad.Builder(flakyWorker).build())
-                        .connectors()
-                        .containsAll(lostAssignments.connectors()));
+            
listOfConnectorsInLast2Workers.containsAll(lostAssignments.connectors()));
         assertTrue("Wrong assignment of lost tasks",
-                configuredAssignment.getOrDefault(newWorker, new 
WorkerLoad.Builder(flakyWorker).build())
-                        .tasks()
-                        .containsAll(lostAssignments.tasks()));
+            listOfTasksInLast2Workers.containsAll(lostAssignments.tasks()));
         assertThat("Wrong set of workers for reassignments",
-                Collections.emptySet(),
-                is(assignor.candidateWorkersForReassignment));
+            Collections.emptySet(),
+            is(assignor.candidateWorkersForReassignment));
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index 5c44fe6..b538bba 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -302,23 +302,24 @@ public class WorkerCoordinatorIncrementalTest {
 
         result = coordinator.performAssignment(leaderId, 
compatibility.protocol(), responseMembers);
 
+        //Equally distributing tasks across member
         leaderAssignment = deserializeAssignment(result, leaderId);
         assertAssignment(leaderId, offset,
-                Collections.emptyList(), 0,
-                Collections.emptyList(), 2,
-                leaderAssignment);
+            Collections.emptyList(), 0,
+            Collections.emptyList(), 1,
+            leaderAssignment);
 
         memberAssignment = deserializeAssignment(result, memberId);
         assertAssignment(leaderId, offset,
-                Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
-                memberAssignment);
+            Collections.emptyList(), 0,
+            Collections.emptyList(), 1,
+            memberAssignment);
 
         ExtendedAssignment anotherMemberAssignment = 
deserializeAssignment(result, anotherMemberId);
         assertAssignment(leaderId, offset,
-                Collections.emptyList(), 0,
-                Collections.emptyList(), 0,
-                anotherMemberAssignment);
+            Collections.emptyList(), 0,
+            Collections.emptyList(), 0,
+            anotherMemberAssignment);
 
         verify(configStorage, times(configStorageCalls)).snapshot();
     }

Reply via email to