Repository: atlas
Updated Branches:
  refs/heads/branch-1.0 23e856c17 -> 6a25a9b59


ATLAS-2947: name of Kafka topics used by Atlas made configurable

(cherry picked from commit ba2b1449195cb79e464e14e6fedca4a8df50fa6b)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6a25a9b5
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6a25a9b5
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6a25a9b5

Branch: refs/heads/branch-1.0
Commit: 6a25a9b59497e698c2d41b9eb39cd908524011c4
Parents: 23e856c
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Wed Oct 31 16:42:00 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Thu Nov 1 08:10:29 2018 -0700

----------------------------------------------------------------------
 distro/src/bin/atlas_config.py                  |  2 +-
 .../org/apache/atlas/AtlasConfiguration.java    |  3 ++
 .../apache/atlas/kafka/KafkaNotification.java   |  5 ++-
 .../atlas/hook/AtlasTopicCreatorTest.java       | 42 +++++++++++---------
 .../apache/atlas/kafka/KafkaConsumerTest.java   | 16 ++++----
 .../notification/NotificationHookConsumer.java  |  5 ++-
 6 files changed, 43 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/6a25a9b5/distro/src/bin/atlas_config.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py
index f6a6bef..747b03b 100755
--- a/distro/src/bin/atlas_config.py
+++ b/distro/src/bin/atlas_config.py
@@ -468,7 +468,7 @@ def get_topics_to_create(confdir):
     if topic_list is not None:
         topics = topic_list.split(",")
     else:
-        topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"]
+        topics = [getConfigWithDefault("atlas.notification.hook.topic.name", "ATLAS_HOOK"), getConfigWithDefault("atlas.notification.entities.topic.name", "ATLAS_ENTITIES")]
     return topics
 
 def get_atlas_url_port(confdir):

http://git-wip-us.apache.org/repos/asf/atlas/blob/6a25a9b5/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index ace8a0f..1b3ce1e 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -33,6 +33,9 @@ public enum AtlasConfiguration {
 
     QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
 
+    NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
+    NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
+
     NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
     NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
     NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),

http://git-wip-us.apache.org/repos/asf/atlas/blob/6a25a9b5/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 00e56e3..3c9ebdc 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -19,6 +19,7 @@ package org.apache.atlas.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationConsumer;
@@ -52,8 +53,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
 
     public    static final String PROPERTY_PREFIX            = "atlas.kafka";
-    public    static final String ATLAS_HOOK_TOPIC           = "ATLAS_HOOK";
-    public    static final String ATLAS_ENTITIES_TOPIC       = "ATLAS_ENTITIES";
+    public    static final String ATLAS_HOOK_TOPIC           = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+    public    static final String ATLAS_ENTITIES_TOPIC       = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
     private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {

http://git-wip-us.apache.org/repos/asf/atlas/blob/6a25a9b5/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java 
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
index 8585898..2937847 100644
--- 
a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.hook;
 
 import kafka.utils.ZkUtils;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.testng.annotations.Test;
 
@@ -34,6 +35,9 @@ import static org.testng.Assert.assertTrue;
 
 public class AtlasTopicCreatorTest {
 
+    private final String ATLAS_HOOK_TOPIC     = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+    private final String ATLAS_ENTITIES_TOPIC = 
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
+
     @Test
     public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() {
 
@@ -49,7 +53,7 @@ public class AtlasTopicCreatorTest {
                 return false;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertFalse(topicExistsCalled[0]);
     }
 
@@ -80,7 +84,7 @@ public class AtlasTopicCreatorTest {
                 createTopicCalled[0] = true;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertTrue(topicExistsCalled[0]);
         assertFalse(createTopicCalled[0]);
     }
@@ -111,7 +115,7 @@ public class AtlasTopicCreatorTest {
                 createdTopic[0] = true;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertTrue(createdTopic[0]);
     }
 
@@ -141,7 +145,7 @@ public class AtlasTopicCreatorTest {
                 throw new RuntimeException("Simulating failure during creating 
topic");
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertTrue(createTopicCalled[0]);
     }
 
@@ -154,8 +158,8 @@ public class AtlasTopicCreatorTest {
         final ZkUtils zookeeperUtils = mock(ZkUtils.class);
 
         final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put("ATLAS_HOOK", false);
-        createdTopics.put("ATLAS_ENTITIES", false);
+        createdTopics.put(ATLAS_HOOK_TOPIC, false);
+        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
 
         AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
 
@@ -174,9 +178,9 @@ public class AtlasTopicCreatorTest {
                 createdTopics.put(topicName, true);
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
-        assertTrue(createdTopics.get("ATLAS_HOOK"));
-        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
+        assertTrue(createdTopics.get(ATLAS_HOOK_TOPIC));
+        assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
     }
 
     @Test
@@ -188,7 +192,7 @@ public class AtlasTopicCreatorTest {
         final ZkUtils zookeeperUtils = mock(ZkUtils.class);
 
         final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put("ATLAS_ENTITIES", false);
+        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
 
         AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
 
@@ -204,15 +208,15 @@ public class AtlasTopicCreatorTest {
 
             @Override
             protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
-                if (topicName.equals("ATLAS_HOOK")) {
+                if (topicName.equals(ATLAS_HOOK_TOPIC)) {
                     throw new RuntimeException("Simulating failure when 
creating ATLAS_HOOK topic");
                 } else {
                     createdTopics.put(topicName, true);
                 }
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
-        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
+        assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
     }
 
     @Test
@@ -238,7 +242,7 @@ public class AtlasTopicCreatorTest {
             protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
 
         verify(zookeeperUtils, times(1)).close();
     }
@@ -250,8 +254,8 @@ public class AtlasTopicCreatorTest {
                 thenReturn(true);
         final ZkUtils zookeeperUtils = mock(ZkUtils.class);
         final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put("ATLAS_HOOK", false);
-        createdTopics.put("ATLAS_ENTITIES", false);
+        createdTopics.put(ATLAS_HOOK_TOPIC, false);
+        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
 
         AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
             @Override
@@ -274,8 +278,8 @@ public class AtlasTopicCreatorTest {
                 return false;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
-        assertFalse(createdTopics.get("ATLAS_HOOK"));
-        assertFalse(createdTopics.get("ATLAS_ENTITIES"));
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
+        assertFalse(createdTopics.get(ATLAS_HOOK_TOPIC));
+        assertFalse(createdTopics.get(ATLAS_ENTITIES_TOPIC));
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6a25a9b5/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 2e8abd7..847caa3 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.kafka;
 
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
@@ -53,11 +54,12 @@ import static org.testng.Assert.*;
 public class KafkaConsumerTest {
     private static final String TRAIT_NAME = "MyTrait";
 
+    private final String ATLAS_HOOK_TOPIC = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+
 
     @Mock
     private KafkaConsumer kafkaConsumer;
 
-
     @BeforeMethod
     public void setup() {
         MockitoAnnotations.initMocks(this);
@@ -68,8 +70,8 @@ public class KafkaConsumerTest {
         Referenceable                        entity  = getEntity(TRAIT_NAME);
         EntityUpdateRequest                  message = new 
EntityUpdateRequest("user1", entity);
         String                               json    = AtlasType.toV1Json(new 
AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
-        TopicPartition                       tp      = new 
TopicPartition("ATLAS_HOOK", 0);
-        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", 
json));
+        TopicPartition                       tp      = new 
TopicPartition(ATLAS_HOOK_TOPIC, 0);
+        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, 
"mykey", json));
         Map                                  mp      = 
Collections.singletonMap(tp, klist);
         ConsumerRecords                      records = new ConsumerRecords(mp);
 
@@ -92,8 +94,8 @@ public class KafkaConsumerTest {
         Referenceable                        entity  = getEntity(TRAIT_NAME);
         EntityUpdateRequest                  message = new 
EntityUpdateRequest("user1", entity);
         String                               json    = AtlasType.toV1Json(new 
AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
-        TopicPartition                       tp      = new 
TopicPartition("ATLAS_HOOK",0);
-        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", 
json));
+        TopicPartition                       tp      = new 
TopicPartition(ATLAS_HOOK_TOPIC,0);
+        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, 
"mykey", json));
         Map                                  mp      = 
Collections.singletonMap(tp,klist);
         ConsumerRecords                      records = new ConsumerRecords(mp);
 
@@ -119,7 +121,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCommitIsCalledIfAutoCommitDisabled() {
-        TopicPartition     tp       = new TopicPartition("ATLAS_HOOK",0);
+        TopicPartition     tp       = new TopicPartition(ATLAS_HOOK_TOPIC,0);
         AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
 
         consumer.commit(tp, 1);
@@ -129,7 +131,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCommitIsNotCalledIfAutoCommitEnabled() {
-        TopicPartition     tp       = new TopicPartition("ATLAS_HOOK",0);
+        TopicPartition     tp       = new TopicPartition(ATLAS_HOOK_TOPIC,0);
         AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, true , 100L);
 
         consumer.commit(tp, 1);

http://git-wip-us.apache.org/repos/asf/atlas/blob/6a25a9b5/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index a8f8dd6..da1be9e 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -23,6 +23,7 @@ import kafka.utils.ShutdownableThread;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.RequestContext;
@@ -92,6 +93,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private static final String ATTRIBUTE_INPUTS         = "inputs";
 
     private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
+    private static final String ATLAS_HOOK_TOPIC  = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
 
     public static final String CONSUMER_THREADS_PROPERTY         = "atlas.notification.hook.numthreads";
     public static final String CONSUMER_RETRIES_PROPERTY         = "atlas.notification.hook.maxretries";
@@ -105,6 +107,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
+
     private final AtlasEntityStore       atlasEntityStore;
     private final ServiceState           serviceState;
     private final AtlasInstanceConverter instanceConverter;
@@ -601,7 +604,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             try {
                 recordFailedMessages();
 
-                TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+                TopicPartition partition = new TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition());
 
                 consumer.commit(partition, kafkaMessage.getOffset() + 1);
                 commitSucceessStatus = true;

Reply via email to