yashmayya commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r962156128


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -285,18 +292,13 @@ ClusterAssignment performTaskAssignment(
 
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
+        if (delay > 0) {
+            log.debug("Delaying {}ms for revoking tasks.", delay);
+        }
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        if (delay == 0) {
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, completeWorkerAssignment);

Review Comment:
   Why are we changing the first parameter here from `activeAssignments` to
`configured`? `configured` is the latest set of connectors and tasks according
to the leader's view of the config topic, whereas `activeAssignments` is the
union of the connectors and tasks in the group members' assignments in the
current round of rebalancing. Is this so that we don't take deleted
connectors/tasks into account while doing revocation? It will also include new
connectors/tasks, as well as connectors/tasks lost from the last assignment,
when calculating the average connector/task load per worker for revocations.
Is this intentional? It seems reasonable, since if there's no scheduled delayed
rebalance, the new and lost connectors/tasks will be assigned to the workers in
this round of rebalancing anyway. But I'm wondering whether there was a reason
we previously computed revocations only from the existing member assignments.
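To make the averaging question concrete, here is a hypothetical, simplified sketch (not the actual `performTaskRevocation` logic, which the diff doesn't show): it spreads a total task count over the workers, takes the ceiling as the per-worker target, and revokes any excess. `RevocationSketch`, `tasksToRevoke`, and the numbers are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RevocationSketch {

    // Hypothetical helper (not Kafka's performTaskRevocation): for each worker, count the
    // tasks above the balanced ceiling obtained by spreading totalTasks over all workers.
    public static Map<String, Integer> tasksToRevoke(Map<String, Integer> currentLoad, int totalTasks) {
        Map<String, Integer> result = new LinkedHashMap<>();
        int workers = currentLoad.size();
        if (workers == 0) {
            return result; // nothing to balance
        }
        // ceil(totalTasks / workers) using integer arithmetic
        int ceiling = (totalTasks + workers - 1) / workers;
        for (Map.Entry<String, Integer> entry : currentLoad.entrySet()) {
            int excess = entry.getValue() - ceiling;
            if (excess > 0) {
                result.put(entry.getKey(), excess);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> load = new LinkedHashMap<>();
        load.put("w1", 6);
        load.put("w2", 2);
        load.put("w3", 0); // e.g. a worker that just joined the group

        int activeTasks = 8;      // tasks found in the current member assignments
        int configuredTasks = 11; // also counts 3 new or lost tasks not yet assigned

        System.out.println(tasksToRevoke(load, activeTasks));     // {w1=3}
        System.out.println(tasksToRevoke(load, configuredTasks)); // {w1=2}
    }
}
```

With the 8 active tasks the target is 3 per worker, so `w1` gives up 3; counting the 11 configured tasks raises the target to 4, so `w1` gives up only 2, since the 3 unassigned tasks will land on the lighter workers in the same round.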



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -307,9 +309,31 @@ ClusterAssignment performTaskAssignment(
                     existing.tasks().addAll(assignment.tasks());
                 }
             );
-            canRevoke = toExplicitlyRevoke.size() == 0;
+
+            // If this round and the previous round involved revocation, we will do an exponential
+            // backoff delay to prevent rebalance storms.
+            if (revokedInPrevious && !toExplicitlyRevoke.isEmpty()) {
+                numSuccessiveRevokingRebalances++;
+                delay = (int) consecutiveRevokingRebalancesBackoff.backoff(numSuccessiveRevokingRebalances);
+                log.debug("Consecutive revoking rebalances observed. Need to wait for {} ms", delay);

Review Comment:
   Yeah, +1 to @C0urante's point that scheduling a rebalance here doesn't seem
to serve any purpose.
   
   > I think a better approach would be to recompute the potential backoff 
delay between consecutive allocation-balancing revocations if delay == 0, and 
if it is non-zero, then skip those revocations during the current round.
   
   In this case, we wouldn't know how much time has passed between the previous
revoking rebalance and the current one, right? Or do we not account for that,
on the assumption that members will rejoin for a new round of rebalancing
immediately after a revoking round anyway?
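A rough sketch of the quoted alternative that also handles the elapsed-time question: instead of scheduling a delayed rebalance, the leader records when the last revoking round happened and, on each rebalance, skips load-balancing revocations until the backoff measured from that moment has passed. Everything here is hypothetical: `RevocationGateSketch`, `tryRevoke`, the injected `LongSupplier` clock, and the plain doubling backoff without jitter (the diff doesn't show the type of `consecutiveRevokingRebalancesBackoff`).

```java
import java.util.function.LongSupplier;

public class RevocationGateSketch {
    private final long initialMs;
    private final long maxMs;
    private final LongSupplier clock; // injected so the example can use a fake clock
    private int successiveRevokingRounds = 0;
    private long lastRevokingRoundMs = 0;

    public RevocationGateSketch(long initialMs, long maxMs, LongSupplier clock) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.clock = clock;
    }

    // Doubling backoff without jitter: initialMs * 2^(n - 1), capped at maxMs; 0 when n <= 0.
    public long backoffMs(int n) {
        if (n <= 0) {
            return 0;
        }
        long delay = initialMs;
        for (int i = 1; i < n && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    // Time still to wait before another revoking round is allowed, measured from the
    // last revoking round rather than from the current rebalance.
    public long remainingBackoffMs() {
        if (successiveRevokingRounds == 0) {
            return 0;
        }
        long elapsed = clock.getAsLong() - lastRevokingRoundMs;
        return Math.max(0, backoffMs(successiveRevokingRounds) - elapsed);
    }

    // Called once per rebalance by the leader. Returns true if load-balancing revocations
    // go ahead in this round, false if none are needed or they are skipped for now.
    public boolean tryRevoke(boolean wantsToRevoke) {
        if (!wantsToRevoke) {
            successiveRevokingRounds = 0; // load is balanced: reset the backoff
            return false;
        }
        if (remainingBackoffMs() > 0) {
            return false; // still backing off: skip revocations, schedule nothing
        }
        successiveRevokingRounds++;
        lastRevokingRoundMs = clock.getAsLong();
        return true;
    }

    public static void main(String[] args) {
        long[] now = {0};
        RevocationGateSketch gate = new RevocationGateSketch(100, 1000, () -> now[0]);
        System.out.println(gate.tryRevoke(true)); // true: first revoking round
        now[0] = 50;
        System.out.println(gate.tryRevoke(true)); // false: 50 ms of the 100 ms backoff left
        now[0] = 120;
        System.out.println(gate.tryRevoke(true)); // true: backoff elapsed
        now[0] = 200;
        System.out.println(gate.remainingBackoffMs()); // 120: next backoff is 200 ms, 80 ms elapsed
    }
}
```

One trade-off of this design: skipped revocations are only retried when some later rebalance happens, since nothing is scheduled for the moment the backoff expires.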



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
