Repository: atlas Updated Branches: refs/heads/branch-0.8 ea33708cf -> 20215f3dd
ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/20215f3d Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/20215f3d Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/20215f3d Branch: refs/heads/branch-0.8 Commit: 20215f3dd74b16fe4a7a9c8eb21b17925256f4f9 Parents: ea33708 Author: Madhan Neethiraj <[email protected]> Authored: Wed Sep 19 11:51:52 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Wed Sep 26 17:58:06 2018 -0700 ---------------------------------------------------------------------- .../atlas/hive/hook/AtlasHiveHookContext.java | 8 ++ .../org/apache/atlas/hive/hook/HiveHook.java | 15 +++ .../hive/hook/events/CreateHiveProcess.java | 26 +++- .../apache/atlas/kafka/AtlasKafkaConsumer.java | 9 +- .../notification/NotificationHookConsumer.java | 121 ++++++++++++++++--- 5 files changed, 162 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index b9e4256..b467f4c 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -85,6 +85,14 @@ public class AtlasHiveHookContext { return hook.getClusterName(); } + public boolean getSkipHiveColumnLineageHive20633() { + return hook.getSkipHiveColumnLineageHive20633(); + } + + public int getSkipHiveColumnLineageHive20633InputsThreshold() { + return hook.getSkipHiveColumnLineageHive20633InputsThreshold(); + } + public String getQualifiedName(Database db) { return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 19075f6..673fa3b 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -51,6 +51,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count"; public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count"; public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds"; + public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633"; + public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold"; public static final String DEFAULT_CLUSTER_NAME = "primary"; @@ -62,6 +64,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final int nameCacheTableMaxCount; private static final int nameCacheRebuildIntervalSeconds; + private static final boolean skipHiveColumnLineageHive20633; + private static final int skipHiveColumnLineageHive20633InputsThreshold; + private static HiveHookObjectNamesCache knownObjects = null; static { @@ -74,6 +79,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000); nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default + skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true); + skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null; } @@ -182,6 +189,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return clusterName; } + public boolean getSkipHiveColumnLineageHive20633() { + return skipHiveColumnLineageHive20633; + } + + public int getSkipHiveColumnLineageHive20633InputsThreshold() { + return skipHiveColumnLineageHive20633InputsThreshold; + } + public static class HiveHookObjectNamesCache { private final int dbMaxCacheCount; http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index c99a699..9b9852e 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -132,10 +132,18 @@ public class CreateHiveProcess extends BaseHiveEvent { return; } + final List<AtlasEntity> columnLineages = new ArrayList<>(); + boolean isSameInputsSize = true; + int lineageInputsSize = -1; + for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) { String outputColName = getQualifiedName(entry.getKey()); AtlasEntity outputColumn = context.getEntity(outputColName); + if (LOG.isDebugEnabled()) { + LOG.debug("processColumnLineage(): DependencyKey={}; Dependency={}", entry.getKey(), entry.getValue()); + } + if (outputColumn == null) { LOG.warn("column-lineage: non-existing output-column {}", outputColName); @@ -161,6 +169,12 @@ public class CreateHiveProcess extends BaseHiveEvent { continue; } + if (lineageInputsSize == -1) { + lineageInputsSize = inputColumns.size(); + } else if (lineageInputsSize != inputColumns.size()) { + isSameInputsSize = false; + } + AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE); columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); @@ -171,7 +185,17 @@ public class CreateHiveProcess extends BaseHiveEvent { columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr()); - entities.addEntity(columnLineageProcess); + columnLineages.add(columnLineageProcess); + } + + boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && isSameInputsSize && lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold(); + + if (!skipColumnLineage) { + for (AtlasEntity columnLineage : columnLineages) { + entities.addEntity(columnLineage); + } + } else { + LOG.warn("skipping {} hive_column_lineage entities, each having {} inputs", columnLineages.size(), lineageInputsSize); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index e3bb71c..c15da15 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -69,7 +69,14 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { record.topic(), record.partition(), record.offset(), record.key(), record.value()); } - T message = deserializer.deserialize(record.value().toString()); + T message = null; + + try { + message = deserializer.deserialize(record.value().toString()); + } catch (OutOfMemoryError excp) { + LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}", + record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp); + } if (message == null) { continue; http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 1a567af..6b34aa6 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -24,6 +24,7 @@ import org.apache.atlas.*; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; @@ -49,6 +50,7 @@ import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.util.DateTimeHelper; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,9 +59,11 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.ListIterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -80,23 +84,31 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final String LOCALHOST = "localhost"; private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); + private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage"; + private static final String ATTRIBUTE_INPUTS = "inputs"; + private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); - public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; - public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; + public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; + public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; - public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; - public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; - public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; + public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; + public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; + public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; + public static final int SERVER_READY_WAIT_TIME_MS = 1000; + + public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633"; + public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold"; - public static final int SERVER_READY_WAIT_TIME_MS = 1000; - private final AtlasEntityStore atlasEntityStore; - private final ServiceState serviceState; + private final AtlasEntityStore atlasEntityStore; + private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; - private final AtlasTypeRegistry typeRegistry; - private final int maxRetries; - private final int failedMsgCacheSize; + private final AtlasTypeRegistry typeRegistry; + private final int maxRetries; + private final int failedMsgCacheSize; + private final boolean skipHiveColumnLineageHive20633; + private final int skipHiveColumnLineageHive20633InputsThreshold; @VisibleForTesting final int consumerRetryInterval; @@ -122,11 +134,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl this.applicationProperties = ApplicationProperties.get(); - maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); - failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); + maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); + failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1); consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); - minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default - maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default + minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default + maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default + + skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true); + skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default + + LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); + LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); } @Override @@ -350,6 +368,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return; } + preProcessNotificationMessage(kafkaMsg); + // Used for intermediate conversions during create and update for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { @@ -603,6 +623,77 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } + private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) { + skipHiveColumnLineage(kafkaMsg); + } + + private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) { + if (!skipHiveColumnLineageHive20633) { + return; + } + + final HookNotificationMessage message = kafkaMessage.getMessage(); + final AtlasEntitiesWithExtInfo entities; + + switch (message.getType()) { + case ENTITY_CREATE_V2: + entities = ((EntityCreateRequestV2) message).getEntities(); + break; + + case ENTITY_FULL_UPDATE_V2: + entities = ((EntityUpdateRequestV2) message).getEntities(); + break; + + default: + entities = null; + break; + } + + if (entities != null && entities.getEntities() != null) { + boolean isSameInputsSize = true; + int lineageInputsSize = -1; + + // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases + for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next(); + + if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { + Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS); + + if (objInputs instanceof Collection) { + Collection inputs = (Collection) objInputs; + + if (lineageInputsSize == -1) { // first entry + lineageInputsSize = inputs.size(); + } else if (inputs.size() != lineageInputsSize) { + isSameInputsSize = false; + + break; + } + } + } + } + + if (isSameInputsSize && lineageInputsSize > skipHiveColumnLineageHive20633InputsThreshold) { + int numRemovedEntities = 0; + + for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next(); + + if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { + iter.remove(); + + numRemovedEntities++; + } + } + + if (numRemovedEntities > 0) { + LOG.warn("removed {} hive_column_lineage entities, each having {} inputs. offset={}, partition={}", numRemovedEntities, lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition()); + } + } + } + } + private void audit(String messageUser, String method, String path) { if (LOG.isDebugEnabled()) { LOG.debug("==> audit({},{}, {})", messageUser, method, path);
