This is an automated email from the ASF dual-hosted git repository. kkarantasis pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new 3b6b9d1 KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment (#8453) 3b6b9d1 is described below commit 3b6b9d1ae7da3c23413898bef0ca52821eb2d865 Author: Lucent-Wong <manchesterf...@live.cn> AuthorDate: Thu Jun 11 08:59:32 2020 +0800 KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment (#8453) With Incremental Cooperative Rebalancing, if a worker returns after it's been out of the group for some time (essentially as a zombie worker) and hasn't voluntarily revoked its own connectors and tasks in the meantime, there's the possibility that these assignments have been distributed to other workers and that redundant connectors and tasks might now be running in the Connect cluster. This PR complements previous fixes such as KAFKA-9184, KAFKA-9849 and KAFKA-9851, providing a last line of defense against zombie tasks: if, in any rebalance round, the leader worker detects that there are duplicate assignments in the group, it revokes them completely and resolves the duplication with a correct assignment in the rebalancing round that follows task revocation. 
Author: Wang <ywan...@ebay.com> Reviewer: Konstantine Karantasis <konstant...@confluent.io> --- .../IncrementalCooperativeAssignor.java | 59 ++++++++ .../IncrementalCooperativeAssignorTest.java | 162 ++++++++++++++++++++- 2 files changed, 217 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index d4e9d87..744a099 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -244,6 +244,10 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor { Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments); log.debug("Connector and task to delete assignments: {}", toRevoke); + // Revoking redundant connectors/tasks if the the workers have duplicate assignments + toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments)); + log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke); + // Recompute the complete assignment excluding the deleted connectors-and-tasks completeWorkerAssignment = workerAssignment(memberConfigs, deleted); connectorAssignments = @@ -359,6 +363,61 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor { return previousAssignment; } + private ConnectorsAndTasks duplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs) { + Set<String> connectors = memberConfigs.entrySet().stream() + .flatMap(memberConfig -> memberConfig.getValue().assignment().connectors().stream()) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())) + .entrySet().stream() + 
.filter(entry -> entry.getValue() > 1L) + .map(entry -> entry.getKey()) + .collect(Collectors.toSet()); + + Set<ConnectorTaskId> tasks = memberConfigs.values().stream() + .flatMap(state -> state.assignment().tasks().stream()) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())) + .entrySet().stream() + .filter(entry -> entry.getValue() > 1L) + .map(entry -> entry.getKey()) + .collect(Collectors.toSet()); + return new ConnectorsAndTasks.Builder().with(connectors, tasks).build(); + } + + private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs, + Map<String, Collection<String>> connectorAssignments, + Map<String, Collection<ConnectorTaskId>> taskAssignment) { + ConnectorsAndTasks duplicatedAssignments = duplicatedAssignments(memberConfigs); + log.debug("Duplicated assignments: {}", duplicatedAssignments); + + Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>(); + if (!duplicatedAssignments.connectors().isEmpty()) { + connectorAssignments.entrySet().stream() + .forEach(entry -> { + Set<String> duplicatedConnectors = new HashSet<>(duplicatedAssignments.connectors()); + duplicatedConnectors.retainAll(entry.getValue()); + if (!duplicatedConnectors.isEmpty()) { + toRevoke.computeIfAbsent( + entry.getKey(), + v -> new ConnectorsAndTasks.Builder().build() + ).connectors().addAll(duplicatedConnectors); + } + }); + } + if (!duplicatedAssignments.tasks().isEmpty()) { + taskAssignment.entrySet().stream() + .forEach(entry -> { + Set<ConnectorTaskId> duplicatedTasks = new HashSet<>(duplicatedAssignments.tasks()); + duplicatedTasks.retainAll(entry.getValue()); + if (!duplicatedTasks.isEmpty()) { + toRevoke.computeIfAbsent( + entry.getKey(), + v -> new ConnectorsAndTasks.Builder().build() + ).tasks().addAll(duplicatedTasks); + } + }); + } + return toRevoke; + } + // visible for testing protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, ConnectorsAndTasks newSubmissions, 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 3e72ce9..684da46 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -1089,6 +1089,148 @@ public class IncrementalCooperativeAssignorTest { assertEquals(0, assignor.delay); } + @Test + public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() { + when(coordinator.configSnapshot()).thenReturn(configState); + doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); + + // First assignment with 1 worker and 2 connectors configured but not yet assigned + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(2, 8, 0, 0, "worker1"); + + // Second assignment with a second worker with duplicate assignment joining and all connectors running on previous worker + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment(); + duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2)); + duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 4)); + memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedWorkerAssignment)); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); 
+ ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 2, 8, "worker1", "worker2"); + + // Third assignment after revocations + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(1, 4, 0, 2, "worker1", "worker2"); + + // fourth rebalance after revocations + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 2, 0, 0, "worker1", "worker2"); + + // Fifth rebalance should not change assignments + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 0, 0, "worker1", "worker2"); + + verify(coordinator, 
times(rebalanceNum)).configSnapshot(); + verify(coordinator, times(rebalanceNum)).leaderState(any()); + verify(coordinator, times(2 * rebalanceNum)).generationId(); + verify(coordinator, times(rebalanceNum)).memberId(); + verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId(); + } + + @Test + public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() { + when(coordinator.configSnapshot()).thenReturn(configState); + doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture()); + + // First assignment with 1 worker and 2 connectors configured but not yet assigned + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(2, 8, 0, 0, "worker1"); + + //delete connector1 + configState = clusterConfigState(offset, 2, 1, 4); + when(coordinator.configSnapshot()).thenReturn(configState); + + // Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment(); + duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2)); + duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 4)); + memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedWorkerAssignment)); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, 
offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 2, 8, "worker1", "worker2"); + + // Third assignment after revocations + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 0, 2, "worker1", "worker2"); + + // fourth rebalance after revocations + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 2, 0, 0, "worker1", "worker2"); + + // Fifth rebalance should not change assignments + applyAssignments(returnedAssignments); + memberConfigs = memberConfigs(leader, offset, assignments); + assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion); + ++rebalanceNum; + returnedAssignments = assignmentsCapture.getValue(); + assertDelay(0, returnedAssignments); + expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments); + assertNoReassignments(memberConfigs, expectedMemberConfigs); + assertAssignment(0, 0, 0, 0, "worker1", "worker2"); + + verify(coordinator, times(rebalanceNum)).configSnapshot(); + verify(coordinator, times(rebalanceNum)).leaderState(any()); + verify(coordinator, times(2 * rebalanceNum)).generationId(); + 
verify(coordinator, times(rebalanceNum)).memberId(); + verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId(); + } + private WorkerLoad emptyWorkerLoad(String worker) { return new WorkerLoad.Builder(worker).build(); } @@ -1107,20 +1249,32 @@ public class IncrementalCooperativeAssignorTest { } private static List<ConnectorTaskId> newTasks(int start, int end) { + return newTasks("task", start, end); + } + + private static List<ConnectorTaskId> newTasks(String connectorName, int start, int end) { return IntStream.range(start, end) - .mapToObj(i -> new ConnectorTaskId("task", i)) + .mapToObj(i -> new ConnectorTaskId(connectorName, i)) .collect(Collectors.toList()); } private static ClusterConfigState clusterConfigState(long offset, int connectorNum, int taskNum) { + return clusterConfigState(offset, 1, connectorNum, taskNum); + } + + private static ClusterConfigState clusterConfigState(long offset, + int connectorStart, + int connectorNum, + int taskNum) { + int connectorNumEnd = connectorStart + connectorNum - 1; return new ClusterConfigState( offset, null, - connectorTaskCounts(1, connectorNum, taskNum), - connectorConfigs(1, connectorNum), - connectorTargetStates(1, connectorNum, TargetState.STARTED), + connectorTaskCounts(connectorStart, connectorNumEnd, taskNum), + connectorConfigs(connectorStart, connectorNumEnd), + connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED), taskConfigs(0, connectorNum, connectorNum * taskNum), Collections.emptySet()); }