This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push: new c105bce KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options (#11486) c105bce is described below commit c105bce3dfbd0a19de7a6a1108fd19d1d831db6c Author: Kirk True <k...@mustardgrain.com> AuthorDate: Mon Nov 15 15:45:18 2021 -0800 KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options (#11486) Reviewers: Jun Rao <jun...@gmail.com> --- .../apache/kafka/tools/OAuthCompatibilityTool.java | 382 +++++++++++++-------- 1 file changed, 245 insertions(+), 137 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java index e173c49..a7edcec 100644 --- a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java @@ -45,6 +45,44 @@ import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CL import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC; import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_DOC; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; @@ -52,123 +90,47 @@ import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLo import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_DOC; -import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.Argument; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetriever; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetrieverFactory; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidator; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidatorFactory; import org.apache.kafka.common.security.oauthbearer.secured.CloseableVerificationKeyResolver; -import org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler; import org.apache.kafka.common.security.oauthbearer.secured.VerificationKeyResolverFactory; import org.apache.kafka.common.utils.Exit; public class OAuthCompatibilityTool { public static void main(String[] args) { - String description = String.format( - "This tool is used to verify OAuth/OIDC provider compatibility.%n%n" + - "To use, first export KAFKA_OPTS with Java system properties that match%n" + - "your OAuth/OIDC configuration. Next, run the following script to%n" + - "execute the test:%n%n" + - " ./bin/kafka-run-class.sh %s" + - "%n%n" + - "Please refer to the following source files for OAuth/OIDC client and%n" + - "broker configuration options:" + - "%n%n" + - " %s%n" + - " %s", - OAuthCompatibilityTool.class.getName(), - SaslConfigs.class.getName(), - OAuthBearerLoginCallbackHandler.class.getName()); - - ArgumentParser parser = ArgumentParsers - .newArgumentParser("oauth-compatibility-test") - .defaultHelp(true) - .description(description); - - parser.addArgument("--connect-timeout-ms") - .type(Integer.class) - .dest("connectTimeoutMs") - .help(SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC); - parser.addArgument("--read-timeout-ms") - .type(Integer.class) - .dest("readTimeoutMs") - .help(SASL_LOGIN_READ_TIMEOUT_MS_DOC); - parser.addArgument("--login-retry-backoff-ms") - .type(Long.class) - .dest("loginRetryBackoffMs") - .help(SASL_LOGIN_RETRY_BACKOFF_MS_DOC); - parser.addArgument("--login-retry-backoff-max-ms") - .type(Long.class) - .dest("loginRetryBackoffMax") - .help(SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC); - parser.addArgument("--scope-claim-name") - .dest("scopeClaimName") - .help(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC); - parser.addArgument("--sub-claim-name") - .dest("subClaimName") - .help(SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC); - parser.addArgument("--token-endpoint-url") - .dest("tokenEndpointUrl") - .help(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC); - parser.addArgument("--jwks-endpoint-url") - .dest("jwksEndpointUrl") - .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC); - parser.addArgument("--jwks-endpoint-refresh-ms") - .type(Long.class) - .dest("jwksEndpointRefreshMs") - .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC); - parser.addArgument("--jwks-endpoint-retry-backoff-max-ms") - .type(Long.class) - .dest("jwksEndpointRetryBackoffMaxMs") - .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC); - parser.addArgument("--jwks-endpoint-retry-backoff-ms") - .type(Long.class) - .dest("jwksEndpointRetryBackoffMs") - .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC); - parser.addArgument("--clock-skew-seconds") - .type(Integer.class) - .dest("clockSkewSeconds") - .help(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC); - parser.addArgument("--expected-audience") - .dest("expectedAudience") - .help(SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC); - parser.addArgument("--expected-issuer") - .dest("expectedIssuer") - .help(SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC); - - parser.addArgument("--client-id") - .dest("clientId") - .help(CLIENT_ID_DOC); - parser.addArgument("--client-secret") - .dest("clientSecret") - .help(CLIENT_SECRET_DOC); - parser.addArgument("--scope") - .dest("scope") - .help(SCOPE_DOC); - + ArgsHandler argsHandler = new ArgsHandler(); Namespace namespace; try { - namespace = parser.parseArgs(args); + namespace = argsHandler.parseArgs(args); } catch (ArgumentParserException e) { - parser.handleError(e); Exit.exit(1); return; } - Map<String, ?> configs = getConfigs(namespace); - Map<String, Object> jaasConfigs = getJaasConfigs(namespace); + ConfigHandler configHandler = new ConfigHandler(namespace); + + Map<String, ?> configs = configHandler.getConfigs(); + Map<String, Object> jaasConfigs = configHandler.getJaasOptions(); try { String accessToken; @@ -208,71 +170,217 @@ public class OAuthCompatibilityTool { if (t instanceof ConfigException) { System.out.printf("%n"); - parser.printHelp(); + argsHandler.parser.printHelp(); } Exit.exit(1); } } - private static Map<String, ?> getConfigs(Namespace namespace) { - Map<String, Object> c = new HashMap<>(); - maybeAddInt(namespace, "connectTimeoutMs", c, SASL_LOGIN_CONNECT_TIMEOUT_MS); - maybeAddInt(namespace, "readTimeoutMs", c, SASL_LOGIN_READ_TIMEOUT_MS); - maybeAddLong(namespace, "loginRetryBackoffMs", c, SASL_LOGIN_RETRY_BACKOFF_MS); - maybeAddLong(namespace, "loginRetryBackoffMax", c, SASL_LOGIN_RETRY_BACKOFF_MAX_MS); - maybeAddString(namespace, "scopeClaimName", c, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME); - maybeAddString(namespace, "subClaimName", c, SASL_OAUTHBEARER_SUB_CLAIM_NAME); - maybeAddString(namespace, "tokenEndpointUrl", c, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL); - maybeAddString(namespace, "jwksEndpointUrl", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL); - maybeAddLong(namespace, "jwksEndpdointRefreshMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS); - maybeAddLong(namespace, "jwksEndpdointRetryBackoffMaxMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS); - maybeAddLong(namespace, "jwksEndpdointRetryBackoffMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS); - maybeAddInt(namespace, "clockSkewSeconds", c, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS); - maybeAddStringList(namespace, "expectedAudience", c, SASL_OAUTHBEARER_EXPECTED_AUDIENCE); - maybeAddString(namespace, "expectedIssuer", c, SASL_OAUTHBEARER_EXPECTED_ISSUER); - - // This here is going to fill in all the defaults for the values we don't specify... - ConfigDef cd = new ConfigDef(); - SaslConfigs.addClientSaslSupport(cd); - AbstractConfig config = new AbstractConfig(cd, c); - return config.values(); - } - private static void maybeAddInt(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { - Integer value = namespace.getInt(namespaceKey); + private static class ArgsHandler { - if (value != null) - configs.put(configsKey, value); - } + private static final String DESCRIPTION = String.format( + "This tool is used to verify OAuth/OIDC provider compatibility.%n%n" + + "Run the following script to determine the configuration options:%n%n" + + " ./bin/kafka-run-class.sh %s --help", + OAuthCompatibilityTool.class.getName()); - private static void maybeAddLong(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { - Long value = namespace.getLong(namespaceKey); + private final ArgumentParser parser; - if (value != null) - configs.put(configsKey, value); - } + private ArgsHandler() { + this.parser = ArgumentParsers + .newArgumentParser("oauth-compatibility-tool") + .defaultHelp(true) + .description(DESCRIPTION); + } - private static void maybeAddString(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { - String value = namespace.getString(namespaceKey); + private Namespace parseArgs(String[] args) throws ArgumentParserException { + // SASL/OAuth + addArgument(SASL_LOGIN_CONNECT_TIMEOUT_MS, SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC, Integer.class); + addArgument(SASL_LOGIN_READ_TIMEOUT_MS, SASL_LOGIN_READ_TIMEOUT_MS_DOC, Integer.class); + addArgument(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC, Long.class); + addArgument(SASL_LOGIN_RETRY_BACKOFF_MS, SASL_LOGIN_RETRY_BACKOFF_MS_DOC, Long.class); + addArgument(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC, Integer.class); + addArgument(SASL_OAUTHBEARER_EXPECTED_AUDIENCE, SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC) + .action(Arguments.append()); + addArgument(SASL_OAUTHBEARER_EXPECTED_ISSUER, SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC); + addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC, Long.class); + addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC, Long.class); + addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC, Long.class); + addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC); + addArgument(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC); + addArgument(SASL_OAUTHBEARER_SUB_CLAIM_NAME, SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC); + addArgument(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC); + + // SSL + addArgument(SSL_CIPHER_SUITES_CONFIG, SSL_CIPHER_SUITES_DOC) + .action(Arguments.append()); + addArgument(SSL_ENABLED_PROTOCOLS_CONFIG, SSL_ENABLED_PROTOCOLS_DOC) + .action(Arguments.append()); + addArgument(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC); + addArgument(SSL_ENGINE_FACTORY_CLASS_CONFIG, SSL_ENGINE_FACTORY_CLASS_DOC); + addArgument(SSL_KEYMANAGER_ALGORITHM_CONFIG, SSL_KEYMANAGER_ALGORITHM_DOC); + addArgument(SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC); + addArgument(SSL_KEYSTORE_KEY_CONFIG, SSL_KEYSTORE_KEY_DOC); + addArgument(SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_DOC); + addArgument(SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_PASSWORD_DOC); + addArgument(SSL_KEYSTORE_TYPE_CONFIG, SSL_KEYSTORE_TYPE_DOC); + addArgument(SSL_KEY_PASSWORD_CONFIG, SSL_KEY_PASSWORD_DOC); + addArgument(SSL_PROTOCOL_CONFIG, SSL_PROTOCOL_DOC); + addArgument(SSL_PROVIDER_CONFIG, SSL_PROVIDER_DOC); + addArgument(SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, SSL_SECURE_RANDOM_IMPLEMENTATION_DOC); + addArgument(SSL_TRUSTMANAGER_ALGORITHM_CONFIG, SSL_TRUSTMANAGER_ALGORITHM_DOC); + addArgument(SSL_TRUSTSTORE_CERTIFICATES_CONFIG, SSL_TRUSTSTORE_CERTIFICATES_DOC); + addArgument(SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_DOC); + addArgument(SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_PASSWORD_DOC); + addArgument(SSL_TRUSTSTORE_TYPE_CONFIG, SSL_TRUSTSTORE_TYPE_DOC); + + // JAAS options... + addArgument(CLIENT_ID_CONFIG, CLIENT_ID_DOC); + addArgument(CLIENT_SECRET_CONFIG, CLIENT_SECRET_DOC); + addArgument(SCOPE_CONFIG, SCOPE_DOC); + + try { + return parser.parseArgs(args); + } catch (ArgumentParserException e) { + parser.handleError(e); + throw e; + } + } - if (value != null) - configs.put(configsKey, value); - } + private Argument addArgument(String option, String help) { + return addArgument(option, help, String.class); + } + + private Argument addArgument(String option, String help, Class<?> clazz) { + // Change foo.bar into --foo.bar. + String name = "--" + option; - private static void maybeAddStringList(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { - String value = namespace.getString(namespaceKey); + return parser.addArgument(name) + .type(clazz) + .metavar(option) + .dest(option) + .help(help); + } - if (value != null) - configs.put(configsKey, Arrays.asList(value.split(","))); } - private static Map<String, Object> getJaasConfigs(Namespace namespace) { - Map<String, Object> c = new HashMap<>(); - c.put(CLIENT_ID_CONFIG, namespace.getString("clientId")); - c.put(CLIENT_SECRET_CONFIG, namespace.getString("clientSecret")); - c.put(SCOPE_CONFIG, namespace.getString("scope")); - return c; + private static class ConfigHandler { + + private final Namespace namespace; + + + private ConfigHandler(Namespace namespace) { + this.namespace = namespace; + } + + private Map<String, ?> getConfigs() { + Map<String, Object> m = new HashMap<>(); + + // SASL/OAuth + maybeAddInt(m, SASL_LOGIN_CONNECT_TIMEOUT_MS); + maybeAddInt(m, SASL_LOGIN_READ_TIMEOUT_MS); + maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MS); + maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MAX_MS); + maybeAddString(m, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME); + maybeAddString(m, SASL_OAUTHBEARER_SUB_CLAIM_NAME); + maybeAddString(m, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL); + maybeAddString(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL); + maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS); + maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS); + maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS); + maybeAddInt(m, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS); + maybeAddStringList(m, SASL_OAUTHBEARER_EXPECTED_AUDIENCE); + maybeAddString(m, SASL_OAUTHBEARER_EXPECTED_ISSUER); + + // This here is going to fill in all the defaults for the values we don't specify... + ConfigDef cd = new ConfigDef(); + SaslConfigs.addClientSaslSupport(cd); + SslConfigs.addClientSslSupport(cd); + AbstractConfig config = new AbstractConfig(cd, m); + return config.values(); + } + + private Map<String, Object> getJaasOptions() { + Map<String, Object> m = new HashMap<>(); + + // SASL/OAuth + maybeAddString(m, CLIENT_ID_CONFIG); + maybeAddString(m, CLIENT_SECRET_CONFIG); + maybeAddString(m, SCOPE_CONFIG); + + // SSL + maybeAddStringList(m, SSL_CIPHER_SUITES_CONFIG); + maybeAddStringList(m, SSL_ENABLED_PROTOCOLS_CONFIG); + maybeAddString(m, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); + maybeAddClass(m, SSL_ENGINE_FACTORY_CLASS_CONFIG); + maybeAddString(m, SSL_KEYMANAGER_ALGORITHM_CONFIG); + maybeAddPassword(m, SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG); + maybeAddPassword(m, SSL_KEYSTORE_KEY_CONFIG); + maybeAddString(m, SSL_KEYSTORE_LOCATION_CONFIG); + maybeAddPassword(m, SSL_KEYSTORE_PASSWORD_CONFIG); + maybeAddString(m, SSL_KEYSTORE_TYPE_CONFIG); + maybeAddPassword(m, SSL_KEY_PASSWORD_CONFIG); + maybeAddString(m, SSL_PROTOCOL_CONFIG); + maybeAddString(m, SSL_PROVIDER_CONFIG); + maybeAddString(m, SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG); + maybeAddString(m, SSL_TRUSTMANAGER_ALGORITHM_CONFIG); + maybeAddPassword(m, SSL_TRUSTSTORE_CERTIFICATES_CONFIG); + maybeAddString(m, SSL_TRUSTSTORE_LOCATION_CONFIG); + maybeAddPassword(m, SSL_TRUSTSTORE_PASSWORD_CONFIG); + maybeAddString(m, SSL_TRUSTSTORE_TYPE_CONFIG); + + return m; + } + + private void maybeAddInt(Map<String, Object> m, String option) { + Integer value = namespace.getInt(option); + + if (value != null) + m.put(option, value); + } + + private void maybeAddLong(Map<String, Object> m, String option) { + Long value = namespace.getLong(option); + + if (value != null) + m.put(option, value); + } + + private void maybeAddString(Map<String, Object> m, String option) { + String value = namespace.getString(option); + + if (value != null) + m.put(option, value); + } + + private void maybeAddPassword(Map<String, Object> m, String option) { + String value = namespace.getString(option); + + if (value != null) + m.put(option, new Password(value)); + } + + private void maybeAddClass(Map<String, Object> m, String option) { + String value = namespace.getString(option); + + if (value != null) { + try { + m.put(option, Class.forName(value)); + } catch (ClassNotFoundException e) { + throw new KafkaException("Could not find class for " + option, e); + } + } + } + + private void maybeAddStringList(Map<String, Object> m, String option) { + List<String> value = namespace.getList(option); + + if (value != null) + m.put(option, value); + } + } }