This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new 4f789be KAFKA-7242: Reverse xform configs before saving (KIP-297) 4f789be is described below commit 4f789bebf4e8a403b3871f04bc3ba3053440b0be Author: Robert Yokota <rayok...@gmail.com> AuthorDate: Tue Aug 28 12:59:08 2018 -0700 KAFKA-7242: Reverse xform configs before saving (KIP-297) During actions such as a reconfiguration, the task configs are obtained via `Worker.connectorTaskConfigs` and then subsequently saved into an instance of `ClusterConfigState`. The values of the properties that are saved are post-transformation (of variable references) when they should be pre-transformation. This is to avoid secrets appearing in plaintext in the `connect-configs` topic, for example. The fix is to change the 2 clients of `Worker.connectorTaskConfigs` to perform a reverse transformation (values converted back into variable references) before saving them into an instance of `ClusterConfigState`. The 2 places where the save is performed are `DistributedHerder.reconfigureConnector` and `StandaloneHerder.updateConnectorTasks`. The way that the reverse transformation works is by using the "raw" connector config (with variable references still intact) from `ClusterConfigState` to convert config values back into variable references for those keys that are common between the task config and the connector config. There are 2 additional small changes that only affect `StandaloneHerder`: 1) `ClusterConfigState.allTaskConfigs` has been changed to perform a transformation (resolution) on all variable references. This is necessary because the result of this method is compared directly to `Worker.connectorTaskConfigs`, which also has variable references resolved. 2) `StandaloneHerder.startConnector` has been changed to match `DistributedHerder.startConnector`. This is to fix an issue where during `StandaloneHerder.restartConnector`, the post-transformed connector config would be saved back into `ClusterConfigState`. 
I also performed an analysis of all other code paths where configs are saved back into `ClusterConfigState` and did not find any other issues. Author: Robert Yokota <rayok...@gmail.com> Reviewers: Ewen Cheslack-Postava <e...@confluent.io> Closes #5475 from rayokota/KAFKA-7242-reverse-xform-props (cherry picked from commit fd5acd73e648a2aab4b970ddf04ad4cace6bad9a) Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org> --- .../kafka/common/config/ConfigTransformer.java | 2 +- .../kafka/connect/runtime/AbstractHerder.java | 43 ++++++++++++ .../runtime/distributed/ClusterConfigState.java | 22 +++++- .../runtime/distributed/DistributedHerder.java | 5 +- .../runtime/standalone/StandaloneHerder.java | 14 ++-- .../kafka/connect/runtime/AbstractHerderTest.java | 81 ++++++++++++++++++++++ 6 files changed, 154 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java index f5a3737..6430ffd 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java @@ -53,7 +53,7 @@ import java.util.regex.Pattern; * {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)} methods. 
*/ public class ConfigTransformer { - private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}"); + public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}"); private static final String EMPTY_PATH = ""; private final Map<String, ConfigProvider> configProviders; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index cadb4e0..82fdecc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -20,9 +20,11 @@ import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.ConfigKey; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; @@ -46,6 +48,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -53,6 +56,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Abstract Herder implementation which handles connector/task lifecycle tracking. 
Extensions @@ -431,4 +436,42 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return null; } } + + /* + * Performs a reverse transformation on a set of task configs, by replacing values with variable references. + */ + public static List<Map<String, String>> reverseTransform(String connName, + ClusterConfigState configState, + List<Map<String, String>> configs) { + + // Find the config keys in the raw connector config that have variable references + Map<String, String> rawConnConfig = configState.rawConnectorConfig(connName); + Set<String> connKeysWithVariableValues = keysWithVariableValues(rawConnConfig, ConfigTransformer.DEFAULT_PATTERN); + + List<Map<String, String>> result = new ArrayList<>(); + for (Map<String, String> config : configs) { + Map<String, String> newConfig = new HashMap<>(config); + for (String key : connKeysWithVariableValues) { + if (newConfig.containsKey(key)) { + newConfig.put(key, rawConnConfig.get(key)); + } + } + result.add(newConfig); + } + return result; + } + + private static Set<String> keysWithVariableValues(Map<String, String> rawConfig, Pattern pattern) { + Set<String> keys = new HashSet<>(); + for (Map.Entry<String, String> config : rawConfig.entrySet()) { + if (config.getValue() != null) { + Matcher matcher = pattern.matcher(config.getValue()); + if (matcher.matches()) { + keys.add(config.getKey()); + } + } + } + return keys; + } + } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java index 11693b5..fc6a50d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java @@ -123,6 +123,10 @@ public class ClusterConfigState { return configs; } + public Map<String, String> 
rawConnectorConfig(String connector) { + return connectorConfigs.get(connector); + } + /** * Get the target state of the connector * @param connector name of the connector @@ -148,16 +152,28 @@ public class ClusterConfigState { return configs; } + public Map<String, String> rawTaskConfig(ConnectorTaskId task) { + return taskConfigs.get(task); + } + /** - * Get all task configs for a connector. + * Get all task configs for a connector. The configurations will have been transformed by + * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable + * references replaced with the current values from external instances of + * {@link ConfigProvider}, and may include secrets. * @param connector name of the connector * @return a list of task configurations */ public List<Map<String, String>> allTaskConfigs(String connector) { Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>(); for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : this.taskConfigs.entrySet()) { - if (taskConfigEntry.getKey().connector().equals(connector)) - taskConfigs.put(taskConfigEntry.getKey().task(), taskConfigEntry.getValue()); + if (taskConfigEntry.getKey().connector().equals(connector)) { + Map<String, String> configs = taskConfigEntry.getValue(); + if (configTransformer != null) { + configs = configTransformer.transform(connector, configs); + } + taskConfigs.put(taskConfigEntry.getKey().task(), configs); + } } return new LinkedList<>(taskConfigs.values()); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 5efb78a..f2009db 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1020,8 +1020,9 @@ public class 
DistributedHerder extends AbstractHerder implements Runnable { } } if (changed) { + List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps); if (isLeader()) { - configBackingStore.putTaskConfigs(connName, taskProps); + configBackingStore.putTaskConfigs(connName, rawTaskProps); cb.onCompletion(null, null); } else { // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector @@ -1031,7 +1032,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { public void run() { try { String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks"); - RestClient.httpRequest(reconfigUrl, "POST", taskProps, null, config); + RestClient.httpRequest(reconfigUrl, "POST", rawTaskProps, null, config); cb.onCompletion(null, null); } catch (ConnectException e) { log.error("Request to leader to reconfigure connector tasks failed", e); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 20c6a24..40ad980 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -201,7 +201,9 @@ public class StandaloneHerder extends AbstractHerder { created = true; } - if (!startConnector(config)) { + configBackingStore.putConnectorConfig(connName, config); + + if (!startConnector(connName)) { callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null); return; } @@ -270,9 +272,8 @@ public class StandaloneHerder extends AbstractHerder { if (!configState.contains(connName)) cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); - Map<String, String> config = 
configState.connectorConfig(connName); worker.stopConnector(connName); - if (startConnector(config)) + if (startConnector(connName)) cb.onCompletion(null, null); else cb.onCompletion(new ConnectException("Failed to start connector: " + connName), null); @@ -290,9 +291,7 @@ public class StandaloneHerder extends AbstractHerder { return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), future); } - private boolean startConnector(Map<String, String> connectorProps) { - String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG); - configBackingStore.putConnectorConfig(connName, connectorProps); + private boolean startConnector(String connName) { Map<String, String> connConfigs = configState.connectorConfig(connName); TargetState targetState = configState.targetState(connName); return worker.startConnector(connName, connConfigs, new HerderConnectorContext(this, connName), this, targetState); @@ -336,7 +335,8 @@ public class StandaloneHerder extends AbstractHerder { if (!newTaskConfigs.equals(oldTaskConfigs)) { removeConnectorTasks(connName); - configBackingStore.putTaskConfigs(connName, newTaskConfigs); + List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs); + configBackingStore.putTaskConfigs(connName, rawTaskConfigs); createConnectorTasks(connName, configState.targetState(connName)); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index db3cf27..8dbda18 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -20,12 +20,15 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import 
org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.transforms.Transformation; @@ -40,6 +43,7 @@ import org.powermock.api.easymock.annotation.MockStrict; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -61,6 +65,53 @@ import static org.junit.Assert.assertTrue; @PrepareForTest({AbstractHerder.class}) public class AbstractHerderTest { + private static final String CONN1 = "sourceA"; + private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0); + private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1); + private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2); + private static final Integer MAX_TASKS = 3; + private static final Map<String, String> CONN1_CONFIG = new HashMap<>(); + private static final String TEST_KEY = "testKey"; + private static final String TEST_KEY2 = "testKey2"; + private static final String TEST_KEY3 = "testKey3"; + private static final String TEST_VAL = "testVal"; + private static final String TEST_VAL2 = "testVal2"; + private static final String TEST_REF = "${file:/tmp/somefile.txt:somevar}"; 
+ private static final String TEST_REF2 = "${file:/tmp/somefile2.txt:somevar2}"; + private static final String TEST_REF3 = "${file:/tmp/somefile3.txt:somevar3}"; + static { + CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1); + CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString()); + CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); + CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName()); + CONN1_CONFIG.put(TEST_KEY, TEST_REF); + CONN1_CONFIG.put(TEST_KEY2, TEST_REF2); + CONN1_CONFIG.put(TEST_KEY3, TEST_REF3); + } + private static final Map<String, String> TASK_CONFIG = new HashMap<>(); + static { + TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName()); + TASK_CONFIG.put(TEST_KEY, TEST_REF); + } + private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>(); + static { + TASK_CONFIGS.add(TASK_CONFIG); + TASK_CONFIGS.add(TASK_CONFIG); + TASK_CONFIGS.add(TASK_CONFIG); + } + private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP = new HashMap<>(); + static { + TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG); + TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG); + TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG); + } + private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), + TASK_CONFIGS_MAP, Collections.<String>emptySet()); + private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), + Collections.emptyMap(), Collections.<String>emptySet()); + private final String workerId = "workerId"; private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA"; private final int generation = 5; @@ -248,6 +299,29 @@ public class 
AbstractHerderTest { verifyAll(); } + @Test + public void testReverseTransformConfigs() throws Exception { + // Construct a task config with constant values for TEST_KEY and TEST_KEY2 + Map<String, String> newTaskConfig = new HashMap<>(); + newTaskConfig.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName()); + newTaskConfig.put(TEST_KEY, TEST_VAL); + newTaskConfig.put(TEST_KEY2, TEST_VAL2); + List<Map<String, String>> newTaskConfigs = new ArrayList<>(); + newTaskConfigs.add(newTaskConfig); + + // The SNAPSHOT has a task config with TEST_KEY and TEST_REF + List<Map<String, String>> reverseTransformed = AbstractHerder.reverseTransform(CONN1, SNAPSHOT, newTaskConfigs); + assertEquals(TEST_REF, reverseTransformed.get(0).get(TEST_KEY)); + + // The SNAPSHOT has no task configs but does have a connector config with TEST_KEY2 and TEST_REF2 + reverseTransformed = AbstractHerder.reverseTransform(CONN1, SNAPSHOT_NO_TASKS, newTaskConfigs); + assertEquals(TEST_REF2, reverseTransformed.get(0).get(TEST_KEY2)); + + // The reverseTransformed result should not have TEST_KEY3 since newTaskConfigs does not have TEST_KEY3 + reverseTransformed = AbstractHerder.reverseTransform(CONN1, SNAPSHOT_NO_TASKS, newTaskConfigs); + assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3)); + } + private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass) { @@ -299,4 +373,11 @@ public class AbstractHerderTest { } } + + // We need to use a real class here due to some issue with mocking java.lang.Class + private abstract class BogusSourceConnector extends SourceConnector { + } + + private abstract class BogusSourceTask extends SourceTask { + } }