Repository: atlas Updated Branches: refs/heads/master fb7e9eaf9 -> ba2b14491
ATLAS-2947: name of Kafka topics used by Atlas made configurable Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/ba2b1449 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/ba2b1449 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/ba2b1449 Branch: refs/heads/master Commit: ba2b1449195cb79e464e14e6fedca4a8df50fa6b Parents: fb7e9ea Author: Madhan Neethiraj <[email protected]> Authored: Wed Oct 31 16:42:00 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Nov 1 08:08:02 2018 -0700 ---------------------------------------------------------------------- distro/src/bin/atlas_config.py | 2 +- .../org/apache/atlas/AtlasConfiguration.java | 3 ++ .../apache/atlas/kafka/KafkaNotification.java | 5 ++- .../atlas/hook/AtlasTopicCreatorTest.java | 42 +++++++++++--------- .../apache/atlas/kafka/KafkaConsumerTest.java | 16 ++++---- .../notification/NotificationHookConsumer.java | 5 ++- 6 files changed, 43 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/ba2b1449/distro/src/bin/atlas_config.py ---------------------------------------------------------------------- diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py index f6a6bef..747b03b 100755 --- a/distro/src/bin/atlas_config.py +++ b/distro/src/bin/atlas_config.py @@ -468,7 +468,7 @@ def get_topics_to_create(confdir): if topic_list is not None: topics = topic_list.split(",") else: - topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"] + topics = [getConfigWithDefault("atlas.notification.hook.topic.name", "ATLAS_HOOK"), getConfigWithDefault("atlas.notification.entities.topic.name", "ATLAS_ENTITIES")] return topics def get_atlas_url_port(confdir): http://git-wip-us.apache.org/repos/asf/atlas/blob/ba2b1449/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index ace8a0f..1b3ce1e 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -33,6 +33,9 @@ public enum AtlasConfiguration { QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024), + NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"), + NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"), + NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)), NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true), NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60), http://git-wip-us.apache.org/repos/asf/atlas/blob/ba2b1449/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 624dc55..4bec917 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -19,6 +19,7 @@ package org.apache.atlas.kafka; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.NotificationConsumer; @@ -52,8 +53,8 @@ public class KafkaNotification extends AbstractNotification implements Service { public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class); public static final String PROPERTY_PREFIX = "atlas.kafka"; - public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK"; - public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES"; + public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed."; http://git-wip-us.apache.org/repos/asf/atlas/blob/ba2b1449/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java index 8585898..2937847 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java @@ -19,6 +19,7 @@ package org.apache.atlas.hook; import kafka.utils.ZkUtils; +import org.apache.atlas.AtlasConfiguration; import org.apache.commons.configuration.Configuration; import org.testng.annotations.Test; @@ -34,6 +35,9 @@ import static org.testng.Assert.assertTrue; public class AtlasTopicCreatorTest { + private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + private final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); + @Test public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() { @@ -49,7 +53,7 @@ public class AtlasTopicCreatorTest { return false; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertFalse(topicExistsCalled[0]); } @@ -80,7 +84,7 @@ public class AtlasTopicCreatorTest { createTopicCalled[0] = true; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertTrue(topicExistsCalled[0]); assertFalse(createTopicCalled[0]); } @@ -111,7 +115,7 @@ public class AtlasTopicCreatorTest { createdTopic[0] = true; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertTrue(createdTopic[0]); } @@ -141,7 +145,7 @@ public class AtlasTopicCreatorTest { throw new RuntimeException("Simulating failure during creating topic"); } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC); assertTrue(createTopicCalled[0]); } @@ -154,8 +158,8 @@ public class AtlasTopicCreatorTest { final ZkUtils zookeeperUtils = mock(ZkUtils.class); final Map<String, Boolean> createdTopics = new HashMap<>(); - createdTopics.put("ATLAS_HOOK", false); - createdTopics.put("ATLAS_ENTITIES", false); + createdTopics.put(ATLAS_HOOK_TOPIC, false); + createdTopics.put(ATLAS_ENTITIES_TOPIC, false); AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { @@ -174,9 +178,9 @@ public class AtlasTopicCreatorTest { createdTopics.put(topicName, true); } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); - assertTrue(createdTopics.get("ATLAS_HOOK")); - assertTrue(createdTopics.get("ATLAS_ENTITIES")); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); + assertTrue(createdTopics.get(ATLAS_HOOK_TOPIC)); + assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC)); } @Test @@ -188,7 +192,7 @@ public class AtlasTopicCreatorTest { final ZkUtils zookeeperUtils = mock(ZkUtils.class); final Map<String, Boolean> createdTopics = new HashMap<>(); - createdTopics.put("ATLAS_ENTITIES", false); + createdTopics.put(ATLAS_ENTITIES_TOPIC, false); AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { @@ -204,15 +208,15 @@ public class AtlasTopicCreatorTest { @Override protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { - if (topicName.equals("ATLAS_HOOK")) { + if (topicName.equals(ATLAS_HOOK_TOPIC)) { throw new RuntimeException("Simulating failure when creating ATLAS_HOOK topic"); } else { createdTopics.put(topicName, true); } } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); - assertTrue(createdTopics.get("ATLAS_ENTITIES")); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); + assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC)); } @Test @@ -238,7 +242,7 @@ public class AtlasTopicCreatorTest { protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); verify(zookeeperUtils, times(1)).close(); } @@ -250,8 +254,8 @@ public class AtlasTopicCreatorTest { thenReturn(true); final ZkUtils zookeeperUtils = mock(ZkUtils.class); final Map<String, Boolean> createdTopics = new HashMap<>(); - createdTopics.put("ATLAS_HOOK", false); - createdTopics.put("ATLAS_ENTITIES", false); + createdTopics.put(ATLAS_HOOK_TOPIC, false); + createdTopics.put(ATLAS_ENTITIES_TOPIC, false); AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { @Override @@ -274,8 +278,8 @@ public class AtlasTopicCreatorTest { return false; } }; - atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); - assertFalse(createdTopics.get("ATLAS_HOOK")); - assertFalse(createdTopics.get("ATLAS_ENTITIES")); + atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC); + assertFalse(createdTopics.get(ATLAS_HOOK_TOPIC)); + assertFalse(createdTopics.get(ATLAS_ENTITIES_TOPIC)); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/ba2b1449/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 2e8abd7..847caa3 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -18,6 +18,7 @@ package org.apache.atlas.kafka; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; @@ -53,11 +54,12 @@ import static org.testng.Assert.*; public class KafkaConsumerTest { private static final String TRAIT_NAME = "MyTrait"; + private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + @Mock private KafkaConsumer kafkaConsumer; - @BeforeMethod public void setup() { MockitoAnnotations.initMocks(this); @@ -68,8 +70,8 @@ public class KafkaConsumerTest { Referenceable entity = getEntity(TRAIT_NAME); EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); - TopicPartition tp = new TopicPartition("ATLAS_HOOK", 0); - List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json)); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC, 0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json)); Map mp = Collections.singletonMap(tp, klist); ConsumerRecords records = new ConsumerRecords(mp); @@ -92,8 +94,8 @@ public class KafkaConsumerTest { Referenceable entity = getEntity(TRAIT_NAME); EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json)); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json)); Map mp = Collections.singletonMap(tp,klist); ConsumerRecords records = new ConsumerRecords(mp); @@ -119,7 +121,7 @@ public class KafkaConsumerTest { @Test public void testCommitIsCalledIfAutoCommitDisabled() { - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0); AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L); consumer.commit(tp, 1); @@ -129,7 +131,7 @@ public class KafkaConsumerTest { @Test public void testCommitIsNotCalledIfAutoCommitEnabled() { - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC,0); AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, true , 100L); consumer.commit(tp, 1); http://git-wip-us.apache.org/repos/asf/atlas/blob/ba2b1449/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index a8f8dd6..da1be9e 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -23,6 +23,7 @@ import kafka.utils.ShutdownableThread; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.RequestContext; @@ -92,6 +93,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final String ATTRIBUTE_INPUTS = "inputs"; private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); + private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; @@ -105,6 +107,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final int SERVER_READY_WAIT_TIME_MS = 1000; + private final AtlasEntityStore atlasEntityStore; private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; @@ -601,7 +604,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { recordFailedMessages(); - TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + TopicPartition partition = new TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition()); consumer.commit(partition, kafkaMessage.getOffset() + 1); commitSucceessStatus = true;
