This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 09e382b9a0a5f48e3551db69996ad0b1e04cf79e Author: Jayendra Parab <[email protected]> AuthorDate: Sat May 30 16:20:59 2020 +0530 ATLAS-3779: fallback to KafkaClient jaas configiration when ticket-basedKafkaClient is not specified Signed-off-by: Madhan Neethiraj <[email protected]> (cherry picked from commit 7a8ca51f83bdd15bf5290f718bd6a79e16eca61b) --- .../org/apache/atlas/kafka/KafkaNotification.java | 26 +++++--- .../atlas/kafka/KafkaNotificationMockTest.java | 71 ++++++++++++++++++++++ 2 files changed, 89 insertions(+), 8 deletions(-) diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 278b3a7..05fd977 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -29,8 +29,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationConverter; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.alias.CredentialProvider; -import org.apache.hadoop.security.alias.CredentialProviderFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -431,12 +429,22 @@ public class KafkaNotification extends AbstractNotification implements Service { // Required for backward compatability for Hive CLI if (!isLoginKeytabBased() && isLoginTicketBased()) { - LOG.debug("Using ticketBased-KafkaClient JAAS configuration"); - jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME; + LOG.debug("Checking if ticketBased-KafkaClient is set"); + // if ticketBased-KafkaClient property is not specified then use the default client name + String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME; + Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix); + + if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) { + LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it"); + + jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME; + } else { + LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName); + } } - String keyPrefix = jaasClientName + "."; - String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM; + String keyPrefix = jaasClientName + "."; + String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM; String loginModuleName = jaasConfig.getProperty(keyParam); if (loginModuleName == null) { @@ -483,7 +491,8 @@ public class KafkaNotification extends AbstractNotification implements Service { LOG.debug("<== KafkaNotification.setKafkaJAASProperties()"); } - private static boolean isLoginKeytabBased() { + @VisibleForTesting + boolean isLoginKeytabBased() { boolean ret = false; try { @@ -495,7 +504,8 @@ public class KafkaNotification extends AbstractNotification implements Service { return ret; } - private static boolean isLoginTicketBased() { + @VisibleForTesting + boolean isLoginTicketBased() { boolean ret = false; try { diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java index e345c8b..51c5a0d 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.io.IOException; @@ -276,6 +277,76 @@ public class KafkaNotificationMockTest { } + @Test + public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() { + Properties properties = new Properties(); + Configuration configuration = new PropertiesConfiguration(); + + final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule"; + final String loginModuleControlFlag = "required"; + final String optionUseKeyTab = "false"; + final String optionStoreKey = "true"; + final String optionServiceName = "kafka"; + + configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName); + configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", loginModuleControlFlag); + configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab", optionUseKeyTab); + configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", optionStoreKey); + configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName); + + try { + KafkaNotification kafkaNotification = new KafkaNotification(configuration); + KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification); + when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false); + when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true); + spyKafkaNotification.setKafkaJAASProperties(configuration, properties); + String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY); + + assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); + assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property"); + assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match"); + } catch (AtlasException e) { + fail("Failed while creating KafkaNotification object with exception : " + e.getMessage()); + } + } + + @Test + public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() { + Properties properties = new Properties(); + Configuration configuration = new PropertiesConfiguration(); + + final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule"; + final String loginModuleControlFlag = "required"; + final String optionUseKeyTab = "false"; + final String optionStoreKey = "true"; + final String optionServiceName = "kafka"; + + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName); + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag); + configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab); + configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey); + configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName); + + try { + KafkaNotification kafkaNotification = new KafkaNotification(configuration); + KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification); + when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false); + when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true); + spyKafkaNotification.setKafkaJAASProperties(configuration, properties); + String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY); + + assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); + assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property"); + assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match"); + } catch (AtlasException e) { + fail("Failed while creating KafkaNotification object with exception : " + e.getMessage()); + } + } + class TestKafkaNotification extends KafkaNotification { private final AtlasKafkaConsumer consumer1;
