Repository: atlas
Updated Branches:
  refs/heads/branch-1.0 44b2bfc66 -> 21b5ec402


ATLAS-2996: Conditionally Prevent Notification Processing


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/21b5ec40
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/21b5ec40
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/21b5ec40

Branch: refs/heads/branch-1.0
Commit: 21b5ec4023a54e8ea9e0c40d22a63814bf7a3883
Parents: 44b2bfc
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Mon Dec 17 14:31:23 2018 -0800
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Mon Dec 17 21:53:32 2018 -0800

----------------------------------------------------------------------
 .../atlas/notification/NotificationHookConsumer.java      | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/21b5ec40/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1cde3d0..b955948 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -104,13 +104,13 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String CONSUMER_RETRY_INTERVAL           = 
"atlas.notification.consumer.retry.interval";
     public static final String CONSUMER_MIN_RETRY_INTERVAL       = 
"atlas.notification.consumer.min.retry.interval";
     public static final String CONSUMER_MAX_RETRY_INTERVAL       = 
"atlas.notification.consumer.max.retry.interval";
+    public static final String CONSUMER_DISABLED                 = 
"atlas.notification.consumer.disabled";
 
     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633    
              = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
     public static final String 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
-
     private final AtlasEntityStore       atlasEntityStore;
     private final ServiceState           serviceState;
     private final AtlasInstanceConverter instanceConverter;
@@ -121,6 +121,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final int                    maxWaitDuration;
     private final boolean                skipHiveColumnLineageHive20633;
     private final int                    
skipHiveColumnLineageHive20633InputsThreshold;
+    private final boolean                consumerDisabled;
 
     private NotificationInterface notificationInterface;
     private ExecutorService       executors;
@@ -151,6 +152,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
false);
         skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
+        consumerDisabled                                                       
  = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
         LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
@@ -158,6 +160,12 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
     @Override
     public void start() throws AtlasException {
+        if (consumerDisabled) {
+            LOG.info("Hook consumer stopped. No hook messages will be 
processed. " +
+                    "Set property '{}' to false to start consuming hook 
messages.", CONSUMER_DISABLED);
+            return;
+        }
+
         startInternal(applicationProperties, null);
     }
 

Reply via email to