Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 b933939df -> 1584139c2
ATLAS-2996: Conditionally Prevent Notification Processing. With support for HA mode.

Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/1584139c
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/1584139c
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/1584139c

Branch: refs/heads/branch-0.8
Commit: 1584139c24b0fce0e0ba62ca13922cd21f7bc8b5
Parents: b933939
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Tue Dec 18 13:13:49 2018 -0800
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Tue Dec 18 14:10:33 2018 -0800

----------------------------------------------------------------------
 .../atlas/notification/NotificationHookConsumer.java | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/atlas/blob/1584139c/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1b5872f..a26ab8b 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -166,8 +166,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     @Override
     public void start() throws AtlasException {
         if (consumerDisabled) {
-            LOG.info("Hook consumer stopped. No hook messages will be processed. " +
-                    "Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED);
+            LOG.info("No hook messages will be processed. {} = {}", CONSUMER_DISABLED, consumerDisabled);
             return;
         }
 
@@ -207,6 +206,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public void stop() {
         //Allow for completion of outstanding work
         try {
+            if (consumerDisabled) {
+                return;
+            }
+
             stopConsumerThreads();
             if (executors != null) {
                 executors.shutdown();
@@ -242,6 +245,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
      */
     @Override
     public void instanceIsActive() {
+        if (consumerDisabled) {
+            return;
+        }
+
         LOG.info("Reacting to active state: initializing Kafka consumers");
         startConsumers(executors);
     }
@@ -254,6 +261,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
      */
     @Override
     public void instanceIsPassive() {
+        if (consumerDisabled) {
+            return;
+        }
+
         LOG.info("Reacting to passive state: shutting down Kafka consumers.");
         stop();
     }