Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 b933939df -> 1584139c2


ATLAS-2996: Conditionally Prevent Notification Processing. With support for HA 
mode.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/1584139c
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/1584139c
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/1584139c

Branch: refs/heads/branch-0.8
Commit: 1584139c24b0fce0e0ba62ca13922cd21f7bc8b5
Parents: b933939
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Tue Dec 18 13:13:49 2018 -0800
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Tue Dec 18 14:10:33 2018 -0800

----------------------------------------------------------------------
 .../atlas/notification/NotificationHookConsumer.java | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/1584139c/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1b5872f..a26ab8b 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -166,8 +166,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     @Override
     public void start() throws AtlasException {
         if (consumerDisabled) {
-            LOG.info("Hook consumer stopped. No hook messages will be 
processed. " +
-                    "Set property '{}' to false to start consuming hook 
messages.", CONSUMER_DISABLED);
+            LOG.info("No hook messages will be processed. {} = {}", 
CONSUMER_DISABLED, consumerDisabled);
             return;
         }
 
@@ -207,6 +206,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public void stop() {
         //Allow for completion of outstanding work
         try {
+            if (consumerDisabled) {
+                return;
+            }
+
             stopConsumerThreads();
             if (executors != null) {
                 executors.shutdown();
@@ -242,6 +245,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
      */
     @Override
     public void instanceIsActive() {
+        if (consumerDisabled) {
+            return;
+        }
+
         LOG.info("Reacting to active state: initializing Kafka consumers");
         startConsumers(executors);
     }
@@ -254,6 +261,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
      */
     @Override
     public void instanceIsPassive() {
+        if (consumerDisabled) {
+            return;
+        }
+
         LOG.info("Reacting to passive state: shutting down Kafka consumers.");
         stop();
     }

Reply via email to