This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 4f789be  KAFKA-7242: Reverse xform configs before saving (KIP-297)
4f789be is described below

commit 4f789bebf4e8a403b3871f04bc3ba3053440b0be
Author: Robert Yokota <rayok...@gmail.com>
AuthorDate: Tue Aug 28 12:59:08 2018 -0700

    KAFKA-7242: Reverse xform configs before saving (KIP-297)
    
    During actions such as a reconfiguration, the task configs are obtained
    via `Worker.connectorTaskConfigs` and then subsequently saved into an
    instance of `ClusterConfigState`.  The values of the properties that are
    saved are post-transformation (of variable references) when they should
    be pre-transformation.  Saving the pre-transformation values avoids
    secrets appearing in plaintext in the `connect-configs` topic, for example.
    
    The fix is to change the 2 clients of `Worker.connectorTaskConfigs` to
    perform a reverse transformation (values converted back into variable
    references) before saving them into an instance of `ClusterConfigState`.
    The 2 places where the save is performed are
    `DistributedHerder.reconfigureConnector` and
    `StandaloneHerder.updateConnectorTasks`.
    
    The reverse transformation uses the "raw" connector config (with
    variable references still intact) from `ClusterConfigState` to convert
    config values back into variable references for those keys that are
    common between the task config and the connector config.
    
    There are 2 additional small changes that only affect `StandaloneHerder`:
    
    1) `ClusterConfigState.allTaskConfigs` has been changed to perform a
    transformation (resolution) on all variable references.  This is
    necessary because the result of this method is compared directly to
    `Worker.connectorTaskConfigs`, which also has variable references
    resolved.
    
    2) `StandaloneHerder.startConnector` has been changed to match
    `DistributedHerder.startConnector`.  This is to fix an issue where
    during `StandaloneHerder.restartConnector`, the post-transformed
    connector config would be saved back into `ClusterConfigState`.
    
    I also performed an analysis of all other code paths where configs are
    saved back into `ClusterConfigState` and did not find any other
    issues.
    
    Author: Robert Yokota <rayok...@gmail.com>
    
    Reviewers: Ewen Cheslack-Postava <e...@confluent.io>
    
    Closes #5475 from rayokota/KAFKA-7242-reverse-xform-props
    
    (cherry picked from commit fd5acd73e648a2aab4b970ddf04ad4cace6bad9a)
    Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org>
---
 .../kafka/common/config/ConfigTransformer.java     |  2 +-
 .../kafka/connect/runtime/AbstractHerder.java      | 43 ++++++++++++
 .../runtime/distributed/ClusterConfigState.java    | 22 +++++-
 .../runtime/distributed/DistributedHerder.java     |  5 +-
 .../runtime/standalone/StandaloneHerder.java       | 14 ++--
 .../kafka/connect/runtime/AbstractHerderTest.java  | 81 ++++++++++++++++++++++
 6 files changed, 154 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index f5a3737..6430ffd 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -53,7 +53,7 @@ import java.util.regex.Pattern;
  * {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)} 
methods.
  */
 public class ConfigTransformer {
-    private static final Pattern DEFAULT_PATTERN = 
Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
+    public static final Pattern DEFAULT_PATTERN = 
Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
     private static final String EMPTY_PATH = "";
 
     private final Map<String, ConfigProvider> configProviders;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index cadb4e0..82fdecc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,9 +20,11 @@ import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -46,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -53,6 +56,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Abstract Herder implementation which handles connector/task lifecycle 
tracking. Extensions
@@ -431,4 +436,42 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
             return null;
         }
     }
+
+    /*
+     * Performs a reverse transformation on a set of task configs, by 
replacing values with variable references.
+     */
+    public static List<Map<String, String>> reverseTransform(String connName,
+                                                             
ClusterConfigState configState,
+                                                             List<Map<String, 
String>> configs) {
+
+        // Find the config keys in the raw connector config that have variable 
references
+        Map<String, String> rawConnConfig = 
configState.rawConnectorConfig(connName);
+        Set<String> connKeysWithVariableValues = 
keysWithVariableValues(rawConnConfig, ConfigTransformer.DEFAULT_PATTERN);
+
+        List<Map<String, String>> result = new ArrayList<>();
+        for (Map<String, String> config : configs) {
+            Map<String, String> newConfig = new HashMap<>(config);
+            for (String key : connKeysWithVariableValues) {
+                if (newConfig.containsKey(key)) {
+                    newConfig.put(key, rawConnConfig.get(key));
+                }
+            }
+            result.add(newConfig);
+        }
+        return result;
+    }
+
+    private static Set<String> keysWithVariableValues(Map<String, String> 
rawConfig, Pattern pattern) {
+        Set<String> keys = new HashSet<>();
+        for (Map.Entry<String, String> config : rawConfig.entrySet()) {
+            if (config.getValue() != null) {
+                Matcher matcher = pattern.matcher(config.getValue());
+                if (matcher.matches()) {
+                    keys.add(config.getKey());
+                }
+            }
+        }
+        return keys;
+    }
+
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index 11693b5..fc6a50d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -123,6 +123,10 @@ public class ClusterConfigState {
         return configs;
     }
 
+    public Map<String, String> rawConnectorConfig(String connector) {
+        return connectorConfigs.get(connector);
+    }
+
     /**
      * Get the target state of the connector
      * @param connector name of the connector
@@ -148,16 +152,28 @@ public class ClusterConfigState {
         return configs;
     }
 
+    public Map<String, String> rawTaskConfig(ConnectorTaskId task) {
+        return taskConfigs.get(task);
+    }
+
     /**
-     * Get all task configs for a connector.
+     * Get all task configs for a connector.  The configurations will have 
been transformed by
+     * {@link org.apache.kafka.common.config.ConfigTransformer} by having all 
variable
+     * references replaced with the current values from external instances of
+     * {@link ConfigProvider}, and may include secrets.
      * @param connector name of the connector
      * @return a list of task configurations
      */
     public List<Map<String, String>> allTaskConfigs(String connector) {
         Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
         for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : 
this.taskConfigs.entrySet()) {
-            if (taskConfigEntry.getKey().connector().equals(connector))
-                taskConfigs.put(taskConfigEntry.getKey().task(), 
taskConfigEntry.getValue());
+            if (taskConfigEntry.getKey().connector().equals(connector)) {
+                Map<String, String> configs = taskConfigEntry.getValue();
+                if (configTransformer != null) {
+                    configs = configTransformer.transform(connector, configs);
+                }
+                taskConfigs.put(taskConfigEntry.getKey().task(), configs);
+            }
         }
         return new LinkedList<>(taskConfigs.values());
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 5efb78a..f2009db 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1020,8 +1020,9 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                 }
             }
             if (changed) {
+                List<Map<String, String>> rawTaskProps = 
reverseTransform(connName, configState, taskProps);
                 if (isLeader()) {
-                    configBackingStore.putTaskConfigs(connName, taskProps);
+                    configBackingStore.putTaskConfigs(connName, rawTaskProps);
                     cb.onCompletion(null, null);
                 } else {
                     // We cannot forward the request on the same thread 
because this reconfiguration can happen as a result of connector
@@ -1031,7 +1032,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                         public void run() {
                             try {
                                 String reconfigUrl = 
RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
-                                RestClient.httpRequest(reconfigUrl, "POST", 
taskProps, null, config);
+                                RestClient.httpRequest(reconfigUrl, "POST", 
rawTaskProps, null, config);
                                 cb.onCompletion(null, null);
                             } catch (ConnectException e) {
                                 log.error("Request to leader to reconfigure 
connector tasks failed", e);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 20c6a24..40ad980 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -201,7 +201,9 @@ public class StandaloneHerder extends AbstractHerder {
                 created = true;
             }
 
-            if (!startConnector(config)) {
+            configBackingStore.putConnectorConfig(connName, config);
+
+            if (!startConnector(connName)) {
                 callback.onCompletion(new ConnectException("Failed to start 
connector: " + connName), null);
                 return;
             }
@@ -270,9 +272,8 @@ public class StandaloneHerder extends AbstractHerder {
         if (!configState.contains(connName))
             cb.onCompletion(new NotFoundException("Connector " + connName + " 
not found", null), null);
 
-        Map<String, String> config = configState.connectorConfig(connName);
         worker.stopConnector(connName);
-        if (startConnector(config))
+        if (startConnector(connName))
             cb.onCompletion(null, null);
         else
             cb.onCompletion(new ConnectException("Failed to start connector: " 
+ connName), null);
@@ -290,9 +291,7 @@ public class StandaloneHerder extends AbstractHerder {
         return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), 
future);
     }
 
-    private boolean startConnector(Map<String, String> connectorProps) {
-        String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
-        configBackingStore.putConnectorConfig(connName, connectorProps);
+    private boolean startConnector(String connName) {
         Map<String, String> connConfigs = 
configState.connectorConfig(connName);
         TargetState targetState = configState.targetState(connName);
         return worker.startConnector(connName, connConfigs, new 
HerderConnectorContext(this, connName), this, targetState);
@@ -336,7 +335,8 @@ public class StandaloneHerder extends AbstractHerder {
 
         if (!newTaskConfigs.equals(oldTaskConfigs)) {
             removeConnectorTasks(connName);
-            configBackingStore.putTaskConfigs(connName, newTaskConfigs);
+            List<Map<String, String>> rawTaskConfigs = 
reverseTransform(connName, configState, newTaskConfigs);
+            configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
             createConnectorTasks(connName, configState.targetState(connName));
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index db3cf27..8dbda18 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -20,12 +20,15 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -40,6 +43,7 @@ import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -61,6 +65,53 @@ import static org.junit.Assert.assertTrue;
 @PrepareForTest({AbstractHerder.class})
 public class AbstractHerderTest {
 
+    private static final String CONN1 = "sourceA";
+    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
+    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
+    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
+    private static final Integer MAX_TASKS = 3;
+    private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
+    private static final String TEST_KEY = "testKey";
+    private static final String TEST_KEY2 = "testKey2";
+    private static final String TEST_KEY3 = "testKey3";
+    private static final String TEST_VAL = "testVal";
+    private static final String TEST_VAL2 = "testVal2";
+    private static final String TEST_REF = "${file:/tmp/somefile.txt:somevar}";
+    private static final String TEST_REF2 = 
"${file:/tmp/somefile2.txt:somevar2}";
+    private static final String TEST_REF3 = 
"${file:/tmp/somefile3.txt:somevar3}";
+    static {
+        CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
+        CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, 
MAX_TASKS.toString());
+        CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSourceConnector.class.getName());
+        CONN1_CONFIG.put(TEST_KEY, TEST_REF);
+        CONN1_CONFIG.put(TEST_KEY2, TEST_REF2);
+        CONN1_CONFIG.put(TEST_KEY3, TEST_REF3);
+    }
+    private static final Map<String, String> TASK_CONFIG = new HashMap<>();
+    static {
+        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, 
BogusSourceTask.class.getName());
+        TASK_CONFIG.put(TEST_KEY, TEST_REF);
+    }
+    private static final List<Map<String, String>> TASK_CONFIGS = new 
ArrayList<>();
+    static {
+        TASK_CONFIGS.add(TASK_CONFIG);
+        TASK_CONFIGS.add(TASK_CONFIG);
+        TASK_CONFIGS.add(TASK_CONFIG);
+    }
+    private static final HashMap<ConnectorTaskId, Map<String, String>> 
TASK_CONFIGS_MAP = new HashMap<>();
+    static {
+        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
+        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
+        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
+    }
+    private static final ClusterConfigState SNAPSHOT = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
+    private static final ClusterConfigState SNAPSHOT_NO_TASKS = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
+            Collections.emptyMap(), Collections.<String>emptySet());
+
     private final String workerId = "workerId";
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
@@ -248,6 +299,29 @@ public class AbstractHerderTest {
         verifyAll();
     }
 
+    @Test
+    public void testReverseTransformConfigs() throws Exception {
+        // Construct a task config with constant values for TEST_KEY and 
TEST_KEY2
+        Map<String, String> newTaskConfig = new HashMap<>();
+        newTaskConfig.put(TaskConfig.TASK_CLASS_CONFIG, 
BogusSourceTask.class.getName());
+        newTaskConfig.put(TEST_KEY, TEST_VAL);
+        newTaskConfig.put(TEST_KEY2, TEST_VAL2);
+        List<Map<String, String>> newTaskConfigs = new ArrayList<>();
+        newTaskConfigs.add(newTaskConfig);
+
+        // The SNAPSHOT has a task config with TEST_KEY and TEST_REF
+        List<Map<String, String>> reverseTransformed = 
AbstractHerder.reverseTransform(CONN1, SNAPSHOT, newTaskConfigs);
+        assertEquals(TEST_REF, reverseTransformed.get(0).get(TEST_KEY));
+
+        // The SNAPSHOT has no task configs but does have a connector config 
with TEST_KEY2 and TEST_REF2
+        reverseTransformed = AbstractHerder.reverseTransform(CONN1, 
SNAPSHOT_NO_TASKS, newTaskConfigs);
+        assertEquals(TEST_REF2, reverseTransformed.get(0).get(TEST_KEY2));
+
+        // The reverseTransformed result should not have TEST_KEY3 since 
newTaskConfigs does not have TEST_KEY3
+        reverseTransformed = AbstractHerder.reverseTransform(CONN1, 
SNAPSHOT_NO_TASKS, newTaskConfigs);
+        assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3));
+    }
+
     private AbstractHerder createConfigValidationHerder(Class<? extends 
Connector> connectorClass) {
 
 
@@ -299,4 +373,11 @@ public class AbstractHerderTest {
 
         }
     }
+
+    // We need to use a real class here due to some issue with mocking 
java.lang.Class
+    private abstract class BogusSourceConnector extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
 }

Reply via email to