This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 4f789be KAFKA-7242: Reverse xform configs before saving (KIP-297)
4f789be is described below
commit 4f789bebf4e8a403b3871f04bc3ba3053440b0be
Author: Robert Yokota <[email protected]>
AuthorDate: Tue Aug 28 12:59:08 2018 -0700
KAFKA-7242: Reverse xform configs before saving (KIP-297)
During actions such as a reconfiguration, the task configs are obtained
via `Worker.connectorTaskConfigs` and then saved into an instance of
`ClusterConfigState`. The saved property values are post-transformation
(variable references already resolved) when they should be
pre-transformation (variable references intact). Saving them
pre-transformation avoids, for example, secrets appearing in plaintext in
the `connect-configs` topic.
The fix is to change the two callers of `Worker.connectorTaskConfigs` to
perform a reverse transformation (values converted back into variable
references) before saving the task configs into an instance of
`ClusterConfigState`. The two places where the save is performed are
`DistributedHerder.reconfigureConnector` and
`StandaloneHerder.updateConnectorTasks`.
The reverse transformation uses the "raw" connector config (with variable
references still intact) from `ClusterConfigState` to convert config
values back into variable references for those keys that appear in both
the task config and the connector config and whose connector-config value
is a variable reference.
There are two additional small changes that only affect `StandaloneHerder`:
1) `ClusterConfigState.allTaskConfigs` has been changed to resolve all
variable references (i.e. apply the transformation). This is necessary
because the result of this method is compared directly to the result of
`Worker.connectorTaskConfigs`, which also has variable references
resolved.
2) `StandaloneHerder.startConnector` has been changed to match
`DistributedHerder.startConnector`. This fixes an issue where, during
`StandaloneHerder.restartConnector`, the post-transformation connector
config would be saved back into `ClusterConfigState`.
I also analyzed all other code paths where configs are saved back into
`ClusterConfigState` and did not find any other issues.
Author: Robert Yokota <[email protected]>
Reviewers: Ewen Cheslack-Postava <[email protected]>
Closes #5475 from rayokota/KAFKA-7242-reverse-xform-props
(cherry picked from commit fd5acd73e648a2aab4b970ddf04ad4cace6bad9a)
Signed-off-by: Ewen Cheslack-Postava <[email protected]>
---
.../kafka/common/config/ConfigTransformer.java | 2 +-
.../kafka/connect/runtime/AbstractHerder.java | 43 ++++++++++++
.../runtime/distributed/ClusterConfigState.java | 22 +++++-
.../runtime/distributed/DistributedHerder.java | 5 +-
.../runtime/standalone/StandaloneHerder.java | 14 ++--
.../kafka/connect/runtime/AbstractHerderTest.java | 81 ++++++++++++++++++++++
6 files changed, 154 insertions(+), 13 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index f5a3737..6430ffd 100644
---
a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++
b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -53,7 +53,7 @@ import java.util.regex.Pattern;
* {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)}
methods.
*/
public class ConfigTransformer {
- private static final Pattern DEFAULT_PATTERN =
Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
+ public static final Pattern DEFAULT_PATTERN =
Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
private static final String EMPTY_PATH = "";
private final Map<String, ConfigProvider> configProviders;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index cadb4e0..82fdecc 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,9 +20,11 @@ import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -46,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -53,6 +56,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* Abstract Herder implementation which handles connector/task lifecycle
tracking. Extensions
@@ -431,4 +436,42 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
return null;
}
}
+
+ /*
+ * Performs a reverse transformation on a set of task configs, by
replacing values with variable references.
+ */
+ public static List<Map<String, String>> reverseTransform(String connName,
+
ClusterConfigState configState,
+ List<Map<String,
String>> configs) {
+
+ // Find the config keys in the raw connector config that have variable
references
+ Map<String, String> rawConnConfig =
configState.rawConnectorConfig(connName);
+ Set<String> connKeysWithVariableValues =
keysWithVariableValues(rawConnConfig, ConfigTransformer.DEFAULT_PATTERN);
+
+ List<Map<String, String>> result = new ArrayList<>();
+ for (Map<String, String> config : configs) {
+ Map<String, String> newConfig = new HashMap<>(config);
+ for (String key : connKeysWithVariableValues) {
+ if (newConfig.containsKey(key)) {
+ newConfig.put(key, rawConnConfig.get(key));
+ }
+ }
+ result.add(newConfig);
+ }
+ return result;
+ }
+
+ private static Set<String> keysWithVariableValues(Map<String, String>
rawConfig, Pattern pattern) {
+ Set<String> keys = new HashSet<>();
+ for (Map.Entry<String, String> config : rawConfig.entrySet()) {
+ if (config.getValue() != null) {
+ Matcher matcher = pattern.matcher(config.getValue());
+ if (matcher.matches()) {
+ keys.add(config.getKey());
+ }
+ }
+ }
+ return keys;
+ }
+
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index 11693b5..fc6a50d 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -123,6 +123,10 @@ public class ClusterConfigState {
return configs;
}
+ public Map<String, String> rawConnectorConfig(String connector) {
+ return connectorConfigs.get(connector);
+ }
+
/**
* Get the target state of the connector
* @param connector name of the connector
@@ -148,16 +152,28 @@ public class ClusterConfigState {
return configs;
}
+ public Map<String, String> rawTaskConfig(ConnectorTaskId task) {
+ return taskConfigs.get(task);
+ }
+
/**
- * Get all task configs for a connector.
+ * Get all task configs for a connector. The configurations will have
been transformed by
+ * {@link org.apache.kafka.common.config.ConfigTransformer} by having all
variable
+ * references replaced with the current values from external instances of
+ * {@link ConfigProvider}, and may include secrets.
* @param connector name of the connector
* @return a list of task configurations
*/
public List<Map<String, String>> allTaskConfigs(String connector) {
Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry :
this.taskConfigs.entrySet()) {
- if (taskConfigEntry.getKey().connector().equals(connector))
- taskConfigs.put(taskConfigEntry.getKey().task(),
taskConfigEntry.getValue());
+ if (taskConfigEntry.getKey().connector().equals(connector)) {
+ Map<String, String> configs = taskConfigEntry.getValue();
+ if (configTransformer != null) {
+ configs = configTransformer.transform(connector, configs);
+ }
+ taskConfigs.put(taskConfigEntry.getKey().task(), configs);
+ }
}
return new LinkedList<>(taskConfigs.values());
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 5efb78a..f2009db 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1020,8 +1020,9 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
}
}
if (changed) {
+ List<Map<String, String>> rawTaskProps =
reverseTransform(connName, configState, taskProps);
if (isLeader()) {
- configBackingStore.putTaskConfigs(connName, taskProps);
+ configBackingStore.putTaskConfigs(connName, rawTaskProps);
cb.onCompletion(null, null);
} else {
// We cannot forward the request on the same thread
because this reconfiguration can happen as a result of connector
@@ -1031,7 +1032,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
public void run() {
try {
String reconfigUrl =
RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
- RestClient.httpRequest(reconfigUrl, "POST",
taskProps, null, config);
+ RestClient.httpRequest(reconfigUrl, "POST",
rawTaskProps, null, config);
cb.onCompletion(null, null);
} catch (ConnectException e) {
log.error("Request to leader to reconfigure
connector tasks failed", e);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 20c6a24..40ad980 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -201,7 +201,9 @@ public class StandaloneHerder extends AbstractHerder {
created = true;
}
- if (!startConnector(config)) {
+ configBackingStore.putConnectorConfig(connName, config);
+
+ if (!startConnector(connName)) {
callback.onCompletion(new ConnectException("Failed to start
connector: " + connName), null);
return;
}
@@ -270,9 +272,8 @@ public class StandaloneHerder extends AbstractHerder {
if (!configState.contains(connName))
cb.onCompletion(new NotFoundException("Connector " + connName + "
not found", null), null);
- Map<String, String> config = configState.connectorConfig(connName);
worker.stopConnector(connName);
- if (startConnector(config))
+ if (startConnector(connName))
cb.onCompletion(null, null);
else
cb.onCompletion(new ConnectException("Failed to start connector: "
+ connName), null);
@@ -290,9 +291,7 @@ public class StandaloneHerder extends AbstractHerder {
return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(),
future);
}
- private boolean startConnector(Map<String, String> connectorProps) {
- String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
- configBackingStore.putConnectorConfig(connName, connectorProps);
+ private boolean startConnector(String connName) {
Map<String, String> connConfigs =
configState.connectorConfig(connName);
TargetState targetState = configState.targetState(connName);
return worker.startConnector(connName, connConfigs, new
HerderConnectorContext(this, connName), this, targetState);
@@ -336,7 +335,8 @@ public class StandaloneHerder extends AbstractHerder {
if (!newTaskConfigs.equals(oldTaskConfigs)) {
removeConnectorTasks(connName);
- configBackingStore.putTaskConfigs(connName, newTaskConfigs);
+ List<Map<String, String>> rawTaskConfigs =
reverseTransform(connName, configState, newTaskConfigs);
+ configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
createConnectorTasks(connName, configState.targetState(connName));
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index db3cf27..8dbda18 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -20,12 +20,15 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
@@ -40,6 +43,7 @@ import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -61,6 +65,53 @@ import static org.junit.Assert.assertTrue;
@PrepareForTest({AbstractHerder.class})
public class AbstractHerderTest {
+ private static final String CONN1 = "sourceA";
+ private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
+ private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
+ private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
+ private static final Integer MAX_TASKS = 3;
+ private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
+ private static final String TEST_KEY = "testKey";
+ private static final String TEST_KEY2 = "testKey2";
+ private static final String TEST_KEY3 = "testKey3";
+ private static final String TEST_VAL = "testVal";
+ private static final String TEST_VAL2 = "testVal2";
+ private static final String TEST_REF = "${file:/tmp/somefile.txt:somevar}";
+ private static final String TEST_REF2 =
"${file:/tmp/somefile2.txt:somevar2}";
+ private static final String TEST_REF3 =
"${file:/tmp/somefile3.txt:somevar3}";
+ static {
+ CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
+ CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG,
MAX_TASKS.toString());
+ CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
BogusSourceConnector.class.getName());
+ CONN1_CONFIG.put(TEST_KEY, TEST_REF);
+ CONN1_CONFIG.put(TEST_KEY2, TEST_REF2);
+ CONN1_CONFIG.put(TEST_KEY3, TEST_REF3);
+ }
+ private static final Map<String, String> TASK_CONFIG = new HashMap<>();
+ static {
+ TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG,
BogusSourceTask.class.getName());
+ TASK_CONFIG.put(TEST_KEY, TEST_REF);
+ }
+ private static final List<Map<String, String>> TASK_CONFIGS = new
ArrayList<>();
+ static {
+ TASK_CONFIGS.add(TASK_CONFIG);
+ TASK_CONFIGS.add(TASK_CONFIG);
+ TASK_CONFIGS.add(TASK_CONFIG);
+ }
+ private static final HashMap<ConnectorTaskId, Map<String, String>>
TASK_CONFIGS_MAP = new HashMap<>();
+ static {
+ TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
+ TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
+ TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
+ }
+ private static final ClusterConfigState SNAPSHOT = new
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ private static final ClusterConfigState SNAPSHOT_NO_TASKS = new
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ Collections.emptyMap(), Collections.<String>emptySet());
+
private final String workerId = "workerId";
private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
private final int generation = 5;
@@ -248,6 +299,29 @@ public class AbstractHerderTest {
verifyAll();
}
+ @Test
+ public void testReverseTransformConfigs() throws Exception {
+ // Construct a task config with constant values for TEST_KEY and
TEST_KEY2
+ Map<String, String> newTaskConfig = new HashMap<>();
+ newTaskConfig.put(TaskConfig.TASK_CLASS_CONFIG,
BogusSourceTask.class.getName());
+ newTaskConfig.put(TEST_KEY, TEST_VAL);
+ newTaskConfig.put(TEST_KEY2, TEST_VAL2);
+ List<Map<String, String>> newTaskConfigs = new ArrayList<>();
+ newTaskConfigs.add(newTaskConfig);
+
+ // The SNAPSHOT has a task config with TEST_KEY and TEST_REF
+ List<Map<String, String>> reverseTransformed =
AbstractHerder.reverseTransform(CONN1, SNAPSHOT, newTaskConfigs);
+ assertEquals(TEST_REF, reverseTransformed.get(0).get(TEST_KEY));
+
+ // The SNAPSHOT has no task configs but does have a connector config
with TEST_KEY2 and TEST_REF2
+ reverseTransformed = AbstractHerder.reverseTransform(CONN1,
SNAPSHOT_NO_TASKS, newTaskConfigs);
+ assertEquals(TEST_REF2, reverseTransformed.get(0).get(TEST_KEY2));
+
+ // The reverseTransformed result should not have TEST_KEY3 since
newTaskConfigs does not have TEST_KEY3
+ reverseTransformed = AbstractHerder.reverseTransform(CONN1,
SNAPSHOT_NO_TASKS, newTaskConfigs);
+ assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3));
+ }
+
private AbstractHerder createConfigValidationHerder(Class<? extends
Connector> connectorClass) {
@@ -299,4 +373,11 @@ public class AbstractHerderTest {
}
}
+
+ // We need to use a real class here due to some issue with mocking
java.lang.Class
+ private abstract class BogusSourceConnector extends SourceConnector {
+ }
+
+ private abstract class BogusSourceTask extends SourceTask {
+ }
}