This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b042b36 KAFKA-8426; Fix for keeping the ConfigProvider configs
consistent with KIP-297 (#6750)
b042b36 is described below
commit b042b36674f0bd8046f0229c96e0b8cf99554b2b
Author: tadsul <[email protected]>
AuthorDate: Mon Jun 3 04:56:31 2019 -0700
KAFKA-8426; Fix for keeping the ConfigProvider configs consistent with
KIP-297 (#6750)
According to KIP-297 a parameter is passed to ConfigProvider with syntax
"config.providers.{name}.param.{param-name}". Currently AbstractConfig allows
parameters of the format "config.providers.{name}.{param-name}". With this fix
AbstractConfig will be consistent with KIP-297 syntax.
Reviewers: Robert Yokota <[email protected]>, Rajini Sivaram
<[email protected]>
---
.../apache/kafka/common/config/AbstractConfig.java | 14 +++++++++-
.../kafka/common/config/AbstractConfigTest.java | 32 +++++++++++++++++-----
.../config/provider/MockVaultConfigProvider.java | 18 +++++++++++-
3 files changed, 55 insertions(+), 9 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 13865d0..a48856f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -55,6 +55,8 @@ public class AbstractConfig {
private static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
+ private static final String CONFIG_PROVIDERS_PARAM = ".param.";
+
/**
* Construct a configuration with a ConfigDef and the configuration
properties, which can include properties
* for zero or more {@link ConfigProvider} that will be used to resolve
variables in configuration property
@@ -494,6 +496,16 @@ public class AbstractConfig {
return result;
}
+ /**
+ * Instantiates and configures the ConfigProviders. The config providers
configs are defined as follows:
+ * config.providers : A comma-separated list of names for providers.
+ * config.providers.{name}.class : The Java class name for a provider.
+ * config.providers.{name}.param.{param-name} : A parameter to be passed
to the above Java class on initialization.
+ * returns a map of config provider name and its instance.
+ * @param indirectConfigs The map of potential variable configs
+ * @param providerConfigProperties The map of config provider configs
+ * @return map map of config provider name and its instance.
+ */
private Map<String, ConfigProvider> instantiateConfigProviders(Map<String,
String> indirectConfigs, Map<String, ?> providerConfigProperties) {
final String configProviders =
indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
@@ -513,7 +525,7 @@ public class AbstractConfig {
Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
- String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey()
+ ".";
+ String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey()
+ CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties =
configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(),
ConfigProvider.class);
provider.configure(configProperties);
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 33aaac5..f686475 100644
---
a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.junit.Test;
import java.util.Arrays;
@@ -337,7 +339,7 @@ public class AbstractConfigTest {
// Test Case: Valid Test Case for ConfigProviders as part of
config.properties
props.put("config.providers", "file");
- props.put("config.providers.file.class",
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ props.put("config.providers.file.class",
MockFileConfigProvider.class.getName());
props.put("prefix.ssl.truststore.location.number", 5);
props.put("sasl.kerberos.service.name", "service name");
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -354,7 +356,7 @@ public class AbstractConfigTest {
// Test Case: Valid Test Case for ConfigProviders as a separate
variable
Properties providers = new Properties();
providers.put("config.providers", "file");
- providers.put("config.providers.file.class",
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ providers.put("config.providers.file.class",
MockFileConfigProvider.class.getName());
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
@@ -382,8 +384,8 @@ public class AbstractConfigTest {
// Test Case: Valid Test Case With Multiple ConfigProviders as a
separate variable
Properties providers = new Properties();
providers.put("config.providers", "file,vault");
- providers.put("config.providers.file.class",
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
- providers.put("config.providers.vault.class",
"org.apache.kafka.common.config.provider.MockVaultConfigProvider");
+ providers.put("config.providers.file.class",
MockFileConfigProvider.class.getName());
+ providers.put("config.providers.vault.class",
MockVaultConfigProvider.class.getName());
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
@@ -426,7 +428,7 @@ public class AbstractConfigTest {
// Test Case: Config Provider fails to resolve the config (key not
present)
Properties props = new Properties();
props.put("config.providers", "test");
- props.put("config.providers.test.class",
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ props.put("config.providers.test.class",
MockFileConfigProvider.class.getName());
props.put("random", "${test:/foo/bar/testpath:random}");
TestIndirectConfigResolution config = new
TestIndirectConfigResolution(props);
assertEquals(config.originals().get("random"),
"${test:/foo/bar/testpath:random}");
@@ -437,17 +439,33 @@ public class AbstractConfigTest {
// Test Case: If ConfigProvider is provided in both originals and
provider. Only the ones in provider should be used.
Properties providers = new Properties();
providers.put("config.providers", "test");
- providers.put("config.providers.test.class",
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ providers.put("config.providers.test.class",
MockVaultConfigProvider.class.getName());
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("config.providers", "file");
- props.put("config.providers.file.class",
"org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ props.put("config.providers.file.class",
MockVaultConfigProvider.class.getName());
TestIndirectConfigResolution config = new
TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
assertEquals(config.originals().get("sasl.kerberos.key"),
"${file:/usr/kerberos:key}");
}
+ @Test
+ public void testConfigProviderConfigurationWithConfigParams() {
+ // Test Case: Valid Test Case With Multiple ConfigProviders as a
separate variable
+ Properties providers = new Properties();
+ providers.put("config.providers", "vault");
+ providers.put("config.providers.vault.class",
MockVaultConfigProvider.class.getName());
+ providers.put("config.providers.vault.param.key", "randomKey");
+ providers.put("config.providers.vault.param.location", "/usr/vault");
+ Properties props = new Properties();
+ props.put("sasl.truststore.location",
"${vault:/usr/truststore:truststoreKey}");
+ props.put("sasl.truststore.password",
"${vault:/usr/truststore:truststorePassword}");
+ props.put("sasl.truststore.location",
"${vault:/usr/truststore:truststoreLocation}");
+ TestIndirectConfigResolution config = new
TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+ assertEquals(config.originals().get("sasl.truststore.location"),
"/usr/vault");
+ }
+
private static class TestIndirectConfigResolution extends AbstractConfig {
private static final ConfigDef CONFIG;
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
index dbfe158..c741798 100644
---
a/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
+++
b/clients/src/test/java/org/apache/kafka/common/config/provider/MockVaultConfigProvider.java
@@ -19,11 +19,27 @@ package org.apache.kafka.common.config.provider;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
+import java.util.Map;
public class MockVaultConfigProvider extends FileConfigProvider {
+ Map<String, ?> vaultConfigs;
+ private boolean configured = false;
+ private static final String LOCATION = "location";
+
@Override
protected Reader reader(String path) throws IOException {
- return new
StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword");
+ String vaultLocation = (String) vaultConfigs.get(LOCATION);
+ return new
StringReader("truststoreKey=testTruststoreKey\ntruststorePassword=randomtruststorePassword\n"
+ "truststoreLocation=" + vaultLocation + "\n");
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ this.vaultConfigs = configs;
+ configured = true;
+ }
+
+ public boolean configured() {
+ return configured;
}
}