This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new cec5ddc KAFKA-10413: Allow for even distribution of lost/new tasks
when multiple Connect workers join at the same time (#9319)
cec5ddc is described below
commit cec5ddcd4eb2bc13207ff28b2830af12550b0008
Author: Ramesh Krishnan M <[email protected]>
AuthorDate: Wed Feb 3 01:34:06 2021 +0530
KAFKA-10413: Allow for even distribution of lost/new tasks when multiple
Connect workers join at the same time (#9319)
First issue: When more than one worker joins the Connect group, the
incremental cooperative assignor revokes and reassigns at most the average
number of tasks per worker.
Side-effect: As a result, the additional workers that join the group stay
idle, and further rebalances are required before tasks are evenly
distributed.
Fix: As part of the task assignment calculation following a deployment, the
reassignment is calculated by revoking, from each worker, all the tasks (and
likewise connectors) above the rounded-up (ceil) average number per worker.
Second issue: When more than one worker is lost and rejoins the group, at
most one worker is reassigned the lost tasks from all the workers that left
the group.
Side-effect: In scenarios where more than one worker is lost and rejoins
the group, only one of them is assigned all the connectors and tasks that
were lost. The other workers that have joined do not get any tasks assigned
to them until a future rebalance.
Fix: As part of lost-task reassignment, all the new workers that have
joined the group are considered as candidates, and the lost connectors and
tasks are assigned to them in a round-robin fashion.
Testing strategy:
* System testing in a Kubernetes environment completed
* New integration tests to test for balanced tasks
* Updated unit tests.
Co-authored-by: Rameshkrishnan Muthusamy
<[email protected]>
Co-authored-by: Randall Hauch <[email protected]>
Co-authored-by: Konstantine Karantasis <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Konstantine Karantasis
<[email protected]>
---
.../IncrementalCooperativeAssignor.java | 54 ++++++++++------
.../RebalanceSourceConnectorsIntegrationTest.java | 71 ++++++++++++++++++++--
.../IncrementalCooperativeAssignorTest.java | 24 +++++---
.../WorkerCoordinatorIncrementalTest.java | 19 +++---
4 files changed, 127 insertions(+), 41 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index 744a099..c3f2f4b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -34,7 +34,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
@@ -445,16 +444,34 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
if (scheduledRebalance > 0 && now >= scheduledRebalance) {
// delayed rebalance expired and it's time to assign resources
log.debug("Delayed rebalance expired. Reassigning lost tasks");
- Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+ List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
if (!candidateWorkersForReassignment.isEmpty()) {
candidateWorkerLoad =
pickCandidateWorkerForReassignment(completeWorkerAssignment);
}
- if (candidateWorkerLoad.isPresent()) {
- WorkerLoad workerLoad = candidateWorkerLoad.get();
- log.debug("A candidate worker has been found to assign lost
tasks: {}", workerLoad.worker());
- lostAssignments.connectors().forEach(workerLoad::assign);
- lostAssignments.tasks().forEach(workerLoad::assign);
+ if (!candidateWorkerLoad.isEmpty()) {
+ log.debug("Assigning lost tasks to {} candidate workers: {}",
+ candidateWorkerLoad.size(),
+
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+ Iterator<WorkerLoad> candidateWorkerIterator =
candidateWorkerLoad.iterator();
+ for (String connector : lostAssignments.connectors()) {
+ // Loop over the the candidate workers as many times as it
takes
+ if (!candidateWorkerIterator.hasNext()) {
+ candidateWorkerIterator =
candidateWorkerLoad.iterator();
+ }
+ WorkerLoad worker = candidateWorkerIterator.next();
+ log.debug("Assigning connector id {} to member {}",
connector, worker.worker());
+ worker.assign(connector);
+ }
+ candidateWorkerIterator = candidateWorkerLoad.iterator();
+ for (ConnectorTaskId task : lostAssignments.tasks()) {
+ if (!candidateWorkerIterator.hasNext()) {
+ candidateWorkerIterator =
candidateWorkerLoad.iterator();
+ }
+ WorkerLoad worker = candidateWorkerIterator.next();
+ log.debug("Assigning task id {} to member {}", task,
worker.worker());
+ worker.assign(task);
+ }
} else {
log.debug("No single candidate worker was found to assign lost
tasks. Treating lost tasks as new tasks");
newSubmissions.connectors().addAll(lostAssignments.connectors());
@@ -498,13 +515,13 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
.collect(Collectors.toSet());
}
- private Optional<WorkerLoad>
pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
+ private List<WorkerLoad>
pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
Map<String, WorkerLoad> activeWorkers =
completeWorkerAssignment.stream()
.collect(Collectors.toMap(WorkerLoad::worker,
Function.identity()));
return candidateWorkersForReassignment.stream()
.map(activeWorkers::get)
.filter(Objects::nonNull)
- .findFirst();
+ .collect(Collectors.toList());
}
/**
@@ -554,38 +571,37 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
// We have at least one worker assignment (the leader itself) so
totalWorkersNum can't be 0
log.debug("Previous rounded down (floor) average number of connectors
per worker {}", totalActiveConnectorsNum / existingWorkersNum);
int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
- log.debug("New rounded down (floor) average number of connectors per
worker {}", floorConnectors);
+ int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum %
totalWorkersNum == 0) ? 0 : 1);
+ log.debug("New average number of connectors per worker rounded down
(floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
+
log.debug("Previous rounded down (floor) average number of tasks per
worker {}", totalActiveTasksNum / existingWorkersNum);
int floorTasks = totalActiveTasksNum / totalWorkersNum;
- log.debug("New rounded down (floor) average number of tasks per worker
{}", floorTasks);
+ int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum
== 0) ? 0 : 1);
+ log.debug("New average number of tasks per worker rounded down (floor)
{} and rounded up (ceil) {}", floorTasks, ceilTasks);
+ int numToRevoke;
- int numToRevoke = floorConnectors;
for (WorkerLoad existing : existingWorkers) {
Iterator<String> connectors = existing.connectors().iterator();
+ numToRevoke = existing.connectorsSize() - ceilConnectors;
for (int i = existing.connectorsSize(); i > floorConnectors &&
numToRevoke > 0; --i, --numToRevoke) {
ConnectorsAndTasks resources = revoking.computeIfAbsent(
existing.worker(),
w -> new ConnectorsAndTasks.Builder().build());
resources.connectors().add(connectors.next());
}
- if (numToRevoke == 0) {
- break;
- }
}
- numToRevoke = floorTasks;
for (WorkerLoad existing : existingWorkers) {
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+ numToRevoke = existing.tasksSize() - ceilTasks;
+ log.debug("Tasks on worker {} is higher than ceiling, so revoking
{} tasks", existing, numToRevoke);
for (int i = existing.tasksSize(); i > floorTasks && numToRevoke >
0; --i, --numToRevoke) {
ConnectorsAndTasks resources = revoking.computeIfAbsent(
existing.worker(),
w -> new ConnectorsAndTasks.Builder().build());
resources.tasks().add(tasks.next());
}
- if (numToRevoke == 0) {
- break;
- }
}
return revoking;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 2914434..535a803 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -208,7 +209,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME
+ 3,
"Connector tasks did not stop in time.");
- waitForCondition(this::assertConnectorAndTasksAreUnique,
+ waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced
between the workers.");
}
@@ -237,7 +238,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME
+ 3, NUM_TASKS,
"Connector tasks did not start in time.");
- waitForCondition(this::assertConnectorAndTasksAreUnique,
+ waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced
between the workers.");
}
@@ -263,7 +264,53 @@ public class RebalanceSourceConnectorsIntegrationTest {
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
"Connect workers did not start in time.");
- waitForCondition(this::assertConnectorAndTasksAreUnique,
+ waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
+ WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced
between the workers.");
+ }
+
+ @Test
+ public void testMultipleWorkersRejoining() throws Exception {
+ // create test topic
+ connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+
+ // setup up props for the source connector
+ Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+ "Connect workers did not start in time.");
+
+ // start a source connector
+ IntStream.range(0, 4).forEachOrdered(i ->
connect.configureConnector(CONNECTOR_NAME + i, props));
+
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME
+ 3, NUM_TASKS,
+ "Connector tasks did not start in time.");
+
+ waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
+ WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced
between the workers.");
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+ connect.removeWorker();
+ connect.removeWorker();
+
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 2,
+ "Connect workers did not stop in time.");
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+ connect.addWorker();
+ connect.addWorker();
+
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+ "Connect workers did not start in time.");
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+ for (int i = 0; i < 4; ++i) {
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME
+ i, NUM_TASKS, "Connector tasks did not start in time.");
+ }
+
+ waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced
between the workers.");
}
@@ -282,7 +329,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
return props;
}
- private boolean assertConnectorAndTasksAreUnique() {
+ private boolean assertConnectorAndTasksAreUniqueAndBalanced() {
try {
Map<String, Collection<String>> connectors = new HashMap<>();
Map<String, Collection<String>> tasks = new HashMap<>();
@@ -296,7 +343,12 @@ public class RebalanceSourceConnectorsIntegrationTest {
}
int maxConnectors =
connectors.values().stream().mapToInt(Collection::size).max().orElse(0);
+ int minConnectors =
connectors.values().stream().mapToInt(Collection::size).min().orElse(0);
int maxTasks =
tasks.values().stream().mapToInt(Collection::size).max().orElse(0);
+ int minTasks =
tasks.values().stream().mapToInt(Collection::size).min().orElse(0);
+
+ log.debug("Connector balance: {}", formatAssignment(connectors));
+ log.debug("Task balance: {}", formatAssignment(tasks));
assertNotEquals("Found no connectors running!", maxConnectors, 0);
assertNotEquals("Found no tasks running!", maxTasks, 0);
@@ -306,6 +358,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
assertEquals("Task assignments are not unique: " + tasks,
tasks.values().size(),
tasks.values().stream().distinct().collect(Collectors.toList()).size());
+ assertTrue("Connectors are imbalanced: " +
formatAssignment(connectors), maxConnectors - minConnectors < 2);
+ assertTrue("Tasks are imbalanced: " + formatAssignment(tasks),
maxTasks - minTasks < 2);
return true;
} catch (Exception e) {
log.error("Could not check connector state info.", e);
@@ -313,4 +367,13 @@ public class RebalanceSourceConnectorsIntegrationTest {
}
}
+ private static String formatAssignment(Map<String, Collection<String>>
assignment) {
+ StringBuilder result = new StringBuilder();
+ for (String worker :
assignment.keySet().stream().sorted().collect(Collectors.toList())) {
+ result.append(String.format("\n%s=%s", worker,
assignment.getOrDefault(worker,
+ Collections.emptyList())));
+ }
+ return result.toString();
+ }
+
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 684da46..0fe1531 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -989,18 +989,24 @@ public class IncrementalCooperativeAssignorTest {
assignor.handleLostAssignments(lostAssignments, newSubmissions,
new ArrayList<>(configuredAssignment.values()), memberConfigs);
- // newWorker joined first, so should be picked up first as a candidate
for reassignment
+ // both the newWorkers would need to be considered for re assignment
of connectors and tasks
+ List<String> listOfConnectorsInLast2Workers = new ArrayList<>();
+
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker,
new WorkerLoad.Builder(flakyWorker).build())
+ .connectors());
+
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker,
new WorkerLoad.Builder(flakyWorker).build())
+ .connectors());
+ List<ConnectorTaskId> listOfTasksInLast2Workers = new ArrayList<>();
+
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker,
new WorkerLoad.Builder(flakyWorker).build())
+ .tasks());
+
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker,
new WorkerLoad.Builder(flakyWorker).build())
+ .tasks());
assertTrue("Wrong assignment of lost connectors",
- configuredAssignment.getOrDefault(newWorker, new
WorkerLoad.Builder(flakyWorker).build())
- .connectors()
- .containsAll(lostAssignments.connectors()));
+
listOfConnectorsInLast2Workers.containsAll(lostAssignments.connectors()));
assertTrue("Wrong assignment of lost tasks",
- configuredAssignment.getOrDefault(newWorker, new
WorkerLoad.Builder(flakyWorker).build())
- .tasks()
- .containsAll(lostAssignments.tasks()));
+ listOfTasksInLast2Workers.containsAll(lostAssignments.tasks()));
assertThat("Wrong set of workers for reassignments",
- Collections.emptySet(),
- is(assignor.candidateWorkersForReassignment));
+ Collections.emptySet(),
+ is(assignor.candidateWorkersForReassignment));
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index 5c44fe6..b538bba 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -302,23 +302,24 @@ public class WorkerCoordinatorIncrementalTest {
result = coordinator.performAssignment(leaderId,
compatibility.protocol(), responseMembers);
+ //Equally distributing tasks across member
leaderAssignment = deserializeAssignment(result, leaderId);
assertAssignment(leaderId, offset,
- Collections.emptyList(), 0,
- Collections.emptyList(), 2,
- leaderAssignment);
+ Collections.emptyList(), 0,
+ Collections.emptyList(), 1,
+ leaderAssignment);
memberAssignment = deserializeAssignment(result, memberId);
assertAssignment(leaderId, offset,
- Collections.emptyList(), 0,
- Collections.emptyList(), 0,
- memberAssignment);
+ Collections.emptyList(), 0,
+ Collections.emptyList(), 1,
+ memberAssignment);
ExtendedAssignment anotherMemberAssignment =
deserializeAssignment(result, anotherMemberId);
assertAssignment(leaderId, offset,
- Collections.emptyList(), 0,
- Collections.emptyList(), 0,
- anotherMemberAssignment);
+ Collections.emptyList(), 0,
+ Collections.emptyList(), 0,
+ anotherMemberAssignment);
verify(configStorage, times(configStorageCalls)).snapshot();
}