This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 265b58b  KAFKA-5117: Stop resolving externalized configs in Connect 
REST API
265b58b is described below

commit 265b58bd11dfbd8014ccabe320589f7163a82925
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Wed Jan 23 11:00:23 2019 -0800

    KAFKA-5117: Stop resolving externalized configs in Connect REST API
    
    
[KIP-297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations#KIP-297:ExternalizingSecretsforConnectConfigurations-PublicInterfaces)
 introduced the `ConfigProvider` mechanism, which was primarily intended for 
externalizing secrets provided in connector configurations. However, when 
querying the Connect REST API for the configuration of a connector or its 
tasks, those secrets are still exposed. The changes here prevent the Conne [...]
    
    Tested and verified manually. If these changes are approved, unit tests can 
be added to prevent a regression.
    
    Author: Chris Egerton <chr...@confluent.io>
    
    Reviewers: Robert Yokota <rayok...@gmail.com>, Randall Hauch 
<rha...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io>
    
    Closes #6129 from C0urante/hide-provided-connect-configs
    
    (cherry picked from commit 743607af5aa625a19377688709870b021014dee2)
    Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org>
---
 .../runtime/distributed/DistributedHerder.java     |  4 ++--
 .../runtime/standalone/StandaloneHerder.java       |  4 ++--
 .../runtime/distributed/DistributedHerderTest.java | 11 +++++++++-
 .../runtime/standalone/StandaloneHerderTest.java   | 24 +++++++++++++++-------
 tests/kafkatest/tests/connect/connect_rest_test.py |  7 +++++--
 tests/kafkatest/tests/connect/connect_test.py      |  5 ++---
 .../templates/connect-distributed.properties       |  6 ++++++
 7 files changed, 44 insertions(+), 17 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 099f084..7edc3b2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -451,7 +451,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                         if (!configState.contains(connName)) {
                             callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
                         } else {
-                            Map<String, String> config = 
configState.connectorConfig(connName);
+                            Map<String, String> config = 
configState.rawConnectorConfig(connName);
                             callback.onCompletion(null, new 
ConnectorInfo(connName, config,
                                 configState.tasks(connName),
                                 
connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))));
@@ -607,7 +607,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                             List<TaskInfo> result = new ArrayList<>();
                             for (int i = 0; i < 
configState.taskCount(connName); i++) {
                                 ConnectorTaskId id = new 
ConnectorTaskId(connName, i);
-                                result.add(new TaskInfo(id, 
configState.taskConfig(id)));
+                                result.add(new TaskInfo(id, 
configState.rawTaskConfig(id)));
                             }
                             callback.onCompletion(null, result);
                         }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index fe31c28..95b53e5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -134,7 +134,7 @@ public class StandaloneHerder extends AbstractHerder {
     private ConnectorInfo createConnectorInfo(String connector) {
         if (!configState.contains(connector))
             return null;
-        Map<String, String> config = configState.connectorConfig(connector);
+        Map<String, String> config = configState.rawConnectorConfig(connector);
         return new ConnectorInfo(connector, config, 
configState.tasks(connector),
             
connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
     }
@@ -232,7 +232,7 @@ public class StandaloneHerder extends AbstractHerder {
 
         List<TaskInfo> result = new ArrayList<>();
         for (ConnectorTaskId taskId : configState.tasks(connName))
-            result.add(new TaskInfo(taskId, configState.taskConfig(taskId)));
+            result.add(new TaskInfo(taskId, 
configState.rawTaskConfig(taskId)));
         callback.onCompletion(null, result);
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index a0de8cf..25c1da8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1293,7 +1293,16 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
-        expectPostRebalanceCatchup(SNAPSHOT);
+
+        WorkerConfigTransformer configTransformer = 
EasyMock.mock(WorkerConfigTransformer.class);
+        EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), 
EasyMock.anyObject()))
+            .andThrow(new AssertionError("Config transformation should not 
occur when requesting connector or task info"));
+        EasyMock.replay(configTransformer);
+        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, 
Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet(), 
configTransformer);
+
+        expectPostRebalanceCatchup(snapshotWithTransform);
 
 
         member.wakeup();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index b98c15e..a23ee10 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -115,12 +115,14 @@ public class StandaloneHerderTest {
     public void setup() {
         worker = PowerMock.createMock(Worker.class);
         herder = PowerMock.createPartialMock(StandaloneHerder.class, new 
String[]{"connectorTypeForClass"},
-            worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new 
MemoryConfigBackingStore());
+            worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new 
MemoryConfigBackingStore(transformer));
         plugins = PowerMock.createMock(Plugins.class);
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
         PowerMock.mockStatic(Plugins.class);
         PowerMock.mockStatic(WorkerConnector.class);
+        Capture<Map<String, String>> configCapture = Capture.newInstance();
+        EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
     }
 
     @Test
@@ -357,7 +359,8 @@ public class StandaloneHerderTest {
                 Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
                 Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
                 Collections.singletonMap(taskId, 
taskConfig(SourceSink.SOURCE)),
-                new HashSet<>());
+                new HashSet<>(),
+                transformer);
         worker.startTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
 
@@ -390,7 +393,8 @@ public class StandaloneHerderTest {
                 Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
                 Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
                 Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 
0), taskConfig(SourceSink.SOURCE)),
-                new HashSet<>());
+                new HashSet<>(),
+                transformer);
         worker.startTask(taskId, configState, connectorConfig, 
taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(false);
 
@@ -458,7 +462,6 @@ public class StandaloneHerderTest {
         // Create connector
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connector, true, connConfig);
 
         // Validate accessors with 1 connector
@@ -485,6 +488,13 @@ public class StandaloneHerderTest {
         herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
 
         herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, 
createCallback);
+
+        EasyMock.reset(transformer);
+        EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.anyObject()))
+            .andThrow(new AssertionError("Config transformation should not 
occur when requesting connector or task info"))
+            .anyTimes();
+        EasyMock.replay(transformer);
+
         herder.connectors(listConnectorsCb);
         herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
@@ -604,8 +614,7 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
-    private void expectAdd(SourceSink sourceSink) throws Exception {
-
+    private void expectAdd(SourceSink sourceSink) {
         Map<String, String> connectorProps = connectorConfig(sourceSink);
         ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
             new SourceConnectorConfig(plugins, connectorProps) :
@@ -634,7 +643,8 @@ public class StandaloneHerderTest {
                 Collections.singletonMap(CONNECTOR_NAME, 
connectorConfig(sourceSink)),
                 Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
                 Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 
0), generatedTaskProps),
-                new HashSet<>());
+                new HashSet<>(),
+                transformer);
         worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, 
connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
 
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py 
b/tests/kafkatest/tests/connect/connect_rest_test.py
index 8b6157b..c13515b 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.connect import ConnectDistributedService, 
ConnectRestError
+from kafkatest.services.connect import ConnectDistributedService, 
ConnectRestError, ConnectServiceBase
 from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 from ducktape.cluster.remoteaccount import RemoteCommandError
@@ -43,7 +43,9 @@ class ConnectRestApiTest(KafkaTest):
     INPUT_FILE2 = "/mnt/connect.input2"
     OUTPUT_FILE = "/mnt/connect.output"
 
-    TOPIC = "test"
+    TOPIC = "${file:%s:topic.external}" % 
ConnectServiceBase.EXTERNAL_CONFIGS_FILE
+    TOPIC_TEST = "test"
+
     DEFAULT_BATCH_SIZE = "2000"
     OFFSETS_TOPIC = "connect-offsets"
     OFFSETS_REPLICATION_FACTOR = "1"
@@ -78,6 +80,7 @@ class ConnectRestApiTest(KafkaTest):
         self.schemas = True
 
         self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+        self.cc.set_external_configs(lambda node: 
self.render("connect-file-external.properties", node=node))
 
         self.cc.start()
 
diff --git a/tests/kafkatest/tests/connect/connect_test.py 
b/tests/kafkatest/tests/connect/connect_test.py
index f01ff0a..9a1ff1b 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -22,8 +22,7 @@ from ducktape.errors import TimeoutError
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.connect import ConnectStandaloneService
-from kafkatest.services.connect import ErrorTolerance
+from kafkatest.services.connect import ConnectServiceBase, 
ConnectStandaloneService, ErrorTolerance
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 
@@ -47,7 +46,7 @@ class ConnectStandaloneFileTest(Test):
 
     OFFSETS_FILE = "/mnt/connect.offsets"
 
-    TOPIC = 
"${file:/mnt/connect/connect-external-configs.properties:topic.external}"
+    TOPIC = "${file:%s:topic.external}" % 
ConnectServiceBase.EXTERNAL_CONFIGS_FILE
     TOPIC_TEST = "test"
 
     FIRST_INPUT_LIST = ["foo", "bar", "baz"]
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-distributed.properties 
b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 186773e..ca8c4f8 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -50,3 +50,9 @@ consumer.session.timeout.ms=10000
 
 # Reduce the admin client request timeouts so that we don't wait the default 
120 sec before failing to connect the admin client
 request.timeout.ms=30000
+
+# Allow connector configs to use externalized config values of the form:
+#   ${file:/mnt/connect/connect-external-configs.properties:topic.external}
+#
+config.providers=file
+config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

Reply via email to