This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 3b6b9d1  KAFKA-9841: Revoke duplicate connectors and tasks when zombie 
workers return with an outdated assignment (#8453)
3b6b9d1 is described below

commit 3b6b9d1ae7da3c23413898bef0ca52821eb2d865
Author: Lucent-Wong <manchesterf...@live.cn>
AuthorDate: Thu Jun 11 08:59:32 2020 +0800

    KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers 
return with an outdated assignment (#8453)
    
    With Incremental Cooperative Rebalancing, if a worker returns after it's 
been out of the group for some time (essentially as a zombie worker) and hasn't 
voluntarily revoked its own connectors and tasks in the meantime, there's the 
possibility that these assignments have been distributed to other workers and 
redundant connectors and tasks might now be running in the Connect cluster.
    
    This PR complements previous fixes such as KAFKA-9184, KAFKA-9849 and 
KAFKA-9851, providing a last line of defense against zombie tasks: if at any 
rebalance round the leader worker detects that there are duplicate assignments 
in the group, it revokes them completely and resolves the duplication with a 
correct assignment in the rebalancing round that will follow task revocation.
    
    Author: Wang <ywan...@ebay.com>
    
    Reviewer: Konstantine Karantasis <konstant...@confluent.io>
---
 .../IncrementalCooperativeAssignor.java            |  59 ++++++++
 .../IncrementalCooperativeAssignorTest.java        | 162 ++++++++++++++++++++-
 2 files changed, 217 insertions(+), 4 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index d4e9d87..744a099 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -244,6 +244,10 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, 
connectorAssignments, taskAssignments);
         log.debug("Connector and task to delete assignments: {}", toRevoke);
 
+        // Revoking redundant connectors/tasks if the workers have 
duplicate assignments
+        toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, 
connectorAssignments, taskAssignments));
+        log.debug("Connector and task to revoke assignments (include 
duplicated assignments): {}", toRevoke);
+
         // Recompute the complete assignment excluding the deleted 
connectors-and-tasks
         completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
         connectorAssignments =
@@ -359,6 +363,61 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         return previousAssignment;
     }
 
+    private ConnectorsAndTasks duplicatedAssignments(Map<String, 
ExtendedWorkerState> memberConfigs) {
+        Set<String> connectors = memberConfigs.entrySet().stream()
+                .flatMap(memberConfig -> 
memberConfig.getValue().assignment().connectors().stream())
+                .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()))
+                .entrySet().stream()
+                .filter(entry -> entry.getValue() > 1L)
+                .map(entry -> entry.getKey())
+                .collect(Collectors.toSet());
+
+        Set<ConnectorTaskId> tasks = memberConfigs.values().stream()
+                .flatMap(state -> state.assignment().tasks().stream())
+                .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()))
+                .entrySet().stream()
+                .filter(entry -> entry.getValue() > 1L)
+                .map(entry -> entry.getKey())
+                .collect(Collectors.toSet());
+        return new ConnectorsAndTasks.Builder().with(connectors, 
tasks).build();
+    }
+
+    private Map<String, ConnectorsAndTasks> 
computeDuplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs,
+                                             Map<String, Collection<String>> 
connectorAssignments,
+                                             Map<String, 
Collection<ConnectorTaskId>> taskAssignment) {
+        ConnectorsAndTasks duplicatedAssignments = 
duplicatedAssignments(memberConfigs);
+        log.debug("Duplicated assignments: {}", duplicatedAssignments);
+
+        Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>();
+        if (!duplicatedAssignments.connectors().isEmpty()) {
+            connectorAssignments.entrySet().stream()
+                    .forEach(entry -> {
+                        Set<String> duplicatedConnectors = new 
HashSet<>(duplicatedAssignments.connectors());
+                        duplicatedConnectors.retainAll(entry.getValue());
+                        if (!duplicatedConnectors.isEmpty()) {
+                            toRevoke.computeIfAbsent(
+                                entry.getKey(),
+                                v -> new ConnectorsAndTasks.Builder().build()
+                            ).connectors().addAll(duplicatedConnectors);
+                        }
+                    });
+        }
+        if (!duplicatedAssignments.tasks().isEmpty()) {
+            taskAssignment.entrySet().stream()
+                    .forEach(entry -> {
+                        Set<ConnectorTaskId> duplicatedTasks = new 
HashSet<>(duplicatedAssignments.tasks());
+                        duplicatedTasks.retainAll(entry.getValue());
+                        if (!duplicatedTasks.isEmpty()) {
+                            toRevoke.computeIfAbsent(
+                                entry.getKey(),
+                                v -> new ConnectorsAndTasks.Builder().build()
+                            ).tasks().addAll(duplicatedTasks);
+                        }
+                    });
+        }
+        return toRevoke;
+    }
+
     // visible for testing
     protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                                          ConnectorsAndTasks newSubmissions,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 3e72ce9..684da46 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -1089,6 +1089,148 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.delay);
     }
 
+    @Test
+    public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker with duplicate assignment 
joining and all connectors running on previous worker
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        ExtendedAssignment duplicatedWorkerAssignment = 
newExpandableAssignment();
+        duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2));
+        duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 
4));
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, duplicatedWorkerAssignment));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 2, 8, "worker1", "worker2");
+
+        // Third assignment after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 4, 0, 2, "worker1", "worker2");
+
+        // fourth rebalance after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 2, 0, 0, "worker1", "worker2");
+
+        // Fifth rebalance should not change assignments
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
+        verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
+    }
+
+    @Test
+    public void 
testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        //delete connector1
+        configState = clusterConfigState(offset, 2, 1, 4);
+        when(coordinator.configSnapshot()).thenReturn(configState);
+
+        // Second assignment with a second worker with duplicate assignment 
joining and the duplicated assignment is deleted at the same time
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        ExtendedAssignment duplicatedWorkerAssignment = 
newExpandableAssignment();
+        duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2));
+        duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 
4));
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, duplicatedWorkerAssignment));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 2, 8, "worker1", "worker2");
+
+        // Third assignment after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2");
+
+        // fourth rebalance after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 2, 0, 0, "worker1", "worker2");
+
+        // Fifth rebalance should not change assignments
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
+        verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
+    }
+
     private WorkerLoad emptyWorkerLoad(String worker) {
         return new WorkerLoad.Builder(worker).build();
     }
@@ -1107,20 +1249,32 @@ public class IncrementalCooperativeAssignorTest {
     }
 
     private static List<ConnectorTaskId> newTasks(int start, int end) {
+        return newTasks("task", start, end);
+    }
+
+    private static List<ConnectorTaskId> newTasks(String connectorName, int 
start, int end) {
         return IntStream.range(start, end)
-                .mapToObj(i -> new ConnectorTaskId("task", i))
+                .mapToObj(i -> new ConnectorTaskId(connectorName, i))
                 .collect(Collectors.toList());
     }
 
     private static ClusterConfigState clusterConfigState(long offset,
                                                          int connectorNum,
                                                          int taskNum) {
+        return clusterConfigState(offset, 1, connectorNum, taskNum);
+    }
+
+    private static ClusterConfigState clusterConfigState(long offset,
+                                                         int connectorStart,
+                                                         int connectorNum,
+                                                         int taskNum) {
+        int connectorNumEnd = connectorStart + connectorNum - 1;
         return new ClusterConfigState(
                 offset,
                 null,
-                connectorTaskCounts(1, connectorNum, taskNum),
-                connectorConfigs(1, connectorNum),
-                connectorTargetStates(1, connectorNum, TargetState.STARTED),
+                connectorTaskCounts(connectorStart, connectorNumEnd, taskNum),
+                connectorConfigs(connectorStart, connectorNumEnd),
+                connectorTargetStates(connectorStart, connectorNumEnd, 
TargetState.STARTED),
                 taskConfigs(0, connectorNum, connectorNum * taskNum),
                 Collections.emptySet());
     }

Reply via email to