This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b042b36  KAFKA-8426; Fix for keeping the ConfigProvider configs 
consistent with KIP-297 (#6750)
b042b36 is described below

commit b042b36674f0bd8046f0229c96e0b8cf99554b2b
Author: tadsul <[email protected]>
AuthorDate: Mon Jun 3 04:56:31 2019 -0700

    KAFKA-8426; Fix for keeping the ConfigProvider configs consistent with 
KIP-297 (#6750)
    
    According to KIP-297 a parameter is passed to ConfigProvider with syntax 
"config.providers.{name}.param.{param-name}". Currently AbstractConfig allows 
parameters of the format "config.providers.{name}.{param-name}". With this fix 
AbstractConfig will be consistent with KIP-297 syntax.
    
    Reviewers: Robert Yokota <[email protected]>, Rajini Sivaram 
<[email protected]>
---
 .../apache/kafka/common/config/AbstractConfig.java | 14 +++++++++-
 .../kafka/common/config/AbstractConfigTest.java    | 32 +++++++++++++++++-----
 .../config/provider/MockVaultConfigProvider.java   | 18 +++++++++++-
 3 files changed, 55 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 13865d0..a48856f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -55,6 +55,8 @@ public class AbstractConfig {
 
     private static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
 
+    private static final String CONFIG_PROVIDERS_PARAM = ".param.";
+
     /**
      * Construct a configuration with a ConfigDef and the configuration 
properties, which can include properties
      * for zero or more {@link ConfigProvider} that will be used to resolve 
variables in configuration property
@@ -494,6 +496,16 @@ public class AbstractConfig {
         return result;
     }
 
+    /**
+     * Instantiates and configures the ConfigProviders. The config providers 
configs are defined as follows:
+     * config.providers : A comma-separated list of names for providers.
+     * config.providers.{name}.class : The Java class name for a provider.
+     * config.providers.{name}.param.{param-name} : A parameter to be passed 
to the above Java class on initialization.
+     * returns a map of config provider name and its instance.
+     * @param indirectConfigs The map of potential variable configs
+     * @param providerConfigProperties The map of config provider configs
+     * @return map of config provider name and its instance.
+     */
     private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, 
String> indirectConfigs, Map<String, ?> providerConfigProperties) {
         final String configProviders = 
indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
 
@@ -513,7 +525,7 @@ public class AbstractConfig {
         Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
-                String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() 
+ ".";
+                String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() 
+ CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = 
configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), 
ConfigProvider.class);
                 provider.configure(configProperties);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java 
b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 33aaac5..f686475 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.metrics.FakeMetricsReporter;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -337,7 +339,7 @@ public class AbstractConfigTest {
 
         // Test Case: Valid Test Case for ConfigProviders as part of 
config.properties
         props.put("config.providers", "file");
-        props.put("config.providers.file.class", 
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("config.providers.file.class", 
MockFileConfigProvider.class.getName());
         props.put("prefix.ssl.truststore.location.number", 5);
         props.put("sasl.kerberos.service.name", "service name");
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -354,7 +356,7 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case for ConfigProviders as a separate 
variable
         Properties providers = new Properties();
         providers.put("config.providers", "file");
-        providers.put("config.providers.file.class", 
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        providers.put("config.providers.file.class", 
MockFileConfigProvider.class.getName());
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
@@ -382,8 +384,8 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case With Multiple ConfigProviders as a 
separate variable
         Properties providers = new Properties();
         providers.put("config.providers", "file,vault");
-        providers.put("config.providers.file.class", 
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
-        providers.put("config.providers.vault.class", 
"org.apache.kafka.common.config.provider.MockVaultConfigProvider");
+        providers.put("config.providers.file.class", 
MockFileConfigProvider.class.getName());
+        providers.put("config.providers.vault.class", 
MockVaultConfigProvider.class.getName());
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
@@ -426,7 +428,7 @@ public class AbstractConfigTest {
         // Test Case: Config Provider fails to resolve the config (key not 
present)
         Properties props = new Properties();
         props.put("config.providers", "test");
-        props.put("config.providers.test.class", 
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("config.providers.test.class", 
MockFileConfigProvider.class.getName());
         props.put("random", "${test:/foo/bar/testpath:random}");
         TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props);
         assertEquals(config.originals().get("random"), 
"${test:/foo/bar/testpath:random}");
@@ -437,17 +439,33 @@ public class AbstractConfigTest {
         // Test Case: If ConfigProvider is provided in both originals and 
provider. Only the ones in provider should be used.
         Properties providers = new Properties();
         providers.put("config.providers", "test");
-        providers.put("config.providers.test.class", 
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        providers.put("config.providers.test.class", 
MockVaultConfigProvider.class.getName());
 
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("config.providers", "file");
-        props.put("config.providers.file.class", 
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        props.put("config.providers.file.class", 
MockVaultConfigProvider.class.getName());
 
         TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
         assertEquals(config.originals().get("sasl.kerberos.key"), 
"${file:/usr/kerberos:key}");
     }
 
+    @Test
+    public void testConfigProviderConfigurationWithConfigParams() {
+        // Test Case: Valid Test Case for a ConfigProvider configured via 
config.providers.{name}.param.{param-name} parameters
+        Properties providers = new Properties();
+        providers.put("config.providers", "vault");
+        providers.put("config.providers.vault.class", 
MockVaultConfigProvider.class.getName());
+        providers.put("config.providers.vault.param.key", "randomKey");
+        providers.put("config.providers.vault.param.location", "/usr/vault");
+        Properties props = new Properties();
+        props.put("sasl.truststore.location", 
"${vault:/usr/truststore:truststoreKey}");
+        props.put("sasl.truststore.password", 
"${vault:/usr/truststore:truststorePassword}");
+        props.put("sasl.truststore.location", 
"${vault:/usr/truststore:truststoreLocation}");
+        TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        assertEquals(config.originals().get("sasl.truststore.location"), 
"/usr/vault");
+    }
+
     private static class TestIndirectConfigResolution extends AbstractConfig {
 
         private static final ConfigDef CONFIG;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
 
b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
index dbfe158..c741798 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
@@ -19,11 +19,27 @@ package org.apache.kafka.common.config.provider;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.Map;
 
 public class MockVaultConfigProvider extends FileConfigProvider {
 
+    Map<String, ?> vaultConfigs;
+    private boolean configured = false;
+    private static final String LOCATION = "location";
+
     @Override
     protected Reader reader(String path) throws IOException {
-        return new 
StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword");
+        String vaultLocation = (String) vaultConfigs.get(LOCATION);
+        return new 
StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword\n"
 + "truststoreLocation=" + vaultLocation + "\n");
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.vaultConfigs = configs;
+        configured = true;
+    }
+
+    public boolean configured() {
+        return configured;
     }
 }

Reply via email to