SENTRY-1188: Fixes to get kerberos auth work. (Ashish K Singh, Reviewed by: Hao Hao)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/6d79016a Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/6d79016a Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/6d79016a Branch: refs/heads/master Commit: 6d79016aaf2c9179bea9171c393990a466248753 Parents: 70d0ecc Author: hahao <[email protected]> Authored: Fri Apr 15 17:35:19 2016 -0700 Committer: hahao <[email protected]> Committed: Fri Apr 15 20:20:39 2016 -0700 ---------------------------------------------------------------------- .../kafka/authorizer/SentryKafkaAuthorizer.java | 2 +- .../sentry/kafka/binding/KafkaAuthBinding.java | 66 +++++++++++++++++++- .../binding/KafkaAuthBindingSingleton.java | 5 +- .../apache/sentry/kafka/conf/KafkaAuthConf.java | 8 ++- .../policy/kafka/KafkaWildcardPrivilege.java | 2 +- 5 files changed, 77 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java index 3bce6cc..03f7b7f 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java @@ -117,7 +117,7 @@ public class SentryKafkaAuthorizer implements Authorizer { } LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site); final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance(); - instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site); + instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site, configs); this.binding = instance.getAuthBinding(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java index 8f4a8c4..c6600a0 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java @@ -16,6 +16,7 @@ */ package org.apache.sentry.kafka.binding; +import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.HashMap; @@ -34,6 +35,8 @@ import com.google.common.collect.Sets; import kafka.network.RequestChannel; import kafka.security.auth.Operation; import kafka.security.auth.Resource; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.sentry.SentryUserException; @@ -55,6 +58,7 @@ import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericService import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; import org.apache.sentry.provider.db.generic.service.thrift.TSentryRole; +import org.apache.sentry.service.thrift.ServiceConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; @@ -64,12 +68,16 @@ import scala.collection.Iterator; import scala.collection.JavaConversions; import scala.collection.immutable.Map; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + public class KafkaAuthBinding { private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class); private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA; private static final String COMPONENT_NAME = COMPONENT_TYPE; + private static Boolean kerberosInit; + private final Configuration authConf; private final AuthorizationProvider authProvider; private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance(); @@ -77,12 +85,14 @@ public class KafkaAuthBinding { private ProviderBackend providerBackend; private String instanceName; private String requestorName; + private java.util.Map<String, ?> kafkaConfigs; - public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf) throws Exception { + public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception { this.instanceName = instanceName; this.requestorName = requestorName; this.authConf = authConf; + this.kafkaConfigs = kafkaConfigs; this.authProvider = createAuthProvider(); } @@ -118,6 +128,28 @@ public class KafkaAuthBinding { + providerBackendName); } + // Initiate kerberos via UserGroupInformation if required + if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE)) + && kafkaConfigs != null) { + String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString(); + String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString(); + if (keytabProp != null && principalProp != null) { + String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString(); + if (actualHost != null) { + principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost); + } + initKerberos(keytabProp, principalProp); + } else { + LOG.debug("Could not initialize Kerberos.\n" + + AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" + + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString()); + } + } else { + LOG.debug("Could not initialize Kerberos as no kafka config provided. " + + AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + + " are required configs to be able to initialize Kerberos"); + } + // Instantiate the configured providerBackend Constructor<?> providerBackendConstructor = Class.forName(providerBackendName) @@ -495,4 +527,36 @@ public class KafkaAuthBinding { return principalName; } } + + /** + * Initialize kerberos via UserGroupInformation. Will only attempt to login + * during the first request, subsequent calls will have no effect. + */ + private void initKerberos(String keytabFile, String principal) { + if (keytabFile == null || keytabFile.length() == 0) { + throw new IllegalArgumentException("keytabFile required because kerberos is enabled"); + } + if (principal == null || principal.length() == 0) { + throw new IllegalArgumentException("principal required because kerberos is enabled"); + } + synchronized (KafkaAuthBinding.class) { + if (kerberosInit == null) { + kerberosInit = new Boolean(true); + // let's avoid modifying the supplied configuration, just to be conservative + final Configuration ugiConf = new Configuration(); + ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS); + UserGroupInformation.setConfiguration(ugiConf); + LOG.info( + "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ", + keytabFile, principal); + try { + UserGroupInformation.loginUserFromKeytab(principal, keytabFile); + } catch (IOException ioe) { + throw new RuntimeException("Failed to login user with Principal: " + principal + + " and Keytab file: " + keytabFile, ioe); + } + LOG.info("Got Kerberos ticket"); + } + } + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java index a0007a3..6555dae 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java @@ -18,6 +18,7 @@ package org.apache.sentry.kafka.binding; import java.net.MalformedURLException; import java.net.URL; +import java.util.Map; import org.apache.sentry.kafka.conf.KafkaAuthConf; import org.slf4j.Logger; @@ -56,10 +57,10 @@ public class KafkaAuthBindingSingleton { return kafkaAuthConf; } - public void configure(String instanceName, String requestorName, String sentry_site) { + public void configure(String instanceName, String requestorName, String sentry_site, Map<String, ?> kafkaConfigs) { try { kafkaAuthConf = loadAuthzConf(sentry_site); - binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf); + binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf, kafkaConfigs); log.info("KafkaAuthBinding created successfully"); } catch (Exception ex) { log.error("Unable to create KafkaAuthBinding", ex); http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java index e0d767e..0a57e2e 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java @@ -30,6 +30,9 @@ public class KafkaAuthConf extends Configuration { public static final String KAFKA_SUPER_USERS = "kafka.superusers"; public static final String KAFKA_SERVICE_INSTANCE_NAME = "sentry.kafka.service.instance"; public static final String KAFKA_SERVICE_USER_NAME = "sentry.kafka.service.user.name"; + public static final String KAFKA_PRINCIPAL_HOSTNAME = "sentry.kafka.principal.hostname"; + public static final String KAFKA_PRINCIPAL_NAME = "sentry.kafka.kerberos.principal"; + public static final String KAFKA_KEYTAB_FILE_NAME = "sentry.kafka.keytab.file"; /** * Config setting definitions @@ -40,7 +43,10 @@ public class KafkaAuthConf extends Configuration { AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()), AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()), AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"), - AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"); + AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"), + AUTHZ_PRINCIPAL_HOSTNAME(KAFKA_PRINCIPAL_HOSTNAME, null), + AUTHZ_PRINCIPAL_NAME(KAFKA_PRINCIPAL_NAME, null), + AUTHZ_KEYTAB_FILE_NAME(KAFKA_KEYTAB_FILE_NAME, null); private final String varName; private final String defaultVal; http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java ---------------------------------------------------------------------- diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java index bc299b0..6803a46 100644 --- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java +++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java @@ -121,7 +121,7 @@ public class KafkaWildcardPrivilege implements Privilege { if (KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())) { // is action return policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) || - policyPart.equals(requestPart); + policyPart.getValue().equalsIgnoreCase(requestPart.getValue()); } else { return policyPart.getValue().equals(requestPart.getValue()); }
