Repository: atlas Updated Branches: refs/heads/branch-0.8 e288e0bc5 -> 806f983e6
ATLAS-2947: name of Kafka topics used by Atlas made configurable (cherry picked from commit ba2b1449195cb79e464e14e6fedca4a8df50fa6b) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/806f983e Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/806f983e Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/806f983e Branch: refs/heads/branch-0.8 Commit: 806f983e65c98b96ff813d3aa5c78ecde419caf1 Parents: e288e0b Author: Madhan Neethiraj <[email protected]> Authored: Wed Oct 31 16:42:00 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Nov 1 09:15:23 2018 -0700 ---------------------------------------------------------------------- .../org/apache/atlas/AtlasConfiguration.java | 3 + distro/src/bin/atlas_config.py | 2 +- .../apache/atlas/kafka/KafkaNotification.java | 5 +- .../atlas/hook/AtlasTopicCreatorTest.java | 42 ++++++------ .../apache/atlas/kafka/KafkaConsumerTest.java | 71 +++++++------------- .../notification/NotificationHookConsumer.java | 14 +++- 6 files changed, 65 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/common/src/main/java/org/apache/atlas/AtlasConfiguration.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java index ace8a0f..1b3ce1e 100644 --- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -33,6 +33,9 @@ public enum AtlasConfiguration { QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024), + NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"), + NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"), + NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)), NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true), NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60), http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/distro/src/bin/atlas_config.py ---------------------------------------------------------------------- diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py index ada5743..1b91a2e 100755 --- a/distro/src/bin/atlas_config.py +++ b/distro/src/bin/atlas_config.py @@ -445,7 +445,7 @@ def get_topics_to_create(confdir): if topic_list is not None: topics = topic_list.split(",") else: - topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"] + topics = [getConfigWithDefault("atlas.notification.hook.topic.name", "ATLAS_HOOK"), getConfigWithDefault("atlas.notification.entities.topic.name", "ATLAS_ENTITIES")] return topics def get_atlas_url_port(confdir): http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 4c753d2..73bc315 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -19,6 +19,7 @@ package org.apache.atlas.kafka; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.NotificationConsumer; @@ -52,8 +53,8 @@ public class KafkaNotification extends AbstractNotification implements Service { public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class); public static final String PROPERTY_PREFIX = "atlas.kafka"; - public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK"; - public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES"; + public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed."; http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java index 8585898..2937847 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java @@ -19,6 +19,7 @@ package org.apache.atlas.hook; import kafka.utils.ZkUtils; +import org.apache.atlas.AtlasConfiguration; import org.apache.commons.configuration.Configuration; import org.testng.annotations.Test; @@ -34,6 +35,9 @@ import static org.testng.Assert.assertTrue; public class AtlasTopicCreatorTest { + private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + private final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); + @Test public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() { @@ -49,7 +53,7 @@ public class AtlasTopicCreatorTest { return false; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertFalse(topicExistsCalled[0]); } @@ -80,7 +84,7 @@ public class AtlasTopicCreatorTest { createTopicCalled[0] = true; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertTrue(topicExistsCalled[0]); assertFalse(createTopicCalled[0]); } @@ -111,7 +115,7 @@ public class AtlasTopicCreatorTest { createdTopic[0] = true; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertTrue(createdTopic[0]); } @@ -141,7 +145,7 @@ public class AtlasTopicCreatorTest { throw new RuntimeException("Simulating failure during creating topic"); } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertTrue(createTopicCalled[0]); } @@ -154,8 +158,8 @@ public class AtlasTopicCreatorTest { final ZkUtils zookeeperUtils = mock(ZkUtils.class); final Map<String, Boolean> createdTopics = new HashMap<>(); - createdTopics.put("ATLAS_HOOK", false); - createdTopics.put("ATLAS_ENTITIES", false); + createdTopics.put(ATLAS_HOOK_TOPIC, false); + createdTopics.put(ATLAS_ENTITIES_TOPIC, false); AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { @@ -174,9 +178,9 @@ public class AtlasTopicCreatorTest { createdTopics.put(topicName, true); } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); - assertTrue(createdTopics.get("ATLAS_HOOK")); - assertTrue(createdTopics.get("ATLAS_ENTITIES")); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); + assertTrue(createdTopics.get(ATLAS_HOOK_TOPIC)); + assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC)); } @Test @@ -188,7 +192,7 @@ public class AtlasTopicCreatorTest { final ZkUtils zookeeperUtils = mock(ZkUtils.class); final Map<String, Boolean> createdTopics = new HashMap<>(); - createdTopics.put("ATLAS_ENTITIES", false); + createdTopics.put(ATLAS_ENTITIES_TOPIC, false); AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { @@ -204,15 +208,15 @@ public class AtlasTopicCreatorTest { @Override protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { - if (topicName.equals("ATLAS_HOOK")) { + if (topicName.equals(ATLAS_HOOK_TOPIC)) { throw new RuntimeException("Simulating failure when creating ATLAS_HOOK topic"); } else { createdTopics.put(topicName, true); } } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); - assertTrue(createdTopics.get("ATLAS_ENTITIES")); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); + assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC)); } @Test @@ -238,7 +242,7 @@ public class AtlasTopicCreatorTest { protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); verify(zookeeperUtils, times(1)).close(); } @@ -250,8 +254,8 @@ public class AtlasTopicCreatorTest { thenReturn(true); final ZkUtils zookeeperUtils = mock(ZkUtils.class); final Map<String, Boolean> createdTopics = new HashMap<>(); - createdTopics.put("ATLAS_HOOK", false); - createdTopics.put("ATLAS_ENTITIES", false); + createdTopics.put(ATLAS_HOOK_TOPIC, false); + createdTopics.put(ATLAS_ENTITIES_TOPIC, false); AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { @Override @@ -274,8 +278,8 @@ public class AtlasTopicCreatorTest { return false; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); - assertFalse(createdTopics.get("ATLAS_HOOK")); - assertFalse(createdTopics.get("ATLAS_ENTITIES")); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); + assertFalse(createdTopics.get(ATLAS_HOOK_TOPIC)); + assertFalse(createdTopics.get(ATLAS_ENTITIES_TOPIC)); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 08a20bd..782fbfe 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -21,11 +21,14 @@ package org.apache.atlas.kafka; import kafka.message.MessageAndMetadata; import org.apache.atlas.notification.*; import org.apache.atlas.notification.AtlasNotificationMessage; +import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.notification.entity.EntityNotificationImplTest; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.AtlasConfiguration; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -40,9 +43,6 @@ import org.testng.annotations.Test; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; import static org.mockito.Mockito.mock; @@ -58,11 +58,12 @@ public class KafkaConsumerTest { private static final String TRAIT_NAME = "MyTrait"; + private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + @Mock private KafkaConsumer kafkaConsumer; - @BeforeMethod public void setup() { MockitoAnnotations.initMocks(this); @@ -70,28 +71,16 @@ public class KafkaConsumerTest { @Test public void testReceive() throws Exception { - + Referenceable entity = getEntity(TRAIT_NAME); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC, 0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json)); + Map mp = Collections.singletonMap(tp, klist); + ConsumerRecords records = new ConsumerRecords(mp); MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); - Referenceable entity = getEntity(TRAIT_NAME); - - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); - - String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); - - kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); - List<ConsumerRecord> klist = new ArrayList<>(); - klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK", - 0, 0L, "mykey", json)); - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - Map mp = new HashMap(); - mp.put(tp,klist); - ConsumerRecords records = new ConsumerRecords(mp); - - when(kafkaConsumer.poll(100)).thenReturn(records); when(messageAndMetadata.message()).thenReturn(json); @@ -108,26 +97,16 @@ public class KafkaConsumerTest { @Test public void testNextVersionMismatch() throws Exception { + Referenceable entity = getEntity(TRAIT_NAME); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json)); + Map mp = Collections.singletonMap(tp,klist); + ConsumerRecords records = new ConsumerRecords(mp); MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); - Referenceable entity = getEntity(TRAIT_NAME); - - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); - - String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); - - kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); - List<ConsumerRecord> klist = new ArrayList<>(); - klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK", - 0, 0L, "mykey", json)); - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - Map mp = new HashMap(); - mp.put(tp,klist); - ConsumerRecords records = new ConsumerRecords(mp); - when(kafkaConsumer.poll(100L)).thenReturn(records); when(messageAndMetadata.message()).thenReturn(json); @@ -148,10 +127,8 @@ public class KafkaConsumerTest { @Test public void testCommitIsCalledIfAutoCommitDisabled() { - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); consumer.commit(tp, 1); @@ -160,10 +137,8 @@ public class KafkaConsumerTest { @Test public void testCommitIsNotCalledIfAutoCommitEnabled() { - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L); consumer.commit(tp, 1); http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 97fd095..c847f3d 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -20,7 +20,15 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import kafka.utils.ShutdownableThread; -import org.apache.atlas.*; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasBaseClient; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.listener.ActiveStateChangeHandler; @@ -88,6 +96,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final String ATTRIBUTE_INPUTS = "inputs"; private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); + private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; @@ -101,6 +110,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold"; + private final AtlasEntityStore atlasEntityStore; private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; @@ -570,7 +580,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { recordFailedMessages(); - TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + TopicPartition partition = new TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition()); consumer.commit(partition, kafkaMessage.getOffset() + 1); commitSucceessStatus = true;
