showuon commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r964447367


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() {
         assertEmptyAssignment();
     }
 
+    @Test
+    public void testAssignmentsWhenWorkersJoinAfterRevocations()  {
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance. We should not perform any 
revocations
+        // in this round
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Fourth assignment and a fourth worker joining
+        // Since the worker is joining immediately and within the rebalance 
delay
+        // there should not be any revoking rebalance
+        addNewEmptyWorkers("worker4");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4");
+        assertConnectorAllocations(0, 0, 1, 2);
+        assertTaskAllocations(0, 3, 3, 6);
+
+        // Add new worker immediately. Since a scheduled rebalance is in 
progress,
+        // There should still not be be any revocations
+        addNewEmptyWorkers("worker5");
+        performStandardRebalance();
+        assertTrue(assignor.delay > 0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5");
+        assertConnectorAllocations(0, 0, 0, 1, 2);
+        assertTaskAllocations(0, 0, 3, 3, 6);
+
+        // Add new worker but this time after crossing the delay.
+        // There would be revocations allowed
+        time.sleep(assignor.delay);
+        addNewEmptyWorkers("worker6");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 0, 1, 1);
+        assertTaskAllocations(0, 0, 0, 2, 2, 2);
+
+        // Follow up rebalance since there were revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", 
"worker6");
+        assertConnectorAllocations(0, 0, 0, 1, 1, 1);
+        assertTaskAllocations(2, 2, 2, 2, 2, 2);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void testImmediateRevocationsWhenMaxDelayIs0()  {
+
+        // Customize assignor for this test case
+        rebalanceDelay = 0;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance but we should still revoke 
as rebalance delay is 0
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 1);
+        assertTaskAllocations(3, 3, 4);
+
+        // Follow up rebalance post revocations
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(1, 1, 1);
+        assertTaskAllocations(4, 4, 4);
+        assertBalancedAndCompleteAllocation();
+    }
+
+    @Test
+    public void 
testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() {
+
+        rebalanceDelay = 1;
+        initAssignor();
+
+        addNewConnector("connector3", 4);
+        // First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+        performStandardRebalance();
+        assertDelay(0);
+        assertWorkers("worker1");
+        assertConnectorAllocations(3);
+        assertTaskAllocations(12);
+        assertBalancedAndCompleteAllocation();
+
+        // Second assignment with a second worker joining and all connectors 
running on previous worker
+        // We should revoke.
+        addNewEmptyWorkers("worker2");
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2");
+        assertConnectorAllocations(0, 2);
+        assertTaskAllocations(0, 6);
+
+        // Third assignment immediately after revocations, and a third worker 
joining.
+        // This is a successive revoking rebalance. We shouldn't revoke as 
maxDelay is 1 ms
+        addNewEmptyWorkers("worker3");
+        performStandardRebalance();
+        assertDelay(1);
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);
+
+        // Follow up rebalance post revocations. No revocations should have 
happened
+        performStandardRebalance();
+        assertWorkers("worker1", "worker2", "worker3");
+        assertConnectorAllocations(0, 1, 2);
+        assertTaskAllocations(3, 3, 6);

Review Comment:
   I think we should remove it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to