This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53ec055394d KAFKA-17105: Prevent redundant restarts for newly-created connectors (#16581)
53ec055394d is described below
commit 53ec055394d4ac1cffe23faec679b5cf5b234a15
Author: Chris Egerton <[email protected]>
AuthorDate: Wed Jul 17 20:33:47 2024 +0200
KAFKA-17105: Prevent redundant restarts for newly-created connectors (#16581)
Reviewers: Greg Harris <[email protected]>
---
.../runtime/distributed/DistributedHerder.java | 17 +++++---
.../runtime/distributed/DistributedHerderTest.java | 48 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index d35b7ff43af..174adea8cdd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -2432,10 +2432,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.info("Connector {} config removed", connector);
synchronized (DistributedHerder.this) {
- // rebalance after connector removal to ensure that existing tasks are balanced among workers
- if (configState.contains(connector))
+ if (configState.contains(connector)) {
+ // rebalance after connector removal to ensure that existing tasks are balanced among workers
needsReconfigRebalance = true;
- connectorConfigUpdates.add(connector);
+ } else {
+ connectorConfigUpdates.add(connector);
+ }
+
}
member.wakeup();
}
@@ -2448,9 +2451,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// to be bounced. However, this callback may also indicate a connector *addition*, which does require
// a rebalance, so we need to be careful about what operation we request.
synchronized (DistributedHerder.this) {
- if (!configState.contains(connector))
+ if (!configState.contains(connector)) {
needsReconfigRebalance = true;
- connectorConfigUpdates.add(connector);
+ } else {
+ // Only need to restart the connector if it already existed
+ connectorConfigUpdates.add(connector);
+ }
+
}
member.wakeup();
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index abb1ae56da3..bbc074e9730 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -77,6 +77,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -1579,6 +1581,52 @@ public class DistributedHerderTest {
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
}
+ @ParameterizedTest
+ @ValueSource(shorts = {CONNECT_PROTOCOL_V0, CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2})
+ public void testConnectorConfigDetectedAfterLeaderAlreadyAssigned(short protocolVersion) {
+ connectProtocolVersion = protocolVersion;
+
+ // If a connector was added, we need to rebalance
+ when(worker.isSinkConnector(CONN1)).thenReturn(Boolean.TRUE);
+ when(member.memberId()).thenReturn("member");
+ when(member.currentProtocolVersion()).thenReturn(protocolVersion);
+
+ // join, no configs so no need to catch up on config topic
+ expectRebalance(-1, Collections.emptyList(), Collections.emptyList());
+ expectMemberPoll();
+
+ herder.tick(); // join
+
+ // Checks for config updates and starts rebalance
+ configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+ // Rebalance will be triggered when the new config is detected
+ // This rebalance is unnecessary and only the result of a mostly-benign bug;
+ // see https://issues.apache.org/jira/browse/KAFKA-17155
+ doNothing().when(member).requestRejoin();
+
+ // Rebalance will be triggered when the new config is detected
+ // Performs rebalance and gets new assignment
+ // Important--we're simulating a scenario where the leader has already detected the new
+ // connector, and assigns it to our herder at the top of its tick thread
+ expectRebalance(Collections.emptyList(), Collections.emptyList(),
+ ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList());
+
+ ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+ doAnswer(invocation -> {
+ onStart.getValue().onCompletion(null, TargetState.STARTED);
+ return true;
+ }).when(worker).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), onStart.capture());
+ expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS);
+
+ herder.tick(); // assigned connector
+
+ // We should only start the connector once; if we start it several times, that's probably a bug
+ verify(worker, times(1)).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), onStart.capture());
+ verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
+ }
+
@Test
public void testConnectorConfigUpdate() {
// Connector config can be applied without any rebalance