This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53ec055394d KAFKA-17105: Prevent redundant restarts for newly-created connectors (#16581)
53ec055394d is described below

commit 53ec055394d4ac1cffe23faec679b5cf5b234a15
Author: Chris Egerton <[email protected]>
AuthorDate: Wed Jul 17 20:33:47 2024 +0200

    KAFKA-17105: Prevent redundant restarts for newly-created connectors (#16581)
    
    Reviewers: Greg Harris <[email protected]>
---
 .../runtime/distributed/DistributedHerder.java     | 17 +++++---
 .../runtime/distributed/DistributedHerderTest.java | 48 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index d35b7ff43af..174adea8cdd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -2432,10 +2432,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             log.info("Connector {} config removed", connector);
 
             synchronized (DistributedHerder.this) {
-                // rebalance after connector removal to ensure that existing tasks are balanced among workers
-                if (configState.contains(connector))
+                if (configState.contains(connector)) {
+                    // rebalance after connector removal to ensure that existing tasks are balanced among workers
                     needsReconfigRebalance = true;
-                connectorConfigUpdates.add(connector);
+                } else {
+                    connectorConfigUpdates.add(connector);
+                }
+
             }
             member.wakeup();
         }
@@ -2448,9 +2451,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             // to be bounced. However, this callback may also indicate a connector *addition*, which does require
             // a rebalance, so we need to be careful about what operation we request.
             synchronized (DistributedHerder.this) {
-                if (!configState.contains(connector))
+                if (!configState.contains(connector)) {
                     needsReconfigRebalance = true;
-                connectorConfigUpdates.add(connector);
+                } else {
+                    // Only need to restart the connector if it already existed
+                    connectorConfigUpdates.add(connector);
+                }
+
             }
             member.wakeup();
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index abb1ae56da3..bbc074e9730 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -77,6 +77,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -1579,6 +1581,52 @@ public class DistributedHerderTest {
         verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
     }
 
+    @ParameterizedTest
+    @ValueSource(shorts = {CONNECT_PROTOCOL_V0, CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2})
+    public void testConnectorConfigDetectedAfterLeaderAlreadyAssigned(short protocolVersion) {
+        connectProtocolVersion = protocolVersion;
+
+        // If a connector was added, we need to rebalance
+        when(worker.isSinkConnector(CONN1)).thenReturn(Boolean.TRUE);
+        when(member.memberId()).thenReturn("member");
+        when(member.currentProtocolVersion()).thenReturn(protocolVersion);
+
+        // join, no configs so no need to catch up on config topic
+        expectRebalance(-1, Collections.emptyList(), Collections.emptyList());
+        expectMemberPoll();
+
+        herder.tick(); // join
+
+        // Checks for config updates and starts rebalance
+        configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // Rebalance will be triggered when the new config is detected
+        // This rebalance is unnecessary and only the result of a mostly-benign bug;
+        // see https://issues.apache.org/jira/browse/KAFKA-17155
+        doNothing().when(member).requestRejoin();
+
+        // Rebalance will be triggered when the new config is detected
+        // Performs rebalance and gets new assignment
+        // Important--we're simulating a scenario where the leader has already detected the new
+        // connector, and assigns it to our herder at the top of its tick thread
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList());
+
+        ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        }).when(worker).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), onStart.capture());
+        expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS);
+
+        herder.tick(); // assigned connector
+
+        // We should only start the connector once; if we start it several times, that's probably a bug
+        verify(worker, times(1)).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), onStart.capture());
+        verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
+    }
+
     @Test
     public void testConnectorConfigUpdate() {
         // Connector config can be applied without any rebalance
