This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 61abeca ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.
61abeca is described below
commit 61abecac22ef3e9341a07be6d5354bf246544a3b
Author: Jayendra Parab <[email protected]>
AuthorDate: Mon May 25 14:31:43 2020 +0530
ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.
---
.../org/apache/atlas/ApplicationProperties.java | 4 -
.../main/java/org/apache/atlas/hook/AtlasHook.java | 30 -----
.../org/apache/atlas/kafka/KafkaNotification.java | 126 ++++++++++++++++++++
.../atlas/kafka/KafkaNotificationMockTest.java | 132 +++++++++++++++++++++
4 files changed, 258 insertions(+), 34 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
index 3ba5061..e662c8f 100644
--- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -17,7 +17,6 @@
*/
package org.apache.atlas;
-import org.apache.atlas.security.InMemoryJAASConfiguration;
import org.apache.atlas.security.SecurityUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
@@ -109,10 +108,7 @@ public final class ApplicationProperties extends
PropertiesConfiguration {
public static Configuration set(Configuration configuration) throws
AtlasException {
synchronized (ApplicationProperties.class) {
instance = configuration;
-
- InMemoryJAASConfiguration.init(instance);
}
-
return instance;
}
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index cc6546b..8659126 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -93,12 +93,6 @@ public abstract class AtlasHook {
failedMessagesLogger = null;
}
- if (!isLoginKeytabBased()) {
- if (isLoginTicketBased()) {
-
InMemoryJAASConfiguration.setConfigSectionRedirect("KafkaClient",
"ticketBased-KafkaClient");
- }
- }
-
metadataNamespace = getMetadataNamespace(atlasProperties);
notificationMaxRetries =
atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
notificationRetryInterval =
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
@@ -287,30 +281,6 @@ public abstract class AtlasHook {
}
}
- private static boolean isLoginKeytabBased() {
- boolean ret = false;
-
- try {
- ret = UserGroupInformation.isLoginKeytabBased();
- } catch (Exception excp) {
- LOG.warn("Error in determining keytab for KafkaClient-JAAS
config", excp);
- }
-
- return ret;
- }
-
- private static boolean isLoginTicketBased() {
- boolean ret = false;
-
- try {
- ret = UserGroupInformation.isLoginTicketBased();
- } catch (Exception excp) {
- LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS
config", excp);
- }
-
- return ret;
- }
-
private static String getMetadataNamespace(Configuration config) {
return config.getString(CONF_METADATA_NAMESPACE,
getClusterName(config));
}
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 11a29b9..278b3a7 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -28,6 +28,7 @@ import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -64,6 +65,17 @@ public class KafkaNotification extends AbstractNotification
implements Service {
public static final String ATLAS_ENTITIES_TOPIC =
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
+ static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
+ private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
+ private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM =
"loginModuleName";
+ private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM =
"loginModuleControlFlag";
+ private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG =
"required";
+ private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS =
"optional|requisite|sufficient|required";
+ private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
+ private static final String JAAS_PRINCIPAL_PROP = "principal";
+ private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
+ private static final String JAAS_TICKET_BASED_CLIENT_NAME =
"ticketBased-KafkaClient";
+
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
@@ -134,6 +146,8 @@ public class KafkaNotification extends AbstractNotification
implements Service {
// if no value is specified for max.poll.records, set to 1
properties.put("max.poll.records",
kafkaConf.getInt("max.poll.records", 1));
+ setKafkaJAASProperties(applicationProperties, properties);
+
LOG.info("<== KafkaNotification()");
}
@@ -401,4 +415,116 @@ public class KafkaNotification extends
AbstractNotification implements Service {
return ret;
}
+
+ void setKafkaJAASProperties(Configuration configuration, Properties
kafkaProperties) {
+ LOG.debug("==> KafkaNotification.setKafkaJAASProperties()");
+
+ if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
+ LOG.debug("JAAS config is already set, returning");
+ return;
+ }
+
+ Properties jaasConfig =
ApplicationProperties.getSubsetAsProperties(configuration,
JAAS_CONFIG_PREFIX_PARAM);
+ // JAAS Configuration is present then update set those properties in
sasl.jaas.config
+ if(jaasConfig != null && !jaasConfig.isEmpty()) {
+ String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
+
+ // Required for backward compatability for Hive CLI
+ if (!isLoginKeytabBased() && isLoginTicketBased()) {
+ LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
+ jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+ }
+ String keyPrefix = jaasClientName + ".";
+
+ String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+ String loginModuleName = jaasConfig.getProperty(keyParam);
+
+ if (loginModuleName == null) {
+ LOG.error("Unable to add JAAS configuration for client [{}] as
it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName,
keyParam, jaasClientName);
+ return;
+ }
+
+ keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
+ String controlFlag = jaasConfig.getProperty(keyParam);
+
+ if(StringUtils.isEmpty(controlFlag)) {
+ String validValues =
JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
+ controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
+ LOG.warn("Unknown JAAS configuration value for ({}) = [{}],
valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag,
validValues);
+ }
+ String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX
+ ".";
+ String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+ int optionPrefixLen = optionPrefix.length();
+ StringBuffer optionStringBuffer = new StringBuffer();
+ for (String key : jaasConfig.stringPropertyNames()) {
+ if (key.startsWith(optionPrefix)) {
+ String optionVal = jaasConfig.getProperty(key);
+ if (optionVal != null) {
+ optionVal = optionVal.trim();
+
+ try {
+ if (key.equalsIgnoreCase(principalOptionKey)) {
+ optionVal =
org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String)
null);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to build serverPrincipal. Using
provided value:[{}]", optionVal);
+ }
+
+ optionVal = surroundWithQuotes(optionVal);
+ optionStringBuffer.append(String.format(" %s=%s",
key.substring(optionPrefixLen), optionVal));
+ }
+ }
+ }
+
+ String newJaasProperty = String.format("%s %s %s ;",
loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
+ kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY,
newJaasProperty);
+ }
+
+ LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
+ }
+
+ private static boolean isLoginKeytabBased() {
+ boolean ret = false;
+
+ try {
+ ret = UserGroupInformation.isLoginKeytabBased();
+ } catch (Exception excp) {
+ LOG.warn("Error in determining keytab for KafkaClient-JAAS
config", excp);
+ }
+
+ return ret;
+ }
+
+ private static boolean isLoginTicketBased() {
+ boolean ret = false;
+
+ try {
+ ret = UserGroupInformation.isLoginTicketBased();
+ } catch (Exception excp) {
+ LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS
config", excp);
+ }
+
+ return ret;
+ }
+
+ private static String surroundWithQuotes(String optionVal) {
+ if(StringUtils.isEmpty(optionVal)) {
+ return optionVal;
+ }
+ String ret = optionVal;
+
+ // For property values which have special chars like "@" or "/", we
need to enclose it in
+ // double quotes, so that Kafka can parse it
+ // If the property is already enclosed in double quotes, then do
nothing.
+ if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length()
- 1) != '"') {
+ // If the string as special characters like except _,-
+ final String SPECIAL_CHAR_LIST = "/!@#%^&*";
+ if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
+ ret = String.format("\"%s\"", optionVal);
+ }
+ }
+
+ return ret;
+ }
+
}
diff --git
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 9b5891f..e345c8b 100644
---
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -17,14 +17,19 @@
*/
package org.apache.atlas.kafka;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
+
+import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,6 +43,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -144,6 +150,132 @@ public class KafkaNotificationMockTest {
}
}
+ @Test
+ public void testSetKafkaJAASPropertiesForAllProperValues() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
+ kafkaNotification.setKafkaJAASProperties(configuration,
properties);
+ String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForMissingControlFlag() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
+ kafkaNotification.setKafkaJAASProperties(configuration,
properties);
+ String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
+ kafkaNotification.setKafkaJAASProperties(configuration,
properties);
+ String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertNull(newPropertyValue);
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionKeyTabPath = "/path/to/file.keytab";
+ final String optionPrincipal = "test/[email protected]";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.keyTabPath",
optionKeyTabPath);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.principal",
optionPrincipal);
+
+ try {
+ KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
+ kafkaNotification.setKafkaJAASProperties(configuration,
properties);
+ String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+ String updatedPrincipalValue =
org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionPrincipal,
(String) null);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("keyTabPath=\"" +
optionKeyTabPath + "\""));
+ assertTrue(newPropertyValue.contains("principal=\""+
updatedPrincipalValue + "\""));
+
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
+ } catch (IOException e) {
+ fail("Failed while getting updated principal value with exception
: " + e.getMessage());
+ }
+
+ }
+
class TestKafkaNotification extends KafkaNotification {
private final AtlasKafkaConsumer consumer1;