This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b0e28351ecf MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode. (#13287)
b0e28351ecf is described below
commit b0e28351ecf98f7a0ffc5008a575125d777dcc1c
Author: Greg Harris <[email protected]>
AuthorDate: Thu Mar 2 06:53:52 2023 -0800
MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode. (#13287)
Reviewers: Chris Egerton <[email protected]>
---
.../kafka/connect/runtime/AbstractHerder.java | 23 ++++++++++++++++++++++
.../runtime/distributed/DistributedHerder.java | 18 +----------------
.../runtime/standalone/StandaloneHerder.java | 3 +--
.../kafka/connect/storage/ClusterConfigState.java | 23 ----------------------
.../storage/KafkaConfigBackingStoreTest.java | 5 ++---
5 files changed, 27 insertions(+), 45 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index e5d6ec857e1..228fd72509c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -785,6 +785,29 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
return result;
}
+ public boolean taskConfigsChanged(ClusterConfigState configState, String
connName, List<Map<String, String>> taskProps) {
+ int currentNumTasks = configState.taskCount(connName);
+ boolean result = false;
+ if (taskProps.size() != currentNumTasks) {
+ log.debug("Connector {} task count changed from {} to {}",
connName, currentNumTasks, taskProps.size());
+ result = true;
+ } else {
+ for (int index = 0; index < currentNumTasks; index++) {
+ ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+ if
(!taskProps.get(index).equals(configState.taskConfig(taskId))) {
+ log.debug("Connector {} has change in configuration for
task {}-{}", connName, connName, index);
+ result = true;
+ }
+ }
+ }
+ if (result) {
+ log.debug("Reconfiguring connector {}: writing new updated
configurations for tasks", connName);
+ } else {
+ log.debug("Skipping reconfiguration of connector {} as generated
configs appear unchanged", connName);
+ }
+ return result;
+ }
+
// Visible for testing
static Set<String> keysWithVariableValues(Map<String, String> rawConfig,
Pattern pattern) {
Set<String> keys = new HashSet<>();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8a8d8dd95df..046ccb8e256 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1958,23 +1958,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
final List<Map<String, String>> taskProps =
worker.connectorTaskConfigs(connName, connConfig);
- boolean changed = false;
- int currentNumTasks = configState.taskCount(connName);
- if (taskProps.size() != currentNumTasks) {
- log.debug("Change in connector task count from {} to {},
writing updated task configurations", currentNumTasks, taskProps.size());
- changed = true;
- } else {
- int index = 0;
- for (Map<String, String> taskConfig : taskProps) {
- if (!taskConfig.equals(configState.taskConfig(new
ConnectorTaskId(connName, index)))) {
- log.debug("Change in task configurations, writing
updated task configurations");
- changed = true;
- break;
- }
- index++;
- }
- }
- if (changed) {
+ if (taskConfigsChanged(configState, connName, taskProps)) {
List<Map<String, String>> rawTaskProps =
reverseTransform(connName, configState, taskProps);
if (isLeader()) {
writeToConfigTopicAsLeader(() ->
configBackingStore.putTaskConfigs(connName, rawTaskProps));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 76a988b9ea3..61f7dd73be9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -427,9 +427,8 @@ public class StandaloneHerder extends AbstractHerder {
}
List<Map<String, String>> newTaskConfigs =
recomputeTaskConfigs(connName);
- List<Map<String, String>> oldTaskConfigs =
configState.allTaskConfigs(connName);
- if (!newTaskConfigs.equals(oldTaskConfigs)) {
+ if (taskConfigsChanged(configState, connName, newTaskConfigs)) {
removeConnectorTasks(connName);
List<Map<String, String>> rawTaskConfigs =
reverseTransform(connName, configState, newTaskConfigs);
configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
index c8cb341faa1..1025373042f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.TreeMap;
/**
* An immutable snapshot of the configuration state of connectors and tasks in
a Kafka Connect cluster.
@@ -188,28 +187,6 @@ public class ClusterConfigState {
return taskConfigs.get(task);
}
- /**
- * Get all task configs for a connector. The configurations will have been
transformed by
- * {@link org.apache.kafka.common.config.ConfigTransformer} by having all
variable
- * references replaced with the current values from external instances of
- * {@link ConfigProvider}, and may include secrets.
- * @param connector name of the connector
- * @return a list of task configurations
- */
- public List<Map<String, String>> allTaskConfigs(String connector) {
- Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
- for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry :
this.taskConfigs.entrySet()) {
- if (taskConfigEntry.getKey().connector().equals(connector)) {
- Map<String, String> configs = taskConfigEntry.getValue();
- if (configTransformer != null) {
- configs = configTransformer.transform(connector, configs);
- }
- taskConfigs.put(taskConfigEntry.getKey().task(), configs);
- }
- }
- return Collections.unmodifiableList(new
ArrayList<>(taskConfigs.values()));
- }
-
/**
* Get the number of tasks for a given connector.
* @param connectorName name of the connector to look up tasks for
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 8fb3bf30165..bda35bd3ed1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -937,15 +937,14 @@ public class KafkaConfigBackingStoreTest {
ClusterConfigState configState = configStorage.snapshot();
assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
- assertEquals(SAMPLE_CONFIGS.subList(0, 2),
configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new
ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
+ assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new
ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));
configStorage.refresh(0, TimeUnit.SECONDS);
configState = configStorage.snapshot();
// Connector should now be removed from the snapshot
assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
- // Task configs for the deleted connector should also be removed from
the snapshot
- assertEquals(Collections.emptyList(),
configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
// Ensure that the deleted connector's deferred task updates have been
cleaned up
// in order to prevent unbounded growth of the map