This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 7a8ca51 ATLAS-3779: fallback to KafkaClient JAAS configuration when
ticketBased-KafkaClient is not specified
7a8ca51 is described below
commit 7a8ca51f83bdd15bf5290f718bd6a79e16eca61b
Author: Jayendra Parab <[email protected]>
AuthorDate: Sat May 30 16:20:59 2020 +0530
ATLAS-3779: fallback to KafkaClient JAAS configuration when
ticketBased-KafkaClient is not specified
Signed-off-by: Madhan Neethiraj <[email protected]>
---
.../org/apache/atlas/kafka/KafkaNotification.java | 26 +++++---
.../atlas/kafka/KafkaNotificationMockTest.java | 71 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 8 deletions(-)
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 278b3a7..05fd977 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -29,8 +29,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -431,12 +429,22 @@ public class KafkaNotification extends
AbstractNotification implements Service {
// Required for backward compatability for Hive CLI
if (!isLoginKeytabBased() && isLoginTicketBased()) {
- LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
- jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+ LOG.debug("Checking if ticketBased-KafkaClient is set");
+ // if ticketBased-KafkaClient property is not specified then
use the default client name
+ String ticketBasedConfigPrefix =
JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
+ Configuration ticketBasedConfig =
configuration.subset(ticketBasedConfigPrefix);
+
+ if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+ LOG.debug("ticketBased-KafkaClient JAAS configuration is
set, using it");
+
+ jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+ } else {
+ LOG.info("UserGroupInformation.isLoginTicketBased is true,
but no JAAS configuration found for client {}. Will use JAAS configuration of
client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
+ }
}
- String keyPrefix = jaasClientName + ".";
- String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+ String keyPrefix = jaasClientName + ".";
+ String keyParam = keyPrefix +
JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
String loginModuleName = jaasConfig.getProperty(keyParam);
if (loginModuleName == null) {
@@ -483,7 +491,8 @@ public class KafkaNotification extends AbstractNotification
implements Service {
LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
}
- private static boolean isLoginKeytabBased() {
+ @VisibleForTesting
+ boolean isLoginKeytabBased() {
boolean ret = false;
try {
@@ -495,7 +504,8 @@ public class KafkaNotification extends AbstractNotification
implements Service {
return ret;
}
- private static boolean isLoginTicketBased() {
+ @VisibleForTesting
+ boolean isLoginTicketBased() {
boolean ret = false;
try {
diff --git
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index e345c8b..51c5a0d 100644
---
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -276,6 +277,76 @@ public class KafkaNotificationMockTest {
}
+ @Test
+ public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab",
optionUseKeyTab);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
+ KafkaNotification spyKafkaNotification =
Mockito.spy(kafkaNotification);
+ when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+ when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+ spyKafkaNotification.setKafkaJAASProperties(configuration,
properties);
+ String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
+ KafkaNotification spyKafkaNotification =
Mockito.spy(kafkaNotification);
+ when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+ when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+ spyKafkaNotification.setKafkaJAASProperties(configuration,
properties);
+ String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
+ }
+ }
+
class TestKafkaNotification extends KafkaNotification {
private final AtlasKafkaConsumer consumer1;