Repository: atlas
Updated Branches:
  refs/heads/branch-1.0 44b2bfc66 -> 21b5ec402

ATLAS-2996: Conditionally Prevent Notification Processing

Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/21b5ec40
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/21b5ec40
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/21b5ec40

Branch: refs/heads/branch-1.0
Commit: 21b5ec4023a54e8ea9e0c40d22a63814bf7a3883
Parents: 44b2bfc
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Mon Dec 17 14:31:23 2018 -0800
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Mon Dec 17 21:53:32 2018 -0800

----------------------------------------------------------------------
 .../atlas/notification/NotificationHookConsumer.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/atlas/blob/21b5ec40/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1cde3d0..b955948 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -104,13 +104,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
     public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
     public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
+    public static final String CONSUMER_DISABLED = "atlas.notification.consumer.disabled";

     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";

     public static final int SERVER_READY_WAIT_TIME_MS = 1000;

-
     private final AtlasEntityStore atlasEntityStore;
     private final ServiceState serviceState;
     private final AtlasInstanceConverter instanceConverter;
@@ -121,6 +121,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final int maxWaitDuration;
     private final boolean skipHiveColumnLineageHive20633;
     private final int skipHiveColumnLineageHive20633InputsThreshold;
+    private final boolean consumerDisabled;

     private NotificationInterface notificationInterface;
     private ExecutorService executors;
@@ -151,6 +152,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl

         skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
         skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
+        consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);

         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -158,6 +160,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl

     @Override
     public void start() throws AtlasException {
+        if (consumerDisabled) {
+            LOG.info("Hook consumer stopped. No hook messages will be processed. " +
+                    "Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED);
+            return;
+        }
+
         startInternal(applicationProperties, null);
     }