This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 09e382b9a0a5f48e3551db69996ad0b1e04cf79e
Author: Jayendra Parab <[email protected]>
AuthorDate: Sat May 30 16:20:59 2020 +0530

    ATLAS-3779: fallback to KafkaClient JAAS configuration when 
ticketBased-KafkaClient is not specified
    
    Signed-off-by: Madhan Neethiraj <[email protected]>
    (cherry picked from commit 7a8ca51f83bdd15bf5290f718bd6a79e16eca61b)
---
 .../org/apache/atlas/kafka/KafkaNotification.java  | 26 +++++---
 .../atlas/kafka/KafkaNotificationMockTest.java     | 71 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 8 deletions(-)

diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 278b3a7..05fd977 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -29,8 +29,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -431,12 +429,22 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
 
             // Required for backward compatability for Hive CLI
             if (!isLoginKeytabBased() && isLoginTicketBased()) {
-                LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
-                jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+                LOG.debug("Checking if ticketBased-KafkaClient is set");
+                // if ticketBased-KafkaClient property is not specified then 
use the default client name
+                String        ticketBasedConfigPrefix = 
JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
+                Configuration ticketBasedConfig       = 
configuration.subset(ticketBasedConfigPrefix);
+
+                if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+                    LOG.debug("ticketBased-KafkaClient JAAS configuration is 
set, using it");
+
+                    jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+                } else {
+                    LOG.info("UserGroupInformation.isLoginTicketBased is true, 
but no JAAS configuration found for client {}. Will use JAAS configuration of 
client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
+                }
             }
-            String keyPrefix = jaasClientName + ".";
 
-            String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+            String keyPrefix       = jaasClientName + ".";
+            String keyParam        = keyPrefix + 
JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
             String loginModuleName = jaasConfig.getProperty(keyParam);
 
             if (loginModuleName == null) {
@@ -483,7 +491,8 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
         LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
     }
 
-    private static boolean isLoginKeytabBased() {
+    @VisibleForTesting
+    boolean isLoginKeytabBased() {
         boolean ret = false;
 
         try {
@@ -495,7 +504,8 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
         return ret;
     }
 
-    private static boolean isLoginTicketBased() {
+    @VisibleForTesting
+    boolean isLoginTicketBased() {
         boolean ret = false;
 
         try {
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index e345c8b..51c5a0d 100644
--- 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -276,6 +277,76 @@ public class KafkaNotificationMockTest {
 
     }
 
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = 
"com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
+        
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag",
 loginModuleControlFlag);
+        
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab",
 optionUseKeyTab);
+        
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", 
optionStoreKey);
+        
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
+
+        try {
+            KafkaNotification kafkaNotification = new 
KafkaNotification(configuration);
+            KafkaNotification spyKafkaNotification = 
Mockito.spy(kafkaNotification);
+            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+            spyKafkaNotification.setKafkaJAASProperties(configuration, 
properties);
+            String newPropertyValue = 
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+            assertTrue(newPropertyValue.contains(loginModuleName), 
"loginModuleName not present in new property");
+            
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
 not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + 
optionUseKeyTab), "useKeyTab not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), 
"storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + 
optionServiceName), "serviceName not present in new property or value doesn't 
match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with 
exception : " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = 
"com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", 
loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", 
optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", 
optionStoreKey);
+        
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+        try {
+            KafkaNotification kafkaNotification = new 
KafkaNotification(configuration);
+            KafkaNotification spyKafkaNotification = 
Mockito.spy(kafkaNotification);
+            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+            spyKafkaNotification.setKafkaJAASProperties(configuration, 
properties);
+            String newPropertyValue = 
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+            assertTrue(newPropertyValue.contains(loginModuleName), 
"loginModuleName not present in new property");
+            
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
 not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + 
optionUseKeyTab), "useKeyTab not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), 
"storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + 
optionServiceName), "serviceName not present in new property or value doesn't 
match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with 
exception : " + e.getMessage());
+        }
+    }
+
     class TestKafkaNotification extends KafkaNotification {
 
         private final AtlasKafkaConsumer consumer1;

Reply via email to