showuon commented on code in PR #12561: URL: https://github.com/apache/kafka/pull/12561#discussion_r964447367
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -118,6 +118,152 @@ public void testTaskAssignmentWhenWorkerJoins() { assertEmptyAssignment(); } + @Test + public void testAssignmentsWhenWorkersJoinAfterRevocations() { + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. + // This is a successive revoking rebalance. We should not perform any revocations + // in this round + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); + + // Fourth assignment and a fourth worker joining + // Since the worker is joining immediately and within the rebalance delay + // there should not be any revoking rebalance + addNewEmptyWorkers("worker4"); + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4"); + assertConnectorAllocations(0, 0, 1, 2); + assertTaskAllocations(0, 3, 3, 6); + + // Add new worker immediately. Since a scheduled rebalance is in progress, + // There should still not be be any revocations + addNewEmptyWorkers("worker5"); + performStandardRebalance(); + assertTrue(assignor.delay > 0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5"); + assertConnectorAllocations(0, 0, 0, 1, 2); + assertTaskAllocations(0, 0, 3, 3, 6); + + // Add new worker but this time after crossing the delay. + // There would be revocations allowed + time.sleep(assignor.delay); + addNewEmptyWorkers("worker6"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 0, 1, 1); + assertTaskAllocations(0, 0, 0, 2, 2, 2); + + // Follow up rebalance since there were revocations + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3", "worker4", "worker5", "worker6"); + assertConnectorAllocations(0, 0, 0, 1, 1, 1); + assertTaskAllocations(2, 2, 2, 2, 2, 2); + assertBalancedAndCompleteAllocation(); + } + + @Test + public void testImmediateRevocationsWhenMaxDelayIs0() { + + // Customize assignor for this test case + rebalanceDelay = 0; + initAssignor(); + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. + // This is a successive revoking rebalance but we should still revoke as rebalance delay is 0 + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 1); + assertTaskAllocations(3, 3, 4); + + // Follow up rebalance post revocations + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(1, 1, 1); + assertTaskAllocations(4, 4, 4); + assertBalancedAndCompleteAllocation(); + } + + @Test + public void testSuccessiveRevocationsWhenMaxDelayIsEqualToExpBackOffInitialInterval() { + + rebalanceDelay = 1; + initAssignor(); + + addNewConnector("connector3", 4); + // First assignment with 1 worker and 2 connectors configured but not yet assigned + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1"); + assertConnectorAllocations(3); + assertTaskAllocations(12); + assertBalancedAndCompleteAllocation(); + + // Second assignment with a second worker joining and all connectors running on previous worker + // We should revoke. + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(0, 2); + assertTaskAllocations(0, 6); + + // Third assignment immediately after revocations, and a third worker joining. + // This is a successive revoking rebalance. We shouldn't revoke as maxDelay is 1 ms + addNewEmptyWorkers("worker3"); + performStandardRebalance(); + assertDelay(1); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); + + // Follow up rebalance post revocations. No revocations should have happened + performStandardRebalance(); + assertWorkers("worker1", "worker2", "worker3"); + assertConnectorAllocations(0, 1, 2); + assertTaskAllocations(3, 3, 6); Review Comment: I think we should remove it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org