Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 e288e0bc5 -> 806f983e6


ATLAS-2947: name of Kafka topics used by Atlas made configurable

(cherry picked from commit ba2b1449195cb79e464e14e6fedca4a8df50fa6b)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/806f983e
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/806f983e
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/806f983e

Branch: refs/heads/branch-0.8
Commit: 806f983e65c98b96ff813d3aa5c78ecde419caf1
Parents: e288e0b
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Wed Oct 31 16:42:00 2018 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Thu Nov 1 09:15:23 2018 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/AtlasConfiguration.java    |  3 +
 distro/src/bin/atlas_config.py                  |  2 +-
 .../apache/atlas/kafka/KafkaNotification.java   |  5 +-
 .../atlas/hook/AtlasTopicCreatorTest.java       | 42 ++++++------
 .../apache/atlas/kafka/KafkaConsumerTest.java   | 71 +++++++-------------
 .../notification/NotificationHookConsumer.java  | 14 +++-
 6 files changed, 65 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
index ace8a0f..1b3ce1e 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -33,6 +33,9 @@ public enum AtlasConfiguration {
 
     QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
 
+    NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", 
"ATLAS_HOOK"),
+    NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", 
"ATLAS_ENTITIES"),
+
     
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes",
 (1000 * 1000)),
     
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled",
 true),
     
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
 15 * 60),

http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/distro/src/bin/atlas_config.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py
index ada5743..1b91a2e 100755
--- a/distro/src/bin/atlas_config.py
+++ b/distro/src/bin/atlas_config.py
@@ -445,7 +445,7 @@ def get_topics_to_create(confdir):
     if topic_list is not None:
         topics = topic_list.split(",")
     else:
-        topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"]
+        topics = [getConfigWithDefault("atlas.notification.hook.topic.name", 
"ATLAS_HOOK"), getConfigWithDefault("atlas.notification.entities.topic.name", 
"ATLAS_ENTITIES")]
     return topics
 
 def get_atlas_url_port(confdir):

http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 4c753d2..73bc315 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -19,6 +19,7 @@ package org.apache.atlas.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationConsumer;
@@ -52,8 +53,8 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     public static final Logger LOG = 
LoggerFactory.getLogger(KafkaNotification.class);
 
     public    static final String PROPERTY_PREFIX            = "atlas.kafka";
-    public    static final String ATLAS_HOOK_TOPIC           = "ATLAS_HOOK";
-    public    static final String ATLAS_ENTITIES_TOPIC       = 
"ATLAS_ENTITIES";
+    public    static final String ATLAS_HOOK_TOPIC           = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+    public    static final String ATLAS_ENTITIES_TOPIC       = 
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
     private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This 
consumer has already been closed.";

http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java 
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
index 8585898..2937847 100644
--- 
a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.hook;
 
 import kafka.utils.ZkUtils;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.testng.annotations.Test;
 
@@ -34,6 +35,9 @@ import static org.testng.Assert.assertTrue;
 
 public class AtlasTopicCreatorTest {
 
+    private final String ATLAS_HOOK_TOPIC     = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+    private final String ATLAS_ENTITIES_TOPIC = 
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
+
     @Test
     public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() {
 
@@ -49,7 +53,7 @@ public class AtlasTopicCreatorTest {
                 return false;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertFalse(topicExistsCalled[0]);
     }
 
@@ -80,7 +84,7 @@ public class AtlasTopicCreatorTest {
                 createTopicCalled[0] = true;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertTrue(topicExistsCalled[0]);
         assertFalse(createTopicCalled[0]);
     }
@@ -111,7 +115,7 @@ public class AtlasTopicCreatorTest {
                 createdTopic[0] = true;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertTrue(createdTopic[0]);
     }
 
@@ -141,7 +145,7 @@ public class AtlasTopicCreatorTest {
                 throw new RuntimeException("Simulating failure during creating 
topic");
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
         assertTrue(createTopicCalled[0]);
     }
 
@@ -154,8 +158,8 @@ public class AtlasTopicCreatorTest {
         final ZkUtils zookeeperUtils = mock(ZkUtils.class);
 
         final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put("ATLAS_HOOK", false);
-        createdTopics.put("ATLAS_ENTITIES", false);
+        createdTopics.put(ATLAS_HOOK_TOPIC, false);
+        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
 
         AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
 
@@ -174,9 +178,9 @@ public class AtlasTopicCreatorTest {
                 createdTopics.put(topicName, true);
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
-        assertTrue(createdTopics.get("ATLAS_HOOK"));
-        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
+        assertTrue(createdTopics.get(ATLAS_HOOK_TOPIC));
+        assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
     }
 
     @Test
@@ -188,7 +192,7 @@ public class AtlasTopicCreatorTest {
         final ZkUtils zookeeperUtils = mock(ZkUtils.class);
 
         final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put("ATLAS_ENTITIES", false);
+        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
 
         AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
 
@@ -204,15 +208,15 @@ public class AtlasTopicCreatorTest {
 
             @Override
             protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
-                if (topicName.equals("ATLAS_HOOK")) {
+                if (topicName.equals(ATLAS_HOOK_TOPIC)) {
                     throw new RuntimeException("Simulating failure when 
creating ATLAS_HOOK topic");
                 } else {
                     createdTopics.put(topicName, true);
                 }
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
-        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
+        assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
     }
 
     @Test
@@ -238,7 +242,7 @@ public class AtlasTopicCreatorTest {
             protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
 
         verify(zookeeperUtils, times(1)).close();
     }
@@ -250,8 +254,8 @@ public class AtlasTopicCreatorTest {
                 thenReturn(true);
         final ZkUtils zookeeperUtils = mock(ZkUtils.class);
         final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put("ATLAS_HOOK", false);
-        createdTopics.put("ATLAS_ENTITIES", false);
+        createdTopics.put(ATLAS_HOOK_TOPIC, false);
+        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
 
         AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
             @Override
@@ -274,8 +278,8 @@ public class AtlasTopicCreatorTest {
                 return false;
             }
         };
-        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
-        assertFalse(createdTopics.get("ATLAS_HOOK"));
-        assertFalse(createdTopics.get("ATLAS_ENTITIES"));
+        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, 
ATLAS_ENTITIES_TOPIC);
+        assertFalse(createdTopics.get(ATLAS_HOOK_TOPIC));
+        assertFalse(createdTopics.get(ATLAS_ENTITIES_TOPIC));
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 08a20bd..782fbfe 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -21,11 +21,14 @@ package org.apache.atlas.kafka;
 import kafka.message.MessageAndMetadata;
 import org.apache.atlas.notification.*;
 import org.apache.atlas.notification.AtlasNotificationMessage;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.notification.entity.EntityNotificationImplTest;
 import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -40,9 +43,6 @@ import org.testng.annotations.Test;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.mockito.Mockito.mock;
@@ -58,11 +58,12 @@ public class KafkaConsumerTest {
 
     private static final String TRAIT_NAME = "MyTrait";
 
+    private final String ATLAS_HOOK_TOPIC = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+
 
     @Mock
     private KafkaConsumer kafkaConsumer;
 
-
     @BeforeMethod
     public void setup() {
         MockitoAnnotations.initMocks(this);
@@ -70,28 +71,16 @@ public class KafkaConsumerTest {
 
     @Test
     public void testReceive() throws Exception {
-
+        Referenceable                        entity  = getEntity(TRAIT_NAME);
+        EntityUpdateRequest                  message = new 
EntityUpdateRequest("user1", entity);
+        String                               json    = 
AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new 
MessageVersion("1.0.0"), message));
+        TopicPartition                       tp      = new 
TopicPartition(ATLAS_HOOK_TOPIC, 0);
+        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, 
"mykey", json));
+        Map                                  mp      = 
Collections.singletonMap(tp, klist);
+        ConsumerRecords                      records = new ConsumerRecords(mp);
 
         MessageAndMetadata<String, String> messageAndMetadata = 
mock(MessageAndMetadata.class);
 
-        Referenceable entity = getEntity(TRAIT_NAME);
-
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
-
-        String json = AbstractNotification.GSON.toJson(new 
AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
-
-        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 
0)));
-        List<ConsumerRecord> klist = new ArrayList<>();
-        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
-                0, 0L, "mykey", json));
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-        Map mp = new HashMap();
-        mp.put(tp,klist);
-        ConsumerRecords records = new ConsumerRecords(mp);
-
-
         when(kafkaConsumer.poll(100)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
@@ -108,26 +97,16 @@ public class KafkaConsumerTest {
 
     @Test
     public void testNextVersionMismatch() throws Exception {
+        Referenceable                        entity  = getEntity(TRAIT_NAME);
+        EntityUpdateRequest                  message = new 
EntityUpdateRequest("user1", entity);
+        String                               json    = 
AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new 
MessageVersion("2.0.0"), message));
+        TopicPartition                       tp      = new 
TopicPartition(ATLAS_HOOK_TOPIC,0);
+        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, 
"mykey", json));
+        Map                                  mp      = 
Collections.singletonMap(tp,klist);
+        ConsumerRecords                      records = new ConsumerRecords(mp);
 
         MessageAndMetadata<String, String> messageAndMetadata = 
mock(MessageAndMetadata.class);
 
-        Referenceable entity = getEntity(TRAIT_NAME);
-
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
-
-        String json = AbstractNotification.GSON.toJson(new 
AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
-
-        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 
0)));
-        List<ConsumerRecord> klist = new ArrayList<>();
-        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
-                0, 0L, "mykey", json));
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-        Map mp = new HashMap();
-        mp.put(tp,klist);
-        ConsumerRecords records = new ConsumerRecords(mp);
-
         when(kafkaConsumer.poll(100L)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
@@ -148,10 +127,8 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCommitIsCalledIfAutoCommitDisabled() {
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, false, 100L);
+        TopicPartition     tp       = new TopicPartition(ATLAS_HOOK_TOPIC,0);
+        AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationType.HOOK.getDeserializer(), kafkaConsumer, 
false, 100L);
 
         consumer.commit(tp, 1);
 
@@ -160,10 +137,8 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCommitIsNotCalledIfAutoCommitEnabled() {
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, true , 100L);
+        TopicPartition     tp       = new TopicPartition(ATLAS_HOOK_TOPIC,0);
+        AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationType.HOOK.getDeserializer(), kafkaConsumer, true 
, 100L);
 
         consumer.commit(tp, 1);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/806f983e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 97fd095..c847f3d 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -20,7 +20,15 @@ package org.apache.atlas.notification;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import kafka.utils.ShutdownableThread;
-import org.apache.atlas.*;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasBaseClient;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -88,6 +96,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private static final String ATTRIBUTE_INPUTS         = "inputs";
 
     private static final String THREADNAME_PREFIX = 
NotificationHookConsumer.class.getSimpleName();
+    private static final String ATLAS_HOOK_TOPIC  = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
 
     public static final String CONSUMER_THREADS_PROPERTY         = 
"atlas.notification.hook.numthreads";
     public static final String CONSUMER_RETRIES_PROPERTY         = 
"atlas.notification.hook.maxretries";
@@ -101,6 +110,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
 
 
+
     private final AtlasEntityStore       atlasEntityStore;
     private final ServiceState           serviceState;
     private final AtlasInstanceConverter instanceConverter;
@@ -570,7 +580,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             try {
                 recordFailedMessages();
 
-                TopicPartition partition = new TopicPartition("ATLAS_HOOK", 
kafkaMessage.getPartition());
+                TopicPartition partition = new 
TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition());
 
                 consumer.commit(partition, kafkaMessage.getOffset() + 1);
                 commitSucceessStatus = true;

Reply via email to