This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c25b311cbe KAFKA-14814: Skip Connect target state updates when the
configs store has same state (#13426)
3c25b311cbe is described below
commit 3c25b311cbe5c6c77b764bd9dbac28ee2c0b4f94
Author: Chaitanya Mukka <[email protected]>
AuthorDate: Thu Mar 23 20:53:38 2023 +0530
KAFKA-14814: Skip Connect target state updates when the configs store has
same state (#13426)
Reviewers: Yash Mayya <[email protected]>, Chris Egerton
<[email protected]>
---
.../connect/storage/KafkaConfigBackingStore.java | 6 ++-
.../connect/storage/MemoryConfigBackingStore.java | 3 +-
.../storage/KafkaConfigBackingStoreTest.java | 49 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 3 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 5d44953ec16..f33b1aef7b3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -908,6 +908,7 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
private void processTargetStateRecord(String connectorName, SchemaAndValue
value) {
boolean removed = false;
+ boolean stateChanged = true;
synchronized (lock) {
if (value.value() == null) {
// When connector configs are removed, we also write
tombstones for the target state.
@@ -935,7 +936,8 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
try {
TargetState state = TargetState.valueOf((String)
targetState);
log.debug("Setting target state for connector '{}' to {}",
connectorName, targetState);
- connectorTargetStates.put(connectorName, state);
+ TargetState prevState =
connectorTargetStates.put(connectorName, state);
+ stateChanged = !state.equals(prevState);
} catch (IllegalArgumentException e) {
log.error("Invalid target state for connector '{}': {}",
connectorName, targetState);
return;
@@ -945,7 +947,7 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
// Note that we do not notify the update listener if the target state
has been removed.
// Instead we depend on the removal callback of the connector config
itself to notify the worker.
- if (started && !removed)
+ if (started && !removed && stateChanged)
updateListener.onConnectorTargetStateChange(connectorName);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index eadb873b45a..dcdfd71296b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -147,9 +147,10 @@ public class MemoryConfigBackingStore implements
ConfigBackingStore {
if (connectorState == null)
throw new IllegalArgumentException("No connector `" + connector +
"` configured");
+ TargetState prevState = connectorState.targetState;
connectorState.targetState = state;
- if (updateListener != null)
+ if (updateListener != null && !state.equals(prevState))
updateListener.onConnectorTargetStateChange(connector);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index bda35bd3ed1..63fe460685f 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -146,6 +146,7 @@ public class KafkaConfigBackingStoreTest {
new
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
);
private static final Struct TARGET_STATE_PAUSED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
+ private static final Struct TARGET_STATE_STARTED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -889,6 +890,54 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testSameTargetState() throws Exception {
+ // verify that a target state record matching the current state does not
notify the update listener
+
+ expectConfigure();
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME,
0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME,
0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME,
0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME,
0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserialized);
+
+ // the update listener should not be called when the resumed state matches the current state
+
configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString());
+ EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected
call to onConnectorTargetStateChange"));
+
+ expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0),
TARGET_STATE_STARTED);
+
+ expectPartitionCount(1);
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ configStorage.start();
+
+ // Should see a single connector with initial state started
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+
+ configStorage.refresh(0, TimeUnit.SECONDS);
+
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testBackgroundConnectorDeletion() throws Exception {
// verify that we handle connector deletions correctly when they come
up through the log