This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new c105bce  KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options (#11486)
c105bce is described below

commit c105bce3dfbd0a19de7a6a1108fd19d1d831db6c
Author: Kirk True <k...@mustardgrain.com>
AuthorDate: Mon Nov 15 15:45:18 2021 -0800

    KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options (#11486)
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../apache/kafka/tools/OAuthCompatibilityTool.java | 382 +++++++++++++--------
 1 file changed, 245 insertions(+), 137 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
index e173c49..a7edcec 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
@@ -45,6 +45,44 @@ import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CL
 import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC;
 import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
 import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_DOC;
+import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_DOC;
+import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
+import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_DOC;
 import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
 import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC;
 import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
@@ -52,123 +90,47 @@ import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLo
 import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
 import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_DOC;
 
-import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.Argument;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
 import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetriever;
 import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetrieverFactory;
 import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidator;
 import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidatorFactory;
 import 
org.apache.kafka.common.security.oauthbearer.secured.CloseableVerificationKeyResolver;
-import 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler;
 import 
org.apache.kafka.common.security.oauthbearer.secured.VerificationKeyResolverFactory;
 import org.apache.kafka.common.utils.Exit;
 
 public class OAuthCompatibilityTool {
 
     public static void main(String[] args) {
-        String description = String.format(
-            "This tool is used to verify OAuth/OIDC provider 
compatibility.%n%n" +
-            "To use, first export KAFKA_OPTS with Java system properties that 
match%n" +
-            "your OAuth/OIDC configuration. Next, run the following script 
to%n" +
-            "execute the test:%n%n" +
-            "    ./bin/kafka-run-class.sh %s" +
-            "%n%n" +
-            "Please refer to the following source files for OAuth/OIDC client 
and%n" +
-            "broker configuration options:" +
-            "%n%n" +
-            "    %s%n" +
-            "    %s",
-            OAuthCompatibilityTool.class.getName(),
-            SaslConfigs.class.getName(),
-            OAuthBearerLoginCallbackHandler.class.getName());
-
-        ArgumentParser parser = ArgumentParsers
-            .newArgumentParser("oauth-compatibility-test")
-            .defaultHelp(true)
-            .description(description);
-
-        parser.addArgument("--connect-timeout-ms")
-            .type(Integer.class)
-            .dest("connectTimeoutMs")
-            .help(SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC);
-        parser.addArgument("--read-timeout-ms")
-            .type(Integer.class)
-            .dest("readTimeoutMs")
-            .help(SASL_LOGIN_READ_TIMEOUT_MS_DOC);
-        parser.addArgument("--login-retry-backoff-ms")
-            .type(Long.class)
-            .dest("loginRetryBackoffMs")
-            .help(SASL_LOGIN_RETRY_BACKOFF_MS_DOC);
-        parser.addArgument("--login-retry-backoff-max-ms")
-            .type(Long.class)
-            .dest("loginRetryBackoffMax")
-            .help(SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC);
-        parser.addArgument("--scope-claim-name")
-            .dest("scopeClaimName")
-            .help(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC);
-        parser.addArgument("--sub-claim-name")
-            .dest("subClaimName")
-            .help(SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC);
-        parser.addArgument("--token-endpoint-url")
-            .dest("tokenEndpointUrl")
-            .help(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC);
-        parser.addArgument("--jwks-endpoint-url")
-            .dest("jwksEndpointUrl")
-            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC);
-        parser.addArgument("--jwks-endpoint-refresh-ms")
-            .type(Long.class)
-            .dest("jwksEndpointRefreshMs")
-            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC);
-        parser.addArgument("--jwks-endpoint-retry-backoff-max-ms")
-            .type(Long.class)
-            .dest("jwksEndpointRetryBackoffMaxMs")
-            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC);
-        parser.addArgument("--jwks-endpoint-retry-backoff-ms")
-            .type(Long.class)
-            .dest("jwksEndpointRetryBackoffMs")
-            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC);
-        parser.addArgument("--clock-skew-seconds")
-            .type(Integer.class)
-            .dest("clockSkewSeconds")
-            .help(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC);
-        parser.addArgument("--expected-audience")
-            .dest("expectedAudience")
-            .help(SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC);
-        parser.addArgument("--expected-issuer")
-            .dest("expectedIssuer")
-            .help(SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
-
-        parser.addArgument("--client-id")
-            .dest("clientId")
-            .help(CLIENT_ID_DOC);
-        parser.addArgument("--client-secret")
-            .dest("clientSecret")
-            .help(CLIENT_SECRET_DOC);
-        parser.addArgument("--scope")
-            .dest("scope")
-            .help(SCOPE_DOC);
-
+        ArgsHandler argsHandler = new ArgsHandler();
         Namespace namespace;
 
         try {
-            namespace = parser.parseArgs(args);
+            namespace = argsHandler.parseArgs(args);
         } catch (ArgumentParserException e) {
-            parser.handleError(e);
             Exit.exit(1);
             return;
         }
 
-        Map<String, ?> configs = getConfigs(namespace);
-        Map<String, Object> jaasConfigs = getJaasConfigs(namespace);
+        ConfigHandler configHandler = new ConfigHandler(namespace);
+
+        Map<String, ?> configs = configHandler.getConfigs();
+        Map<String, Object> jaasConfigs = configHandler.getJaasOptions();
 
         try {
             String accessToken;
@@ -208,71 +170,217 @@ public class OAuthCompatibilityTool {
 
             if (t instanceof ConfigException) {
                 System.out.printf("%n");
-                parser.printHelp();
+                argsHandler.parser.printHelp();
             }
 
             Exit.exit(1);
         }
     }
 
-    private static Map<String, ?> getConfigs(Namespace namespace) {
-        Map<String, Object> c = new HashMap<>();
-        maybeAddInt(namespace, "connectTimeoutMs", c, 
SASL_LOGIN_CONNECT_TIMEOUT_MS);
-        maybeAddInt(namespace, "readTimeoutMs", c, SASL_LOGIN_READ_TIMEOUT_MS);
-        maybeAddLong(namespace, "loginRetryBackoffMs", c, 
SASL_LOGIN_RETRY_BACKOFF_MS);
-        maybeAddLong(namespace, "loginRetryBackoffMax", c, 
SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
-        maybeAddString(namespace, "scopeClaimName", c, 
SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
-        maybeAddString(namespace, "subClaimName", c, 
SASL_OAUTHBEARER_SUB_CLAIM_NAME);
-        maybeAddString(namespace, "tokenEndpointUrl", c, 
SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
-        maybeAddString(namespace, "jwksEndpointUrl", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
-        maybeAddLong(namespace, "jwksEndpdointRefreshMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
-        maybeAddLong(namespace, "jwksEndpdointRetryBackoffMaxMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS);
-        maybeAddLong(namespace, "jwksEndpdointRetryBackoffMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS);
-        maybeAddInt(namespace, "clockSkewSeconds", c, 
SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS);
-        maybeAddStringList(namespace, "expectedAudience", c, 
SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
-        maybeAddString(namespace, "expectedIssuer", c, 
SASL_OAUTHBEARER_EXPECTED_ISSUER);
-
-        // This here is going to fill in all the defaults for the values we 
don't specify...
-        ConfigDef cd = new ConfigDef();
-        SaslConfigs.addClientSaslSupport(cd);
-        AbstractConfig config = new AbstractConfig(cd, c);
-        return config.values();
-    }
 
-    private static void maybeAddInt(Namespace namespace, String namespaceKey, 
Map<String, Object> configs, String configsKey) {
-        Integer value = namespace.getInt(namespaceKey);
+    private static class ArgsHandler {
 
-        if (value != null)
-            configs.put(configsKey, value);
-    }
+        private static final String DESCRIPTION = String.format(
+            "This tool is used to verify OAuth/OIDC provider 
compatibility.%n%n" +
+            "Run the following script to determine the configuration 
options:%n%n" +
+                "    ./bin/kafka-run-class.sh %s --help",
+            OAuthCompatibilityTool.class.getName());
 
-    private static void maybeAddLong(Namespace namespace, String namespaceKey, 
Map<String, Object> configs, String configsKey) {
-        Long value = namespace.getLong(namespaceKey);
+        private final ArgumentParser parser;
 
-        if (value != null)
-            configs.put(configsKey, value);
-    }
+        private ArgsHandler() {
+            this.parser = ArgumentParsers
+                .newArgumentParser("oauth-compatibility-tool")
+                .defaultHelp(true)
+                .description(DESCRIPTION);
+        }
 
-    private static void maybeAddString(Namespace namespace, String 
namespaceKey, Map<String, Object> configs, String configsKey) {
-        String value = namespace.getString(namespaceKey);
+        private Namespace parseArgs(String[] args) throws 
ArgumentParserException {
+            // SASL/OAuth
+            addArgument(SASL_LOGIN_CONNECT_TIMEOUT_MS, 
SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC, Integer.class);
+            addArgument(SASL_LOGIN_READ_TIMEOUT_MS, 
SASL_LOGIN_READ_TIMEOUT_MS_DOC, Integer.class);
+            addArgument(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, 
SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
+            addArgument(SASL_LOGIN_RETRY_BACKOFF_MS, 
SASL_LOGIN_RETRY_BACKOFF_MS_DOC, Long.class);
+            addArgument(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, 
SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC, Integer.class);
+            addArgument(SASL_OAUTHBEARER_EXPECTED_AUDIENCE, 
SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
+                .action(Arguments.append());
+            addArgument(SASL_OAUTHBEARER_EXPECTED_ISSUER, 
SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
+            addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC, Long.class);
+            addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
+            addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC, Long.class);
+            addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC);
+            addArgument(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, 
SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC);
+            addArgument(SASL_OAUTHBEARER_SUB_CLAIM_NAME, 
SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC);
+            addArgument(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC);
+
+            // SSL
+            addArgument(SSL_CIPHER_SUITES_CONFIG, SSL_CIPHER_SUITES_DOC)
+                .action(Arguments.append());
+            addArgument(SSL_ENABLED_PROTOCOLS_CONFIG, 
SSL_ENABLED_PROTOCOLS_DOC)
+                .action(Arguments.append());
+            addArgument(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
+            addArgument(SSL_ENGINE_FACTORY_CLASS_CONFIG, 
SSL_ENGINE_FACTORY_CLASS_DOC);
+            addArgument(SSL_KEYMANAGER_ALGORITHM_CONFIG, 
SSL_KEYMANAGER_ALGORITHM_DOC);
+            addArgument(SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, 
SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC);
+            addArgument(SSL_KEYSTORE_KEY_CONFIG, SSL_KEYSTORE_KEY_DOC);
+            addArgument(SSL_KEYSTORE_LOCATION_CONFIG, 
SSL_KEYSTORE_LOCATION_DOC);
+            addArgument(SSL_KEYSTORE_PASSWORD_CONFIG, 
SSL_KEYSTORE_PASSWORD_DOC);
+            addArgument(SSL_KEYSTORE_TYPE_CONFIG, SSL_KEYSTORE_TYPE_DOC);
+            addArgument(SSL_KEY_PASSWORD_CONFIG, SSL_KEY_PASSWORD_DOC);
+            addArgument(SSL_PROTOCOL_CONFIG, SSL_PROTOCOL_DOC);
+            addArgument(SSL_PROVIDER_CONFIG, SSL_PROVIDER_DOC);
+            addArgument(SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, 
SSL_SECURE_RANDOM_IMPLEMENTATION_DOC);
+            addArgument(SSL_TRUSTMANAGER_ALGORITHM_CONFIG, 
SSL_TRUSTMANAGER_ALGORITHM_DOC);
+            addArgument(SSL_TRUSTSTORE_CERTIFICATES_CONFIG, 
SSL_TRUSTSTORE_CERTIFICATES_DOC);
+            addArgument(SSL_TRUSTSTORE_LOCATION_CONFIG, 
SSL_TRUSTSTORE_LOCATION_DOC);
+            addArgument(SSL_TRUSTSTORE_PASSWORD_CONFIG, 
SSL_TRUSTSTORE_PASSWORD_DOC);
+            addArgument(SSL_TRUSTSTORE_TYPE_CONFIG, SSL_TRUSTSTORE_TYPE_DOC);
+
+            // JAAS options...
+            addArgument(CLIENT_ID_CONFIG, CLIENT_ID_DOC);
+            addArgument(CLIENT_SECRET_CONFIG, CLIENT_SECRET_DOC);
+            addArgument(SCOPE_CONFIG, SCOPE_DOC);
+
+            try {
+                return parser.parseArgs(args);
+            } catch (ArgumentParserException e) {
+                parser.handleError(e);
+                throw e;
+            }
+        }
 
-        if (value != null)
-            configs.put(configsKey, value);
-    }
+        private Argument addArgument(String option, String help) {
+            return addArgument(option, help, String.class);
+        }
+
+        private Argument addArgument(String option, String help, Class<?> 
clazz) {
+            // Change foo.bar into --foo.bar.
+            String name = "--" + option;
 
-    private static void maybeAddStringList(Namespace namespace, String 
namespaceKey, Map<String, Object> configs, String configsKey) {
-        String value = namespace.getString(namespaceKey);
+            return parser.addArgument(name)
+                .type(clazz)
+                .metavar(option)
+                .dest(option)
+                .help(help);
+        }
 
-        if (value != null)
-            configs.put(configsKey, Arrays.asList(value.split(",")));
     }
 
-    private static Map<String, Object> getJaasConfigs(Namespace namespace) {
-        Map<String, Object> c = new HashMap<>();
-        c.put(CLIENT_ID_CONFIG, namespace.getString("clientId"));
-        c.put(CLIENT_SECRET_CONFIG, namespace.getString("clientSecret"));
-        c.put(SCOPE_CONFIG, namespace.getString("scope"));
-        return c;
+    private static class ConfigHandler {
+
+        private final Namespace namespace;
+
+
+        private ConfigHandler(Namespace namespace) {
+            this.namespace = namespace;
+        }
+
+        private Map<String, ?> getConfigs() {
+            Map<String, Object> m = new HashMap<>();
+
+            // SASL/OAuth
+            maybeAddInt(m, SASL_LOGIN_CONNECT_TIMEOUT_MS);
+            maybeAddInt(m, SASL_LOGIN_READ_TIMEOUT_MS);
+            maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MS);
+            maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
+            maybeAddString(m, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
+            maybeAddString(m, SASL_OAUTHBEARER_SUB_CLAIM_NAME);
+            maybeAddString(m, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
+            maybeAddString(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
+            maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
+            maybeAddLong(m, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS);
+            maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS);
+            maybeAddInt(m, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS);
+            maybeAddStringList(m, SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
+            maybeAddString(m, SASL_OAUTHBEARER_EXPECTED_ISSUER);
+
+            // This here is going to fill in all the defaults for the values 
we don't specify...
+            ConfigDef cd = new ConfigDef();
+            SaslConfigs.addClientSaslSupport(cd);
+            SslConfigs.addClientSslSupport(cd);
+            AbstractConfig config = new AbstractConfig(cd, m);
+            return config.values();
+        }
+
+        private Map<String, Object> getJaasOptions() {
+            Map<String, Object> m = new HashMap<>();
+
+            // SASL/OAuth
+            maybeAddString(m, CLIENT_ID_CONFIG);
+            maybeAddString(m, CLIENT_SECRET_CONFIG);
+            maybeAddString(m, SCOPE_CONFIG);
+
+            // SSL
+            maybeAddStringList(m, SSL_CIPHER_SUITES_CONFIG);
+            maybeAddStringList(m, SSL_ENABLED_PROTOCOLS_CONFIG);
+            maybeAddString(m, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+            maybeAddClass(m, SSL_ENGINE_FACTORY_CLASS_CONFIG);
+            maybeAddString(m, SSL_KEYMANAGER_ALGORITHM_CONFIG);
+            maybeAddPassword(m, SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
+            maybeAddPassword(m, SSL_KEYSTORE_KEY_CONFIG);
+            maybeAddString(m, SSL_KEYSTORE_LOCATION_CONFIG);
+            maybeAddPassword(m, SSL_KEYSTORE_PASSWORD_CONFIG);
+            maybeAddString(m, SSL_KEYSTORE_TYPE_CONFIG);
+            maybeAddPassword(m, SSL_KEY_PASSWORD_CONFIG);
+            maybeAddString(m, SSL_PROTOCOL_CONFIG);
+            maybeAddString(m, SSL_PROVIDER_CONFIG);
+            maybeAddString(m, SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
+            maybeAddString(m, SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
+            maybeAddPassword(m, SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
+            maybeAddString(m, SSL_TRUSTSTORE_LOCATION_CONFIG);
+            maybeAddPassword(m, SSL_TRUSTSTORE_PASSWORD_CONFIG);
+            maybeAddString(m, SSL_TRUSTSTORE_TYPE_CONFIG);
+
+            return m;
+        }
+
+        private void maybeAddInt(Map<String, Object> m, String option) {
+            Integer value = namespace.getInt(option);
+
+            if (value != null)
+                m.put(option, value);
+        }
+
+        private void maybeAddLong(Map<String, Object> m, String option) {
+            Long value = namespace.getLong(option);
+
+            if (value != null)
+                m.put(option, value);
+        }
+
+        private void maybeAddString(Map<String, Object> m, String option) {
+            String value = namespace.getString(option);
+
+            if (value != null)
+                m.put(option, value);
+        }
+
+        private void maybeAddPassword(Map<String, Object> m, String option) {
+            String value = namespace.getString(option);
+
+            if (value != null)
+                m.put(option, new Password(value));
+        }
+
+        private void maybeAddClass(Map<String, Object> m, String option) {
+            String value = namespace.getString(option);
+
+            if (value != null) {
+                try {
+                    m.put(option, Class.forName(value));
+                } catch (ClassNotFoundException e) {
+                    throw new KafkaException("Could not find class for " + 
option, e);
+                }
+            }
+        }
+
+        private void maybeAddStringList(Map<String, Object> m, String option) {
+            List<String> value = namespace.getList(option);
+
+            if (value != null)
+                m.put(option, value);
+        }
+
     }
 
 }

Reply via email to