This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new d4b15c7 ATLAS-3864 : Break the dependency between Atlas and Kafka's
Zookeeper
d4b15c7 is described below
commit d4b15c7a2ab47d64a49070fab766a95497db4f0f
Author: Jayendra Parab <[email protected]>
AuthorDate: Tue Sep 22 11:58:10 2020 +0530
ATLAS-3864 : Break the dependency between Atlas and Kafka's Zookeeper
Signed-off-by: nixonrodrigues <[email protected]>
---
.../atlas/hive/bridge/HiveMetaStoreBridgeTest.java | 2 +-
.../org/apache/atlas/kafka/bridge/KafkaBridge.java | 62 ++---
common/pom.xml | 15 +-
.../java/org/apache/atlas/utils/KafkaUtils.java | 279 +++++++++++++++++++++
.../org/apache/atlas/utils/KafkaUtilsTest.java | 215 ++++++++++++++++
.../org/apache/atlas/hook/AtlasTopicCreator.java | 53 +---
.../org/apache/atlas/kafka/KafkaNotification.java | 139 +---------
.../apache/atlas/hook/AtlasTopicCreatorTest.java | 245 +++---------------
.../atlas/kafka/KafkaNotificationMockTest.java | 196 ---------------
.../store/graph/v2/BulkImportPercentTest.java | 2 +-
.../notification/NotificationHookConsumerTest.java | 2 +-
.../atlas/web/service/CuratorFactoryTest.java | 3 +-
12 files changed, 587 insertions(+), 626 deletions(-)
diff --git
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index af61ade..ae7ab1a 100644
---
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -310,7 +310,7 @@ public class HiveMetaStoreBridgeTest {
return table;
}
- private class MatchesReferenceableProperty extends ArgumentMatcher<Object>
{
+ private class MatchesReferenceableProperty implements
ArgumentMatcher<Object> {
private final String attrName;
private final Object attrValue;
diff --git
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 40b1fee..a13b029 100644
---
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -18,12 +18,7 @@
package org.apache.atlas.kafka.bridge;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.kafka.common.security.JaasUtils;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.kafka.model.KafkaDataTypes;
@@ -32,6 +27,7 @@ import
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.atlas.utils.KafkaUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -53,6 +49,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
public class KafkaBridge {
@@ -74,22 +71,17 @@ public class KafkaBridge {
private static final String TOPIC = "topic";
private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME =
"%s@%s";
- private static final String ZOOKEEPER_CONNECT =
"atlas.kafka.zookeeper.connect";
- private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS =
"atlas.kafka.zookeeper.connection.timeout.ms";
- private static final String ZOOKEEPER_SESSION_TIMEOUT_MS =
"atlas.kafka.zookeeper.session.timeout.ms";
- private static final String DEFAULT_ZOOKEEPER_CONNECT =
"localhost:2181";
- private static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10 *
1000;
- private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 *
1000;
private final List<String> availableTopics;
private final String metadataNamespace;
private final AtlasClientV2 atlasClientV2;
- private final ZkUtils zkUtils;
+ private final KafkaUtils kafkaUtils;
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
+ KafkaBridge importer = null;
try {
Options options = new Options();
@@ -118,7 +110,7 @@ public class KafkaBridge {
atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(),
urls);
}
- KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2);
+ importer = new KafkaBridge(atlasConf, atlasClientV2);
if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
@@ -153,21 +145,25 @@ public class KafkaBridge {
if (atlasClientV2 != null) {
atlasClientV2.close();
}
+ if (importer != null) {
+ importer.close();
+ }
}
System.exit(exitCode);
}
public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2)
throws Exception {
- String zookeeperConnect = getZKConnection(atlasConf);
- int sessionTimeOutMs =
atlasConf.getInt(ZOOKEEPER_SESSION_TIMEOUT_MS,
DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS) ;
- int connectionTimeOutMs =
atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS,
DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
- ZkClient zkClient = new ZkClient(zookeeperConnect,
sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
-
this.atlasClientV2 = atlasClientV2;
this.metadataNamespace = getMetadataNamespace(atlasConf);
- this.zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
- this.availableTopics =
scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
+ this.kafkaUtils = new KafkaUtils(atlasConf);
+ this.availableTopics = kafkaUtils.listAllTopics();
+ }
+
+ public void close() {
+ if (this.kafkaUtils != null) {
+ this.kafkaUtils.close();
+ }
}
private String getMetadataNamespace(Configuration config) {
@@ -225,7 +221,7 @@ public class KafkaBridge {
}
@VisibleForTesting
- AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) {
+ AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws
Exception {
final AtlasEntity ret;
if (topicEntity == null) {
@@ -242,7 +238,12 @@ public class KafkaBridge {
ret.setAttribute(NAME,topic);
ret.setAttribute(DESCRIPTION_ATTR, topic);
ret.setAttribute(URI, topic);
- ret.setAttribute(PARTITION_COUNT, (Integer)
zkUtils.getTopicPartitionCount(topic).get());
+ try {
+ ret.setAttribute(PARTITION_COUNT,
kafkaUtils.getPartitionCount(topic));
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Error while getting partition count for topic :" +
topic, e);
+ throw new Exception("Error while getting partition count for topic
:" + topic, e);
+ }
return ret;
}
@@ -351,21 +352,4 @@ public class KafkaBridge {
entity.getRelationshipAttributes().clear();
}
}
-
- private String getStringValue(String[] vals) {
- String ret = null;
- for(String val:vals) {
- ret = (ret == null) ? val : ret + "," + val;
- }
- return ret;
- }
-
- private String getZKConnection(Configuration atlasConf) {
- String ret = null;
- ret = getStringValue(atlasConf.getStringArray(ZOOKEEPER_CONNECT));
- if (StringUtils.isEmpty(ret) ) {
- ret = DEFAULT_ZOOKEEPER_CONNECT;
- }
- return ret;
- }
}
diff --git a/common/pom.xml b/common/pom.xml
index 5be9557..616f66c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -81,7 +81,14 @@
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
+ <version>3.5.10</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>3.5.10</version>
</dependency>
<dependency>
@@ -115,6 +122,12 @@
<version>1.3.2</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
new file mode 100644
index 0000000..7a397b1
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class KafkaUtils implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaUtils.class);
+
+ static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
+ private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
+ private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM =
"loginModuleName";
+ private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM =
"loginModuleControlFlag";
+ private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG =
"required";
+ private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS =
"optional|requisite|sufficient|required";
+ private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
+ private static final String JAAS_PRINCIPAL_PROP = "principal";
+ private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
+ private static final String JAAS_TICKET_BASED_CLIENT_NAME =
"ticketBased-KafkaClient";
+
+ public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
+
+ final protected Properties kafkaConfiguration;
+
+ final protected AdminClient adminClient;
+
+ public KafkaUtils(Configuration atlasConfiguration) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> KafkaUtils() ");
+ }
+ this.kafkaConfiguration =
ApplicationProperties.getSubsetAsProperties(atlasConfiguration,
ATLAS_KAFKA_PROPERTY_PREFIX);
+
+ setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration);
+ adminClient = AdminClient.create(this.kafkaConfiguration);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== KafkaUtils() ");
+ }
+ }
+
+ public void createTopics(List<String> topicNames, int numPartitions, int
replicationFactor)
+ throws TopicExistsException, ExecutionException,
InterruptedException {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> createTopics() ");
+ }
+
+ List<NewTopic> newTopicList = topicNames.stream()
+ .map(topicName -> new NewTopic(topicName, numPartitions,
(short) replicationFactor))
+ .collect(Collectors.toList());
+
+ CreateTopicsResult createTopicsResult =
adminClient.createTopics(newTopicList);
+ Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values();
+ for(Map.Entry<String, KafkaFuture<Void>> futureEntry :
futureMap.entrySet()) {
+ String topicName = futureEntry.getKey();
+ KafkaFuture<Void> future = futureEntry.getValue();
+ future.get();
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== createTopics() ");
+ }
+ }
+
+ public List<String> listAllTopics() throws ExecutionException,
InterruptedException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> KafkaUtils.listAllTopics() ");
+ }
+ ListTopicsResult listTopicsResult = adminClient.listTopics();
+ List<String> topicNameList = new
ArrayList<>(listTopicsResult.names().get());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== KafkaUtils.listAllTopics() ");
+ }
+
+ return topicNameList;
+ }
+
+ public Integer getPartitionCount(String topicName) throws
ExecutionException, InterruptedException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> KafkaUtils.getPartitionCount({})", topicName);
+ }
+
+ Integer partitionCount = null;
+ DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(Collections.singleton(topicName));
+ Map<String, KafkaFuture<TopicDescription>> futureMap =
describeTopicsResult.values();
+ for(Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry :
futureMap.entrySet()) {
+ KafkaFuture<TopicDescription> topicDescriptionFuture =
futureEntry.getValue();
+ TopicDescription topicDescription = topicDescriptionFuture.get();
+ List<TopicPartitionInfo> partitionList =
topicDescription.partitions();
+ partitionCount = partitionList.size();
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {}
with count {}", topicName, partitionCount);
+ }
+
+ return partitionCount;
+ }
+
+ public void close() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> KafkaUtils.close()");
+ }
+ if(adminClient != null) {
+ adminClient.close();
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== KafkaUtils.close()");
+ }
+ }
+
+ public static void setKafkaJAASProperties(Configuration configuration,
Properties kafkaProperties) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> KafkaUtils.setKafkaJAASProperties()");
+ }
+
+ if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
+ LOG.debug("JAAS config is already set, returning");
+ return;
+ }
+
+ Properties jaasConfig =
ApplicationProperties.getSubsetAsProperties(configuration,
JAAS_CONFIG_PREFIX_PARAM);
+ // JAAS Configuration is present then update set those properties in
sasl.jaas.config
+ if(jaasConfig != null && !jaasConfig.isEmpty()) {
+ String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
+
+ // Required for backward compatability for Hive CLI
+ if (!isLoginKeytabBased() && isLoginTicketBased()) {
+ LOG.debug("Checking if ticketBased-KafkaClient is set");
+ // if ticketBased-KafkaClient property is not specified then
use the default client name
+ String ticketBasedConfigPrefix =
JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
+ Configuration ticketBasedConfig =
configuration.subset(ticketBasedConfigPrefix);
+
+ if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+ LOG.debug("ticketBased-KafkaClient JAAS configuration is
set, using it");
+
+ jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+ } else {
+ LOG.info("UserGroupInformation.isLoginTicketBased is true,
but no JAAS configuration found for client {}. Will use JAAS configuration of
client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
+ }
+ }
+
+ String keyPrefix = jaasClientName + ".";
+ String keyParam = keyPrefix +
JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+ String loginModuleName = jaasConfig.getProperty(keyParam);
+
+ if (loginModuleName == null) {
+ LOG.error("Unable to add JAAS configuration for client [{}] as
it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName,
keyParam, jaasClientName);
+ return;
+ }
+
+ keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
+ String controlFlag = jaasConfig.getProperty(keyParam);
+
+ if(StringUtils.isEmpty(controlFlag)) {
+ String validValues =
JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
+ controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
+ LOG.warn("Unknown JAAS configuration value for ({}) = [{}],
valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag,
validValues);
+ }
+ String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX
+ ".";
+ String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+ int optionPrefixLen = optionPrefix.length();
+ StringBuffer optionStringBuffer = new StringBuffer();
+ for (String key : jaasConfig.stringPropertyNames()) {
+ if (key.startsWith(optionPrefix)) {
+ String optionVal = jaasConfig.getProperty(key);
+ if (optionVal != null) {
+ optionVal = optionVal.trim();
+
+ try {
+ if (key.equalsIgnoreCase(principalOptionKey)) {
+ optionVal =
org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String)
null);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to build serverPrincipal. Using
provided value:[{}]", optionVal);
+ }
+
+ optionVal = surroundWithQuotes(optionVal);
+ optionStringBuffer.append(String.format(" %s=%s",
key.substring(optionPrefixLen), optionVal));
+ }
+ }
+ }
+
+ String newJaasProperty = String.format("%s %s %s ;",
loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
+ kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY,
newJaasProperty);
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== KafkaUtils.setKafkaJAASProperties()");
+ }
+ }
+
+ static boolean isLoginKeytabBased() {
+ boolean ret = false;
+
+ try {
+ ret = UserGroupInformation.isLoginKeytabBased();
+ } catch (Exception excp) {
+ LOG.warn("Error in determining keytab for KafkaClient-JAAS
config", excp);
+ }
+
+ return ret;
+ }
+
+ public static boolean isLoginTicketBased() {
+ boolean ret = false;
+
+ try {
+ ret = UserGroupInformation.isLoginTicketBased();
+ } catch (Exception excp) {
+ LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS
config", excp);
+ }
+
+ return ret;
+ }
+
+ static String surroundWithQuotes(String optionVal) {
+ if(StringUtils.isEmpty(optionVal)) {
+ return optionVal;
+ }
+ String ret = optionVal;
+
+ // For property values which have special chars like "@" or "/", we
need to enclose it in
+ // double quotes, so that Kafka can parse it
+ // If the property is already enclosed in double quotes, then do
nothing.
+ if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length()
- 1) != '"') {
+ // If the string as special characters like except _,-
+ final String SPECIAL_CHAR_LIST = "/!@#%^&*";
+ if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
+ ret = String.format("\"%s\"", optionVal);
+ }
+ }
+
+ return ret;
+ }
+
+}
diff --git a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
new file mode 100644
index 0000000..14739cd
--- /dev/null
+++ b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class KafkaUtilsTest {
+
+ @Test
+ public void testSetKafkaJAASPropertiesForAllProperValues() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab),
"useKeyTab not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForMissingControlFlag() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab),
"useKeyTab not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertNull(newPropertyValue);
+
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionKeyTabPath = "/path/to/file.keytab";
+ final String optionPrincipal = "test/[email protected]";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.keyTabPath",
optionKeyTabPath);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.principal",
optionPrincipal);
+
+ try {
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+ String updatedPrincipalValue =
org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionPrincipal,
(String) null);
+
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
+ assertTrue(newPropertyValue.contains("keyTabPath=\"" +
optionKeyTabPath + "\""));
+ assertTrue(newPropertyValue.contains("principal=\""+
updatedPrincipalValue + "\""));
+
+ } catch (IOException e) {
+ fail("Failed while getting updated principal value with exception
: " + e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+ configuration.setProperty("atlas.kafka.bootstrap.servers",
"localhost:9100");
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab",
optionUseKeyTab);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
+
+ try (MockedStatic mockedKafkaUtilsClass =
Mockito.mockStatic(KafkaUtils.class)) {
+
mockedKafkaUtilsClass.when(KafkaUtils::isLoginKeytabBased).thenReturn(false);
+
mockedKafkaUtilsClass.when(KafkaUtils::isLoginTicketBased).thenReturn(true);
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.setKafkaJAASProperties(configuration,
properties)).thenCallRealMethod();
+
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),
"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("storeKey=" +
optionStoreKey), "storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+ }
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
+
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try (MockedStatic mockedKafkaUtilsClass =
Mockito.mockStatic(KafkaUtils.class)) {
+
mockedKafkaUtilsClass.when(KafkaUtils::isLoginKeytabBased).thenReturn(false);
+
mockedKafkaUtilsClass.when(KafkaUtils::isLoginTicketBased).thenReturn(true);
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+ mockedKafkaUtilsClass.when(() ->
KafkaUtils.setKafkaJAASProperties(configuration,
properties)).thenCallRealMethod();
+
+ KafkaUtils.setKafkaJAASProperties(configuration, properties);
+
+ String newPropertyValue =
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+ assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),
"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
+ assertTrue(newPropertyValue.contains("storeKey=" +
optionStoreKey), "storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
+ }
+ }
+
+
+}
+
diff --git
a/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
index c695741..80a12ac 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
@@ -19,24 +19,19 @@
package org.apache.atlas.hook;
import com.google.common.annotations.VisibleForTesting;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.atlas.utils.KafkaUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
import java.io.IOException;
-import java.util.Properties;
+import java.util.Arrays;
/**
* A class to create Kafka topics used by Atlas components.
@@ -66,20 +61,13 @@ public class AtlasTopicCreator {
if (!handleSecurity(atlasProperties)) {
return;
}
- ZkUtils zkUtils = createZkUtils(atlasProperties);
- for (String topicName : topicNames) {
- try {
- LOG.warn("Attempting to create topic {}", topicName);
- if (!ifTopicExists(topicName, zkUtils)) {
- createTopic(atlasProperties, topicName, zkUtils);
- } else {
- LOG.warn("Ignoring call to create topic {}, as it
already exists.", topicName);
- }
- } catch (Throwable t) {
- LOG.error("Failed while creating topic {}", topicName, t);
- }
+ try(KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {
+ int numPartitions =
atlasProperties.getInt("atlas.notification.partitions", 1);
+ int numReplicas =
atlasProperties.getInt("atlas.notification.replicas", 1);
+ kafkaUtils.createTopics(Arrays.asList(topicNames),
numPartitions, numReplicas);
+ } catch (Exception e) {
+ LOG.error("Error while creating topics e :" + e.getMessage(),
e);
}
- zkUtils.close();
} else {
LOG.info("Not creating topics {} as {} is false",
StringUtils.join(topicNames, ","),
ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
@@ -105,28 +93,9 @@ public class AtlasTopicCreator {
return true;
}
- @VisibleForTesting
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
- return AdminUtils.topicExists(zkUtils, topicName);
- }
-
- @VisibleForTesting
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- int numPartitions =
atlasProperties.getInt("atlas.notification.hook.numthreads", 1);
- int numReplicas =
atlasProperties.getInt("atlas.notification.replicas", 1);
- AdminUtils.createTopic(zkUtils, topicName, numPartitions, numReplicas,
- new Properties(), RackAwareMode.Enforced$.MODULE$);
- LOG.warn("Created topic {} with partitions {} and replicas {}",
topicName, numPartitions, numReplicas);
- }
-
- @VisibleForTesting
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- String zkConnect =
atlasProperties.getString("atlas.kafka.zookeeper.connect");
- int sessionTimeout =
atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
- int connectionTimeout =
atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
- Tuple2<ZkClient, ZkConnection> zkClientAndConnection =
ZkUtils.createZkClientAndConnection(
- zkConnect, sessionTimeout, connectionTimeout);
- return new ZkUtils(zkClientAndConnection._1(),
zkClientAndConnection._2(), false);
+ // This method is added to mock the creation of kafkaUtils object while
writing the test cases
+ KafkaUtils getKafkaUtils(Configuration configuration) {
+ return new KafkaUtils(configuration);
}
public static void main(String[] args) throws AtlasException {
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 05fd977..3d1b3cc 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -25,10 +25,10 @@ import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.service.Service;
+import org.apache.atlas.utils.KafkaUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -42,7 +42,6 @@ import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.io.IOException;
import java.util.*;
import java.util.concurrent.Future;
@@ -63,17 +62,6 @@ public class KafkaNotification extends AbstractNotification
implements Service {
public static final String ATLAS_ENTITIES_TOPIC =
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
- static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
- private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
- private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM =
"loginModuleName";
- private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM =
"loginModuleControlFlag";
- private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG =
"required";
- private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS =
"optional|requisite|sufficient|required";
- private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
- private static final String JAAS_PRINCIPAL_PROP = "principal";
- private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
- private static final String JAAS_TICKET_BASED_CLIENT_NAME =
"ticketBased-KafkaClient";
-
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
@@ -144,7 +132,7 @@ public class KafkaNotification extends AbstractNotification
implements Service {
// if no value is specified for max.poll.records, set to 1
properties.put("max.poll.records",
kafkaConf.getInt("max.poll.records", 1));
- setKafkaJAASProperties(applicationProperties, properties);
+ KafkaUtils.setKafkaJAASProperties(applicationProperties, properties);
LOG.info("<== KafkaNotification()");
}
@@ -414,127 +402,4 @@ public class KafkaNotification extends
AbstractNotification implements Service {
return ret;
}
- void setKafkaJAASProperties(Configuration configuration, Properties
kafkaProperties) {
- LOG.debug("==> KafkaNotification.setKafkaJAASProperties()");
-
- if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
- LOG.debug("JAAS config is already set, returning");
- return;
- }
-
- Properties jaasConfig =
ApplicationProperties.getSubsetAsProperties(configuration,
JAAS_CONFIG_PREFIX_PARAM);
- // JAAS Configuration is present then update set those properties in
sasl.jaas.config
- if(jaasConfig != null && !jaasConfig.isEmpty()) {
- String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
-
- // Required for backward compatability for Hive CLI
- if (!isLoginKeytabBased() && isLoginTicketBased()) {
- LOG.debug("Checking if ticketBased-KafkaClient is set");
- // if ticketBased-KafkaClient property is not specified then
use the default client name
- String ticketBasedConfigPrefix =
JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
- Configuration ticketBasedConfig =
configuration.subset(ticketBasedConfigPrefix);
-
- if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
- LOG.debug("ticketBased-KafkaClient JAAS configuration is
set, using it");
-
- jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
- } else {
- LOG.info("UserGroupInformation.isLoginTicketBased is true,
but no JAAS configuration found for client {}. Will use JAAS configuration of
client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
- }
- }
-
- String keyPrefix = jaasClientName + ".";
- String keyParam = keyPrefix +
JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
- String loginModuleName = jaasConfig.getProperty(keyParam);
-
- if (loginModuleName == null) {
- LOG.error("Unable to add JAAS configuration for client [{}] as
it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName,
keyParam, jaasClientName);
- return;
- }
-
- keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
- String controlFlag = jaasConfig.getProperty(keyParam);
-
- if(StringUtils.isEmpty(controlFlag)) {
- String validValues =
JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
- controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
- LOG.warn("Unknown JAAS configuration value for ({}) = [{}],
valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag,
validValues);
- }
- String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX
+ ".";
- String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
- int optionPrefixLen = optionPrefix.length();
- StringBuffer optionStringBuffer = new StringBuffer();
- for (String key : jaasConfig.stringPropertyNames()) {
- if (key.startsWith(optionPrefix)) {
- String optionVal = jaasConfig.getProperty(key);
- if (optionVal != null) {
- optionVal = optionVal.trim();
-
- try {
- if (key.equalsIgnoreCase(principalOptionKey)) {
- optionVal =
org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String)
null);
- }
- } catch (IOException e) {
- LOG.warn("Failed to build serverPrincipal. Using
provided value:[{}]", optionVal);
- }
-
- optionVal = surroundWithQuotes(optionVal);
- optionStringBuffer.append(String.format(" %s=%s",
key.substring(optionPrefixLen), optionVal));
- }
- }
- }
-
- String newJaasProperty = String.format("%s %s %s ;",
loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
- kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY,
newJaasProperty);
- }
-
- LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
- }
-
- @VisibleForTesting
- boolean isLoginKeytabBased() {
- boolean ret = false;
-
- try {
- ret = UserGroupInformation.isLoginKeytabBased();
- } catch (Exception excp) {
- LOG.warn("Error in determining keytab for KafkaClient-JAAS
config", excp);
- }
-
- return ret;
- }
-
- @VisibleForTesting
- boolean isLoginTicketBased() {
- boolean ret = false;
-
- try {
- ret = UserGroupInformation.isLoginTicketBased();
- } catch (Exception excp) {
- LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS
config", excp);
- }
-
- return ret;
- }
-
- private static String surroundWithQuotes(String optionVal) {
- if(StringUtils.isEmpty(optionVal)) {
- return optionVal;
- }
- String ret = optionVal;
-
- // For property values which have special chars like "@" or "/", we
need to enclose it in
- // double quotes, so that Kafka can parse it
- // If the property is already enclosed in double quotes, then do
nothing.
- if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length()
- 1) != '"') {
- // If the string as special characters like except _,-
- final String SPECIAL_CHAR_LIST = "/!@#%^&*";
- if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
- ret = String.format("\"%s\"", optionVal);
- }
- }
-
- return ret;
- }
-
}
diff --git
a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
index 2937847..6d1a5b6 100644
---
a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
+++
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
@@ -18,20 +18,21 @@
package org.apache.atlas.hook;
-import kafka.utils.ZkUtils;
import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.utils.KafkaUtils;
import org.apache.commons.configuration.Configuration;
+import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
public class AtlasTopicCreatorTest {
@@ -44,179 +45,35 @@ public class AtlasTopicCreatorTest {
Configuration configuration = mock(Configuration.class);
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
thenReturn(false);
-
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
- final boolean[] topicExistsCalled = new boolean[] {false};
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- topicExistsCalled[0] = true;
- return false;
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
- assertFalse(topicExistsCalled[0]);
- }
-
- @Test
- public void shouldNotCreateTopicIfItAlreadyExists() {
- Configuration configuration = mock(Configuration.class);
-
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
- thenReturn(true);
-
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
- final ZkUtils zookeeperUtils = mock(ZkUtils.class);
- final boolean[] topicExistsCalled = new boolean[]{false};
- final boolean[] createTopicCalled = new boolean[]{false};
-
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- topicExistsCalled[0] = true;
- return true;
- }
-
- @Override
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- return zookeeperUtils;
- }
-
- @Override
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- createTopicCalled[0] = true;
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
- assertTrue(topicExistsCalled[0]);
- assertFalse(createTopicCalled[0]);
- }
-
- @Test
- public void shouldCreateTopicIfItDoesNotExist() {
- Configuration configuration = mock(Configuration.class);
-
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
- thenReturn(true);
-
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
- final ZkUtils zookeeperUtils = mock(ZkUtils.class);
-
- final boolean[] createdTopic = new boolean[]{false};
-
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- return false;
- }
-
- @Override
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- return zookeeperUtils;
- }
-
- @Override
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- createdTopic[0] = true;
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
- assertTrue(createdTopic[0]);
- }
- @Test
- public void shouldNotFailIfExceptionOccursDuringCreatingTopic() {
- Configuration configuration = mock(Configuration.class);
-
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
- thenReturn(true);
-
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
- final ZkUtils zookeeperUtils = mock(ZkUtils.class);
- final boolean[] createTopicCalled = new boolean[]{false};
+ AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
+ AtlasTopicCreator spyAtlasTopicCreator =
Mockito.spy(atlasTopicCreator);
+ spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
+ Mockito.verify(spyAtlasTopicCreator,
times(0)).handleSecurity(configuration);
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- return false;
- }
-
- @Override
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- return zookeeperUtils;
- }
-
- @Override
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- createTopicCalled[0] = true;
- throw new RuntimeException("Simulating failure during creating
topic");
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
- assertTrue(createTopicCalled[0]);
}
@Test
- public void shouldCreateMultipleTopics() {
+ public void shouldCreateTopicIfConfiguredToDoSo() {
Configuration configuration = mock(Configuration.class);
-
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
- thenReturn(true);
-
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
- final ZkUtils zookeeperUtils = mock(ZkUtils.class);
- final Map<String, Boolean> createdTopics = new HashMap<>();
- createdTopics.put(ATLAS_HOOK_TOPIC, false);
- createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
-
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- return false;
- }
-
- @Override
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- return zookeeperUtils;
- }
-
- @Override
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- createdTopics.put(topicName, true);
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC,
ATLAS_ENTITIES_TOPIC);
- assertTrue(createdTopics.get(ATLAS_HOOK_TOPIC));
- assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
- }
-
- @Test
- public void shouldCreateTopicEvenIfEarlierOneFails() {
- Configuration configuration = mock(Configuration.class);
+ KafkaUtils mockKafkaUtils = Mockito.mock(KafkaUtils.class);
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
thenReturn(true);
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
- final ZkUtils zookeeperUtils = mock(ZkUtils.class);
-
- final Map<String, Boolean> createdTopics = new HashMap<>();
- createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
+ AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+ AtlasTopicCreator spyAtlasTopicCreator =
Mockito.spy(atlasTopicCreator);
+
Mockito.doReturn(mockKafkaUtils).when(spyAtlasTopicCreator).getKafkaUtils(configuration);
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- return false;
- }
+ spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
- @Override
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- return zookeeperUtils;
- }
+ try {
+ verify(mockKafkaUtils).createTopics(anyList(), anyInt(), anyInt());
+ } catch (ExecutionException | InterruptedException e) {
+ Assert.fail("Caught exception while verifying createTopics: " +
e.getMessage());
+ }
- @Override
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- if (topicName.equals(ATLAS_HOOK_TOPIC)) {
- throw new RuntimeException("Simulating failure when
creating ATLAS_HOOK topic");
- } else {
- createdTopics.put(topicName, true);
- }
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC,
ATLAS_ENTITIES_TOPIC);
- assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
}
@Test
@@ -225,26 +82,17 @@ public class AtlasTopicCreatorTest {
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
thenReturn(true);
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
- final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+ KafkaUtils mockKafkaUtils = Mockito.mock(KafkaUtils.class);
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- return false;
- }
+ AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
- @Override
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- return zookeeperUtils;
- }
+ AtlasTopicCreator spyAtlasTopicCreator =
Mockito.spy(atlasTopicCreator);
+
Mockito.doReturn(mockKafkaUtils).when(spyAtlasTopicCreator).getKafkaUtils(configuration);
- @Override
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC,
ATLAS_ENTITIES_TOPIC);
+ spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
+
+ verify(mockKafkaUtils).close();
- verify(zookeeperUtils, times(1)).close();
}
@Test
@@ -252,34 +100,19 @@ public class AtlasTopicCreatorTest {
Configuration configuration = mock(Configuration.class);
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
true)).
thenReturn(true);
- final ZkUtils zookeeperUtils = mock(ZkUtils.class);
- final Map<String, Boolean> createdTopics = new HashMap<>();
- createdTopics.put(ATLAS_HOOK_TOPIC, false);
- createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
-
- AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
- @Override
- protected boolean ifTopicExists(String topicName, ZkUtils zkUtils)
{
- return false;
- }
+ KafkaUtils mockKafkaUtils = Mockito.mock(KafkaUtils.class);
+ AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
- @Override
- protected ZkUtils createZkUtils(Configuration atlasProperties) {
- return zookeeperUtils;
- }
+ AtlasTopicCreator spyAtlasTopicCreator =
Mockito.spy(atlasTopicCreator);
+
Mockito.doReturn(mockKafkaUtils).when(spyAtlasTopicCreator).getKafkaUtils(configuration);
+
Mockito.doReturn(false).when(spyAtlasTopicCreator).handleSecurity(configuration);
- @Override
- protected void createTopic(Configuration atlasProperties, String
topicName, ZkUtils zkUtils) {
- createdTopics.put(topicName, true);
- }
+ spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC,
ATLAS_ENTITIES_TOPIC);
- @Override
- protected boolean handleSecurity(Configuration atlasProperties) {
- return false;
- }
- };
- atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC,
ATLAS_ENTITIES_TOPIC);
- assertFalse(createdTopics.get(ATLAS_HOOK_TOPIC));
- assertFalse(createdTopics.get(ATLAS_ENTITIES_TOPIC));
+ try {
+ verify(mockKafkaUtils, times(0)).createTopics(anyList(), anyInt(),
anyInt());
+ } catch (ExecutionException | InterruptedException e) {
+ Assert.fail("Caught exception while verifying createTopics: " +
e.getMessage());
+ }
}
}
diff --git
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 51c5a0d..24b6aa9 100644
---
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -151,202 +151,6 @@ public class KafkaNotificationMockTest {
}
}
- @Test
- public void testSetKafkaJAASPropertiesForAllProperValues() {
- Properties properties = new Properties();
- Configuration configuration = new PropertiesConfiguration();
-
- final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
- final String loginModuleControlFlag = "required";
- final String optionUseKeyTab = "false";
- final String optionStoreKey = "true";
- final String optionServiceName = "kafka";
-
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
- configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
- configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
-
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
- try {
- KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
- kafkaNotification.setKafkaJAASProperties(configuration,
properties);
- String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
- assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
-
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
- assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
- assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
- assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
- } catch (AtlasException e) {
- fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
- }
-
- }
-
- @Test
- public void testSetKafkaJAASPropertiesForMissingControlFlag() {
- Properties properties = new Properties();
- Configuration configuration = new PropertiesConfiguration();
-
- final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
- final String loginModuleControlFlag = "required";
- final String optionUseKeyTab = "false";
- final String optionStoreKey = "true";
- final String optionServiceName = "kafka";
-
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
- configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
- configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
-
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
- try {
- KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
- kafkaNotification.setKafkaJAASProperties(configuration,
properties);
- String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
- assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
-
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
- assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
- assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
- assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
- } catch (AtlasException e) {
- fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
- }
-
- }
-
- @Test
- public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
- Properties properties = new Properties();
- Configuration configuration = new PropertiesConfiguration();
-
- final String loginModuleControlFlag = "required";
- final String optionUseKeyTab = "false";
- final String optionStoreKey = "true";
- final String optionServiceName = "kafka";
-
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
- configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
- configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
-
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
- try {
- KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
- kafkaNotification.setKafkaJAASProperties(configuration,
properties);
- String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
- assertNull(newPropertyValue);
- } catch (AtlasException e) {
- fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
- }
-
- }
-
- @Test
- public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
- Properties properties = new Properties();
- Configuration configuration = new PropertiesConfiguration();
-
- final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
- final String loginModuleControlFlag = "required";
- final String optionKeyTabPath = "/path/to/file.keytab";
- final String optionPrincipal = "test/[email protected]";
-
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
- configuration.setProperty("atlas.jaas.KafkaClient.option.keyTabPath",
optionKeyTabPath);
- configuration.setProperty("atlas.jaas.KafkaClient.option.principal",
optionPrincipal);
-
- try {
- KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
- kafkaNotification.setKafkaJAASProperties(configuration,
properties);
- String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
- String updatedPrincipalValue =
org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionPrincipal,
(String) null);
-
- assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
-
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
- assertTrue(newPropertyValue.contains("keyTabPath=\"" +
optionKeyTabPath + "\""));
- assertTrue(newPropertyValue.contains("principal=\""+
updatedPrincipalValue + "\""));
-
- } catch (AtlasException e) {
- fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
- } catch (IOException e) {
- fail("Failed while getting updated principal value with exception
: " + e.getMessage());
- }
-
- }
-
- @Test
- public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
- Properties properties = new Properties();
- Configuration configuration = new PropertiesConfiguration();
-
- final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
- final String loginModuleControlFlag = "required";
- final String optionUseKeyTab = "false";
- final String optionStoreKey = "true";
- final String optionServiceName = "kafka";
-
-
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
-
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
-
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab",
optionUseKeyTab);
-
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey",
optionStoreKey);
-
configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
-
- try {
- KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
- KafkaNotification spyKafkaNotification =
Mockito.spy(kafkaNotification);
- when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
- when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
- spyKafkaNotification.setKafkaJAASProperties(configuration,
properties);
- String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
- assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
-
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
- assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
- assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
- assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
- } catch (AtlasException e) {
- fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
- }
- }
-
- @Test
- public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
- Properties properties = new Properties();
- Configuration configuration = new PropertiesConfiguration();
-
- final String loginModuleName =
"com.sun.security.auth.module.Krb5LoginModule";
- final String loginModuleControlFlag = "required";
- final String optionUseKeyTab = "false";
- final String optionStoreKey = "true";
- final String optionServiceName = "kafka";
-
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
-
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag",
loginModuleControlFlag);
- configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab",
optionUseKeyTab);
- configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey",
optionStoreKey);
-
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
- try {
- KafkaNotification kafkaNotification = new
KafkaNotification(configuration);
- KafkaNotification spyKafkaNotification =
Mockito.spy(kafkaNotification);
- when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
- when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
- spyKafkaNotification.setKafkaJAASProperties(configuration,
properties);
- String newPropertyValue =
properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
- assertTrue(newPropertyValue.contains(loginModuleName),
"loginModuleName not present in new property");
-
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
not present in new property");
- assertTrue(newPropertyValue.contains("useKeyTab=" +
optionUseKeyTab), "useKeyTab not present in new property or value doesn't
match");
- assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey),
"storeKey not present in new property or value doesn't match");
- assertTrue(newPropertyValue.contains("serviceName=" +
optionServiceName), "serviceName not present in new property or value doesn't
match");
- } catch (AtlasException e) {
- fail("Failed while creating KafkaNotification object with
exception : " + e.getMessage());
- }
- }
-
class TestKafkaNotification extends KafkaNotification {
private final AtlasKafkaConsumer consumer1;
diff --git
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
index 1ae98ce..2d2775b 100644
---
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
@@ -55,7 +55,7 @@ public class BulkImportPercentTest {
percentHolder.add(d.intValue());
return null;
}
- }).when(log).info(anyString(), anyFloat(), anyInt(), anyString());
+ }).when(log).info(anyString(), anyInt(), anyLong(), anyString());
}
@Test
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 3774064..15a1900 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -140,7 +140,7 @@ public class NotificationHookConsumerTest {
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1,
KafkaNotification.ATLAS_HOOK_TOPIC, -1));
- verify(consumer).commit(any(TopicPartition.class), anyInt());
+ verify(consumer).commit(any(TopicPartition.class), anyLong());
}
@Test
diff --git
a/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java
b/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java
index 0e48509..385f250 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java
@@ -82,8 +82,7 @@ public class CuratorFactoryTest {
curatorFactory.enhanceBuilderWithSecurityParameters(zookeeperProperties,
builder);
verify(builder).aclProvider(argThat(new ArgumentMatcher<ACLProvider>()
{
@Override
- public boolean matches(Object o) {
- ACLProvider aclProvider = (ACLProvider) o;
+ public boolean matches(ACLProvider aclProvider) {
ACL acl = aclProvider.getDefaultAcl().get(0);
return acl.getId().getId().equals("[email protected]")
&& acl.getId().getScheme().equals("sasl");