This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0e28351ecf MINOR: Refactor task change logic to AbstractHerder, reuse 
for standalone mode. (#13287)
b0e28351ecf is described below

commit b0e28351ecf98f7a0ffc5008a575125d777dcc1c
Author: Greg Harris <[email protected]>
AuthorDate: Thu Mar 2 06:53:52 2023 -0800

    MINOR: Refactor task change logic to AbstractHerder, reuse for standalone 
mode. (#13287)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 23 ++++++++++++++++++++++
 .../runtime/distributed/DistributedHerder.java     | 18 +----------------
 .../runtime/standalone/StandaloneHerder.java       |  3 +--
 .../kafka/connect/storage/ClusterConfigState.java  | 23 ----------------------
 .../storage/KafkaConfigBackingStoreTest.java       |  5 ++---
 5 files changed, 27 insertions(+), 45 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index e5d6ec857e1..228fd72509c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -785,6 +785,29 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         return result;
     }
 
+    public boolean taskConfigsChanged(ClusterConfigState configState, String 
connName, List<Map<String, String>> taskProps) {
+        int currentNumTasks = configState.taskCount(connName);
+        boolean result = false;
+        if (taskProps.size() != currentNumTasks) {
+            log.debug("Connector {} task count changed from {} to {}", 
connName, currentNumTasks, taskProps.size());
+            result = true;
+        } else {
+            for (int index = 0; index < currentNumTasks; index++) {
+                ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+                if 
(!taskProps.get(index).equals(configState.taskConfig(taskId))) {
+                    log.debug("Connector {} has change in configuration for 
task {}-{}", connName, connName, index);
+                    result = true;
+                }
+            }
+        }
+        if (result) {
+            log.debug("Reconfiguring connector {}: writing new updated 
configurations for tasks", connName);
+        } else {
+            log.debug("Skipping reconfiguration of connector {} as generated 
configs appear unchanged", connName);
+        }
+        return result;
+    }
+
     // Visible for testing
     static Set<String> keysWithVariableValues(Map<String, String> rawConfig, 
Pattern pattern) {
         Set<String> keys = new HashSet<>();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8a8d8dd95df..046ccb8e256 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1958,23 +1958,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
             }
 
             final List<Map<String, String>> taskProps = 
worker.connectorTaskConfigs(connName, connConfig);
-            boolean changed = false;
-            int currentNumTasks = configState.taskCount(connName);
-            if (taskProps.size() != currentNumTasks) {
-                log.debug("Change in connector task count from {} to {}, 
writing updated task configurations", currentNumTasks, taskProps.size());
-                changed = true;
-            } else {
-                int index = 0;
-                for (Map<String, String> taskConfig : taskProps) {
-                    if (!taskConfig.equals(configState.taskConfig(new 
ConnectorTaskId(connName, index)))) {
-                        log.debug("Change in task configurations, writing 
updated task configurations");
-                        changed = true;
-                        break;
-                    }
-                    index++;
-                }
-            }
-            if (changed) {
+            if (taskConfigsChanged(configState, connName, taskProps)) {
                 List<Map<String, String>> rawTaskProps = 
reverseTransform(connName, configState, taskProps);
                 if (isLeader()) {
                     writeToConfigTopicAsLeader(() -> 
configBackingStore.putTaskConfigs(connName, rawTaskProps));
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 76a988b9ea3..61f7dd73be9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -427,9 +427,8 @@ public class StandaloneHerder extends AbstractHerder {
         }
 
         List<Map<String, String>> newTaskConfigs = 
recomputeTaskConfigs(connName);
-        List<Map<String, String>> oldTaskConfigs = 
configState.allTaskConfigs(connName);
 
-        if (!newTaskConfigs.equals(oldTaskConfigs)) {
+        if (taskConfigsChanged(configState, connName, newTaskConfigs)) {
             removeConnectorTasks(connName);
             List<Map<String, String>> rawTaskConfigs = 
reverseTransform(connName, configState, newTaskConfigs);
             configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
index c8cb341faa1..1025373042f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.TreeMap;
 
 /**
  * An immutable snapshot of the configuration state of connectors and tasks in 
a Kafka Connect cluster.
@@ -188,28 +187,6 @@ public class ClusterConfigState {
         return taskConfigs.get(task);
     }
 
-    /**
-     * Get all task configs for a connector. The configurations will have been 
transformed by
-     * {@link org.apache.kafka.common.config.ConfigTransformer} by having all 
variable
-     * references replaced with the current values from external instances of
-     * {@link ConfigProvider}, and may include secrets.
-     * @param connector name of the connector
-     * @return a list of task configurations
-     */
-    public List<Map<String, String>> allTaskConfigs(String connector) {
-        Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : 
this.taskConfigs.entrySet()) {
-            if (taskConfigEntry.getKey().connector().equals(connector)) {
-                Map<String, String> configs = taskConfigEntry.getValue();
-                if (configTransformer != null) {
-                    configs = configTransformer.transform(connector, configs);
-                }
-                taskConfigs.put(taskConfigEntry.getKey().task(), configs);
-            }
-        }
-        return Collections.unmodifiableList(new 
ArrayList<>(taskConfigs.values()));
-    }
-
     /**
      * Get the number of tasks for a given connector.
      * @param connectorName name of the connector to look up tasks for
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 8fb3bf30165..bda35bd3ed1 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -937,15 +937,14 @@ public class KafkaConfigBackingStoreTest {
         ClusterConfigState configState = configStorage.snapshot();
         assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.subList(0, 2), 
configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new 
ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new 
ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
         assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));
 
         configStorage.refresh(0, TimeUnit.SECONDS);
         configState = configStorage.snapshot();
         // Connector should now be removed from the snapshot
         assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
-        // Task configs for the deleted connector should also be removed from 
the snapshot
-        assertEquals(Collections.emptyList(), 
configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
         assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
         // Ensure that the deleted connector's deferred task updates have been 
cleaned up
         // in order to prevent unbounded growth of the map

Reply via email to