This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36a8fec KAFKA-7225: Pretransform validated props
36a8fec is described below
commit 36a8fec0ab2d05a8386ecd386bbbd294c3dc9126
Author: Robert Yokota <[email protected]>
AuthorDate: Tue Aug 7 13:18:16 2018 -0700
KAFKA-7225: Pretransform validated props
If a property requires validation and its value is a variable reference,
the reference should be pretransformed (resolved) before validation, so that
the resolved value, rather than the raw placeholder, is what gets validated.
Author: Robert Yokota <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava
<[email protected]>
Closes #5445 from rayokota/KAFKA-7225-pretransform-validated-props
---
.../apache/kafka/connect/runtime/AbstractHerder.java | 3 +++
.../connect/runtime/WorkerConfigTransformer.java | 8 +++++++-
.../kafka/connect/runtime/AbstractHerderTest.java | 4 ++++
.../runtime/distributed/DistributedHerderTest.java | 20 ++++++++++++++++++++
.../runtime/standalone/StandaloneHerderTest.java | 17 +++++++++++++++++
tests/kafkatest/tests/connect/connect_test.py | 11 +++++++++--
.../templates/connect-file-external.properties | 16 ++++++++++++++++
.../connect/templates/connect-standalone.properties | 3 +++
8 files changed, 79 insertions(+), 3 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index b5e0ec2..cadb4e0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -246,6 +246,9 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
@Override
public ConfigInfos validateConnectorConfig(Map<String, String>
connectorProps) {
+ if (worker.configTransformer() != null) {
+ connectorProps =
worker.configTransformer().transform(connectorProps);
+ }
String connType =
connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
if (connType == null)
throw new BadRequestException("Connector config " + connectorProps
+ " contains no connector type");
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 7efb481..1b715c7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -38,10 +38,16 @@ public class WorkerConfigTransformer {
this.configTransformer = new ConfigTransformer(configProviders);
}
+ public Map<String, String> transform(Map<String, String> configs) {
+ return transform(null, configs);
+ }
+
public Map<String, String> transform(String connectorName, Map<String,
String> configs) {
if (configs == null) return null;
ConfigTransformerResult result = configTransformer.transform(configs);
- scheduleReload(connectorName, result.ttls());
+ if (connectorName != null) {
+ scheduleReload(connectorName, result.ttls());
+ }
return result.data();
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 5728465..db3cf27 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -67,6 +67,7 @@ public class AbstractHerderTest {
private final String connector = "connector";
@MockStrict private Worker worker;
+ @MockStrict private WorkerConfigTransformer transformer;
@MockStrict private Plugins plugins;
@MockStrict private ClassLoader classLoader;
@MockStrict private ConfigBackingStore configStore;
@@ -261,6 +262,9 @@ public class AbstractHerderTest {
EasyMock.expect(herder.generation()).andStubReturn(generation);
// Call to validateConnectorConfig
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
final Connector connector;
try {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 911afe7..a0de8cf 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import
org.apache.kafka.connect.runtime.distributed.DistributedHerder.HerderMetrics;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -158,6 +159,7 @@ public class DistributedHerderTest {
private DistributedHerder herder;
private MockConnectMetrics metrics;
@Mock private Worker worker;
+ @Mock private WorkerConfigTransformer transformer;
@Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
@Mock
private Plugins plugins;
@@ -356,6 +358,9 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -399,6 +404,9 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -444,6 +452,9 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -495,6 +506,9 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -530,6 +544,9 @@ public class DistributedHerderTest {
@Test
public void testCreateConnectorAlreadyExists() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
expectRebalance(1, Collections.<String>emptyList(),
Collections.<ConnectorTaskId>emptyList());
@@ -1339,6 +1356,9 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 5372a3a..b98c15e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerConnector;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
@@ -101,6 +102,7 @@ public class StandaloneHerderTest {
private Connector connector;
@Mock protected Worker worker;
+ @Mock protected WorkerConfigTransformer transformer;
@Mock private Plugins plugins;
@Mock
private PluginClassLoader pluginLoader;
@@ -146,6 +148,9 @@ public class StandaloneHerderTest {
config.remove(ConnectorConfig.NAME_CONFIG);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -171,6 +176,9 @@ public class StandaloneHerderTest {
connector = PowerMock.createMock(BogusSourceConnector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -205,6 +213,9 @@ public class StandaloneHerderTest {
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config, config);
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
// No new connector is created
@@ -565,6 +576,9 @@ public class StandaloneHerderTest {
);
ConfigDef configDef = new ConfigDef();
configDef.define(key, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "");
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
@@ -672,6 +686,9 @@ public class StandaloneHerderTest {
Map<String, String>... configs
) {
// config validation
+
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
+ final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
+
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
if (shouldCreateConnector) {
diff --git a/tests/kafkatest/tests/connect/connect_test.py
b/tests/kafkatest/tests/connect/connect_test.py
index 3753876..9d34c48 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -27,6 +27,7 @@ from kafkatest.services.security.security_config import
SecurityConfig
import hashlib
import json
+import os.path
class ConnectStandaloneFileTest(Test):
@@ -44,7 +45,8 @@ class ConnectStandaloneFileTest(Test):
OFFSETS_FILE = "/mnt/connect.offsets"
- TOPIC = "test"
+ TOPIC =
"${file:/mnt/connect/connect-file-external.properties:topic.external}"
+ TOPIC_TEST = "test"
FIRST_INPUT_LIST = ["foo", "bar", "baz"]
FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
@@ -90,13 +92,18 @@ class ConnectStandaloneFileTest(Test):
self.source = ConnectStandaloneService(self.test_context, self.kafka,
[self.INPUT_FILE, self.OFFSETS_FILE])
self.sink = ConnectStandaloneService(self.test_context, self.kafka,
[self.OUTPUT_FILE, self.OFFSETS_FILE])
- self.consumer_validator = ConsoleConsumer(self.test_context, 1,
self.kafka, self.TOPIC,
+ self.consumer_validator = ConsoleConsumer(self.test_context, 1,
self.kafka, self.TOPIC_TEST,
consumer_timeout_ms=10000)
self.zk.start()
self.kafka.start()
+ source_external_props = os.path.join(self.source.PERSISTENT_ROOT,
"connect-file-external.properties")
+ self.source.node.account.create_file(source_external_props,
self.render('connect-file-external.properties'))
self.source.set_configs(lambda node:
self.render("connect-standalone.properties", node=node),
[self.render("connect-file-source.properties")])
+
+ sink_external_props = os.path.join(self.sink.PERSISTENT_ROOT,
"connect-file-external.properties")
+ self.sink.node.account.create_file(sink_external_props,
self.render('connect-file-external.properties'))
self.sink.set_configs(lambda node:
self.render("connect-standalone.properties", node=node),
[self.render("connect-file-sink.properties")])
self.source.start()
diff --git
a/tests/kafkatest/tests/connect/templates/connect-file-external.properties
b/tests/kafkatest/tests/connect/templates/connect-file-external.properties
new file mode 100644
index 0000000..8dccd25
--- /dev/null
+++ b/tests/kafkatest/tests/connect/templates/connect-file-external.properties
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+topic.external={{ TOPIC_TEST }}
diff --git
a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index a8eaa44..cbfe491 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -31,3 +31,6 @@ offset.storage.file.filename={{ OFFSETS_FILE }}
# Reduce the admin client request timeouts so that we don't wait the default
120 sec before failing to connect the admin client
request.timeout.ms=30000
+
+config.providers=file
+config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider