Repository: atlas Updated Branches: refs/heads/branch-0.8 a48249e0b -> a35379c1d
ATLAS-3006: Option to ignore/prune metadata for temporary/staging Hive tables Signed-off-by: Sarath Subramanian <ssubraman...@hortonworks.com> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a35379c1 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a35379c1 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a35379c1 Branch: refs/heads/branch-0.8 Commit: a35379c1deee6f11396e00916400ec9f475ee8a3 Parents: a48249e Author: Madhan Neethiraj <mad...@apache.org> Authored: Fri Jan 4 13:58:58 2019 -0800 Committer: Sarath Subramanian <ssubraman...@hortonworks.com> Committed: Fri Jan 4 13:58:58 2019 -0800 ---------------------------------------------------------------------- .../atlas/hive/hook/AtlasHiveHookContext.java | 5 + .../org/apache/atlas/hive/hook/HiveHook.java | 87 ++++++- .../hive/hook/events/AlterTableRename.java | 9 +- .../atlas/hive/hook/events/BaseHiveEvent.java | 119 +++++---- .../atlas/hive/hook/events/CreateTable.java | 2 +- .../notification/NotificationHookConsumer.java | 208 ++++++++++++--- .../preprocessor/EntityPreprocessor.java | 126 +++++++++ .../preprocessor/HivePreprocessor.java | 253 +++++++++++++++++++ .../preprocessor/PreprocessorContext.java | 229 +++++++++++++++++ 9 files changed, 943 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index 23cb853..249f48b 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -20,6 +20,7 @@ package org.apache.atlas.hive.hook; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache; +import org.apache.atlas.hive.hook.HiveHook.PreprocessAction; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.hive.metastore.api.Database; @@ -97,6 +98,10 @@ public class AtlasHiveHookContext { return hook.getSkipHiveColumnLineageHive20633InputsThreshold(); } + public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) { + return hook.getPreprocessActionForHiveTable(qualifiedName); + } + public String getQualifiedName(Database db) { return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 7b60553..4a6b417 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -21,6 +21,8 @@ package org.apache.atlas.hive.hook; import org.apache.atlas.hive.hook.events.*; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.utils.LruCache; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; @@ -30,12 +32,15 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME; import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB; @@ -45,6 +50,8 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE; public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class); + public enum PreprocessAction { NONE, IGNORE, PRUNE } + public static final String CONF_PREFIX = "atlas.hook.hive."; public static final String CONF_CLUSTER_NAME = "atlas.cluster.name"; public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; @@ -54,6 +61,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds"; public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633"; public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold"; + public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN = CONF_PREFIX + "hive_table.ignore.pattern"; + public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN = CONF_PREFIX + "hive_table.prune.pattern"; + public static final String HOOK_HIVE_TABLE_CACHE_SIZE = CONF_PREFIX + "hive_table.cache.size"; public static final String DEFAULT_CLUSTER_NAME = "primary"; @@ -66,8 +76,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final int nameCacheTableMaxCount; private static final int nameCacheRebuildIntervalSeconds; - private static final boolean skipHiveColumnLineageHive20633; - private static final int skipHiveColumnLineageHive20633InputsThreshold; + private static final boolean skipHiveColumnLineageHive20633; + private static final int skipHiveColumnLineageHive20633InputsThreshold; + private static final List<Pattern> hiveTablesToIgnore = new ArrayList<>(); + private static final List<Pattern> hiveTablesToPrune = new ArrayList<>(); + private static final Map<String, PreprocessAction> hiveTablesCache; private static HiveHookObjectNamesCache knownObjects = null; @@ -85,6 +98,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true); skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 + String[] patternHiveTablesToIgnore = atlasProperties.getStringArray(HOOK_HIVE_TABLE_IGNORE_PATTERN); + String[] patternHiveTablesToPrune = atlasProperties.getStringArray(HOOK_HIVE_TABLE_PRUNE_PATTERN); + + if (patternHiveTablesToIgnore != null) { + for (String pattern : patternHiveTablesToIgnore) { + try { + hiveTablesToIgnore.add(Pattern.compile(pattern)); + + LOG.info("{}={}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern); + } catch (Throwable t) { + LOG.warn("failed to compile pattern {}", pattern, t); + LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern); + } + } + } + + if (patternHiveTablesToPrune != null) { + for (String pattern : patternHiveTablesToPrune) { + try { + hiveTablesToPrune.add(Pattern.compile(pattern)); + + LOG.info("{}={}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern); + } catch (Throwable t) { + LOG.warn("failed to compile pattern {}", pattern, t); + LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern); + } + } + } + + if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) { + hiveTablesCache = new LruCache<>(atlasProperties.getInt(HOOK_HIVE_TABLE_CACHE_SIZE, 10000), 0); + } else { + hiveTablesCache = Collections.emptyMap(); + } + knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null; } @@ -204,6 +252,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return skipHiveColumnLineageHive20633InputsThreshold; } + public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) { + PreprocessAction ret = PreprocessAction.NONE; + + if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) { + ret = hiveTablesCache.get(qualifiedName); + + if (ret == null) { + if (isMatch(qualifiedName, hiveTablesToIgnore)) { + ret = PreprocessAction.IGNORE; + } else if (isMatch(qualifiedName, hiveTablesToPrune)) { + ret = PreprocessAction.PRUNE; + } else { + ret = PreprocessAction.NONE; + } + + hiveTablesCache.put(qualifiedName, ret); + } + } + + return ret; + } + + private boolean isMatch(String name, List<Pattern> patterns) { + boolean ret = false; + + for (Pattern p : patterns) { + if (p.matcher(name).matches()) { + ret = true; + + break; + } + } + + return ret; + } public static class HiveHookObjectNamesCache { private final int dbMaxCacheCount; http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java index 6ced340..f4d7a82 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java @@ -85,13 +85,16 @@ public class AlterTableRename extends BaseHiveEvent { return ret; } - AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable); + AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable); + AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable); + + if (oldTableEntity == null || renamedTableEntity == null) { + return ret; + } // first update with oldTable info, so that the table will be created if it is not present in Atlas ret.add(new EntityUpdateRequestV2(getUserName(), new AtlasEntitiesWithExtInfo(oldTableEntity))); - AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable); - // update qualifiedName for all columns, partitionKeys, storageDesc String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME); http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index 2d90a15..5c52cf4 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -19,6 +19,7 @@ package org.apache.atlas.hive.hook.events; import org.apache.atlas.hive.hook.AtlasHiveHookContext; +import org.apache.atlas.hive.hook.HiveHook.PreprocessAction; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; @@ -284,7 +285,11 @@ public abstract class BaseHiveEvent { AtlasEntity entity = toTableEntity(table, ret); - ret.setEntity(entity); + if (entity != null) { + ret.setEntity(entity); + } else { + ret = null; + } return ret; } @@ -292,7 +297,9 @@ public abstract class BaseHiveEvent { protected AtlasEntity toTableEntity(Table table, AtlasEntitiesWithExtInfo entities) throws Exception { AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities); - entities.addEntity(ret); + if (ret != null) { + entities.addEntity(ret); + } return ret; } @@ -318,64 +325,76 @@ public abstract class BaseHiveEvent { AtlasEntity ret = context.getEntity(tblQualifiedName); if (ret == null) { - ret = new AtlasEntity(HIVE_TYPE_TABLE); + PreprocessAction action = context.getPreprocessActionForHiveTable(tblQualifiedName); - // if this table was sent in an earlier notification, set 'guid' to null - which will: - // - result in this entity to be not included in 'referredEntities' - // - cause Atlas server to resolve the entity by its qualifiedName - if (isKnownTable && !isAlterTableOperation()) { - ret.setGuid(null); - } + if (action == PreprocessAction.IGNORE) { + LOG.info("ignoring table {}", tblQualifiedName); + } else { + ret = new AtlasEntity(HIVE_TYPE_TABLE); - long createTime = getTableCreateTime(table); - long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime; - - ret.setAttribute(ATTRIBUTE_DB, dbId); - ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName); - ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase()); - ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner()); - ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime); - ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime); - ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention()); - ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters()); - ret.setAttribute(ATTRIBUTE_COMMENT, table.getParameters().get(ATTRIBUTE_COMMENT)); - ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name()); - ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary()); - - if (table.getViewOriginalText() != null) { - ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText()); - } + // if this table was sent in an earlier notification, set 'guid' to null - which will: + // - result in this entity to be not included in 'referredEntities' + // - cause Atlas server to resolve the entity by its qualifiedName + if (isKnownTable && !isAlterTableOperation()) { + ret.setGuid(null); + } - if (table.getViewExpandedText() != null) { - ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText()); - } + long createTime = getTableCreateTime(table); + long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime; + + ret.setAttribute(ATTRIBUTE_DB, dbId); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase()); + ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner()); + ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime); + ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime); + ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention()); + ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters()); + ret.setAttribute(ATTRIBUTE_COMMENT, table.getParameters().get(ATTRIBUTE_COMMENT)); + ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name()); + ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary()); + + if (table.getViewOriginalText() != null) { + ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText()); + } - AtlasObjectId tableId = getObjectId(ret); - AtlasEntity sd = getStorageDescEntity(tableId, table); - List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys()); - List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols()); + if (table.getViewExpandedText() != null) { + ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText()); + } - if (entityExtInfo != null) { - entityExtInfo.addReferredEntity(sd); + boolean pruneTable = table.isTemporary() || action == PreprocessAction.PRUNE; - if (partitionKeys != null) { - for (AtlasEntity partitionKey : partitionKeys) { - entityExtInfo.addReferredEntity(partitionKey); + if (pruneTable) { + LOG.info("ignoring details of table {}", tblQualifiedName); + } else { + AtlasObjectId tableId = getObjectId(ret); + AtlasEntity sd = getStorageDescEntity(tableId, table); + List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys()); + List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols()); + + if (entityExtInfo != null) { + entityExtInfo.addReferredEntity(sd); + + if (partitionKeys != null) { + for (AtlasEntity partitionKey : partitionKeys) { + entityExtInfo.addReferredEntity(partitionKey); + } + } + + if (columns != null) { + for (AtlasEntity column : columns) { + entityExtInfo.addReferredEntity(column); + } + } } - } - if (columns != null) { - for (AtlasEntity column : columns) { - entityExtInfo.addReferredEntity(column); - } + ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd)); + ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys)); + ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns)); } - } - ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd)); - ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys)); - ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns)); - - context.putEntity(tblQualifiedName, ret); + context.putEntity(tblQualifiedName, ret); + } } return ret; http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java index 2afaf9f..316222d 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java @@ -81,7 +81,7 @@ public class CreateTable extends BaseHiveEvent { if (table != null) { AtlasEntity tblEntity = toTableEntity(table, ret); - if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + if (tblEntity != null && TableType.EXTERNAL_TABLE.equals(table.getTableType())) { AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation()); AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 2d2a6fb..dec6860 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -45,6 +45,9 @@ import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRe import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2; import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.notification.preprocessor.EntityPreprocessor; +import org.apache.atlas.notification.preprocessor.PreprocessorContext; +import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; @@ -54,9 +57,11 @@ import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.utils.LruCache; import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.util.DateTimeHelper; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.kafka.common.TopicPartition; @@ -68,16 +73,20 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.ListIterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import static org.apache.atlas.AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE; import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY; @@ -113,19 +122,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633"; public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold"; - - - - private final AtlasEntityStore atlasEntityStore; - private final ServiceState serviceState; - private final AtlasInstanceConverter instanceConverter; - private final AtlasTypeRegistry typeRegistry; - private final int maxRetries; - private final int failedMsgCacheSize; - private final boolean skipHiveColumnLineageHive20633; - private final int skipHiveColumnLineageHive20633InputsThreshold; - private final int largeMessageProcessingTimeThresholdMs; - private final boolean consumerDisabled; + public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern"; + public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern"; + public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size"; + + private final AtlasEntityStore atlasEntityStore; + private final ServiceState serviceState; + private final AtlasInstanceConverter instanceConverter; + private final AtlasTypeRegistry typeRegistry; + private final int maxRetries; + private final int failedMsgCacheSize; + private final boolean skipHiveColumnLineageHive20633; + private final int skipHiveColumnLineageHive20633InputsThreshold; + private final int largeMessageProcessingTimeThresholdMs; + private final boolean consumerDisabled; + private final List<Pattern> hiveTablesToIgnore = new ArrayList<>(); + private final List<Pattern> hiveTablesToPrune = new ArrayList<>(); + private final Map<String, PreprocessAction> hiveTablesCache; + private final boolean preprocessEnabled; @VisibleForTesting final int consumerRetryInterval; @@ -144,10 +158,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ServiceState serviceState, AtlasInstanceConverter instanceConverter, AtlasTypeRegistry typeRegistry) throws AtlasException { this.notificationInterface = notificationInterface; - this.atlasEntityStore = atlasEntityStore; - this.serviceState = serviceState; - this.instanceConverter = instanceConverter; - this.typeRegistry = typeRegistry; + this.atlasEntityStore = atlasEntityStore; + this.serviceState = serviceState; + this.instanceConverter = instanceConverter; + this.typeRegistry = typeRegistry; this.applicationProperties = ApplicationProperties.get(); @@ -162,6 +176,43 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false); + String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN); + String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN); + + if (patternHiveTablesToIgnore != null) { + for (String pattern : patternHiveTablesToIgnore) { + try { + hiveTablesToIgnore.add(Pattern.compile(pattern)); + + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern); + } catch (Throwable t) { + LOG.warn("failed to compile pattern {}", pattern, t); + LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern); + } + } + } + + if (patternHiveTablesToPrune != null) { + for (String pattern : patternHiveTablesToPrune) { + try { + hiveTablesToPrune.add(Pattern.compile(pattern)); + + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern); + } catch (Throwable t) { + LOG.warn("failed to compile pattern {}", pattern, t); + LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern); + } + } + } + + if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) { + hiveTablesCache = new LruCache<>(applicationProperties.getInt(CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE, 10000), 0); + } else { + hiveTablesCache = Collections.emptyMap(); + } + + preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633; + LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); } @@ -406,6 +457,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl preProcessNotificationMessage(kafkaMsg); + if (isEmptyMessage(kafkaMsg)) { + commit(kafkaMsg); + return; + } + // Used for intermediate conversions during create and update for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { @@ -669,39 +725,86 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) { - skipHiveColumnLineage(kafkaMsg); - } - - private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) { - if (!skipHiveColumnLineageHive20633) { + if (!preprocessEnabled) { return; } - final HookNotificationMessage message = kafkaMessage.getMessage(); - final AtlasEntitiesWithExtInfo entities; + PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache); - switch (message.getType()) { - case ENTITY_CREATE_V2: - entities = ((EntityCreateRequestV2) message).getEntities(); - break; + if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) { + ignoreOrPruneHiveTables(context); + } + + if (skipHiveColumnLineageHive20633) { + skipHiveColumnLineage(context); + } + } - case ENTITY_FULL_UPDATE_V2: - entities = ((EntityUpdateRequestV2) message).getEntities(); - break; + private void ignoreOrPruneHiveTables(PreprocessorContext context) { + List<AtlasEntity> entities = context.getEntities(); - default: - entities = null; - break; + if (entities != null) { + for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next(); + EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName()); + + if (preprocessor != null) { + preprocessor.preprocess(entity, context); + + if (context.isIgnoredEntity(entity.getGuid())) { + iter.remove(); + } + } + } + + Map<String, AtlasEntity> referredEntities = context.getReferredEntities(); + + if (referredEntities != null) { + // scan referredEntities for pruning + for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next().getValue(); + EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName()); + + if (preprocessor != null) { + preprocessor.preprocess(entity, context); + + if (context.isIgnoredEntity(entity.getGuid())) { + iter.remove(); + } + } + } + + for (String guid : context.getReferredEntitiesToMove()) { + AtlasEntity entity = referredEntities.remove(guid); + + if (entity != null) { + entities.add(entity); + + LOG.info("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), context.getKafkaMessageOffset(), context.getKafkaPartition()); + } + } + } + + int ignoredEntities = context.getIgnoredEntities().size(); + int prunedEntities = context.getPrunedEntities().size(); + + if (ignoredEntities > 0 || prunedEntities > 0) { + LOG.info("preprocess: ignored entities={}; pruned entities={}. topic-offset={}, partition={}", ignoredEntities, prunedEntities, context.getKafkaMessageOffset(), context.getKafkaPartition()); + } } + } + + private void skipHiveColumnLineage(PreprocessorContext context) { + List<AtlasEntity> entities = context.getEntities(); - if (entities != null && entities.getEntities() != null) { + if (entities != null) { int lineageCount = 0; int lineageInputsCount = 0; int numRemovedEntities = 0; Set<String> lineageQNames = new HashSet<>(); // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases - for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { + for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next(); if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { @@ -713,7 +816,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (lineageQNames.contains(qualifiedName)) { iter.remove(); - LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, kafkaMessage.getOffset(), kafkaMessage.getPartition()); + LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition()); numRemovedEntities++; @@ -738,7 +841,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0; if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) { - for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { + for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next(); if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { @@ -750,9 +853,36 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (numRemovedEntities > 0) { - LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition()); + LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition()); + } + } + } + + private boolean isEmptyMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) { + final boolean ret; + final HookNotificationMessage message = kafkaMsg.getMessage(); + + switch (message.getType()) { + case ENTITY_CREATE_V2: { + AtlasEntitiesWithExtInfo entities = ((EntityCreateRequestV2) message).getEntities(); + + ret = entities == null || CollectionUtils.isEmpty(entities.getEntities()); } + break; + + case ENTITY_FULL_UPDATE_V2: { + AtlasEntitiesWithExtInfo entities = ((EntityUpdateRequestV2) message).getEntities(); + + ret = entities == null || CollectionUtils.isEmpty(entities.getEntities()); + } + break; + + default: + ret = false; + break; } + + return ret; } private void audit(String messageUser, String method, String path) { http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java new file mode 100644 index 0000000..bdea14a --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +public abstract class EntityPreprocessor { + public static final String TYPE_HIVE_COLUMN = "hive_column"; + public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage"; + public static final String TYPE_HIVE_PROCESS = "hive_process"; + public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc"; + public static final String TYPE_HIVE_TABLE = "hive_table"; + + public static final String ATTRIBUTE_COLUMNS = "columns"; + public static final String ATTRIBUTE_INPUTS = "inputs"; + public static final String ATTRIBUTE_OUTPUTS = "outputs"; + public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys"; + public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + public static final String ATTRIBUTE_SD = "sd"; + + public static final char QNAME_SEP_CLUSTER_NAME = '@'; + public static final char QNAME_SEP_ENTITY_NAME = '.'; + public static final String QNAME_SD_SUFFIX = "_storage"; + + private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>(); + + private final String typeName; + + + static { + EntityPreprocessor[] preprocessors = new EntityPreprocessor[] { + new HivePreprocessor.HiveTablePreprocessor(), + new HivePreprocessor.HiveColumnPreprocessor(), + new HivePreprocessor.HiveProcessPreprocessor(), + new HivePreprocessor.HiveColumnLineageProcessPreprocessor(), + new HivePreprocessor.HiveStorageDescPreprocessor() + }; + + for (EntityPreprocessor preprocessor : preprocessors) { + PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); + } + } + + protected EntityPreprocessor(String typeName) { + this.typeName = typeName; + } + + public String getTypeName() { + return typeName; + } + + public abstract void preprocess(AtlasEntity entity, PreprocessorContext context); + + + public static EntityPreprocessor getPreprocessor(String typeName) { + return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null; + } + + public static String getQualifiedName(AtlasEntity entity) { + Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null; + + return obj != null ? obj.toString() : null; + } + + public String getTypeName(Object obj) { + Object ret = null; + + if (obj instanceof AtlasObjectId) { + ret = ((AtlasObjectId) obj).getTypeName(); + } else if (obj instanceof Map) { + ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME); + } else if (obj instanceof AtlasEntity) { + ret = ((AtlasEntity) obj).getTypeName(); + } else if (obj instanceof AtlasEntityWithExtInfo) { + ret = ((AtlasEntityWithExtInfo) obj).getEntity().getTypeName(); + } + + return ret != null ? ret.toString() : null; + } + + public String getQualifiedName(Object obj) { + Map<String, Object> attributes = null; + + if (obj instanceof AtlasObjectId) { + attributes = ((AtlasObjectId) obj).getUniqueAttributes(); + } else if (obj instanceof Map) { + attributes = (Map) ((Map) obj).get(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES); + } else if (obj instanceof AtlasEntity) { + attributes = ((AtlasEntity) obj).getAttributes(); + } else if (obj instanceof AtlasEntityWithExtInfo) { + attributes = ((AtlasEntityWithExtInfo) obj).getEntity().getAttributes(); + } + + Object ret = attributes != null ? attributes.get(ATTRIBUTE_QUALIFIED_NAME) : null; + + return ret != null ? ret.toString() : null; + } + + protected boolean isEmpty(Object obj) { + return obj == null || ((obj instanceof Collection) && ((Collection) obj).isEmpty()); + } +} + + http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java new file mode 100644 index 0000000..d54c88d --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; +import org.apache.commons.lang.StringUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class HivePreprocessor { + static class HiveTablePreprocessor extends EntityPreprocessor { + public HiveTablePreprocessor() { + super(TYPE_HIVE_TABLE); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (context.isIgnoredEntity(entity.getGuid())) { + context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName + } else { + PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(entity)); + + if (action == PreprocessAction.IGNORE) { + context.addToIgnoredEntities(entity); + + context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_SD)); + context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_COLUMNS)); + context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_PARTITION_KEYS)); + } else if (action == PreprocessAction.PRUNE) { + context.addToPrunedEntities(entity); + + context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_SD)); + context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_COLUMNS)); + context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_PARTITION_KEYS)); + + entity.setAttribute(ATTRIBUTE_SD, null); + entity.setAttribute(ATTRIBUTE_COLUMNS, null); + entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null); + } + } + } + } + + + static class HiveColumnPreprocessor extends EntityPreprocessor { + public HiveColumnPreprocessor() { + super(TYPE_HIVE_COLUMN); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (!context.isIgnoredEntity(entity.getGuid())) { + PreprocessAction action = context.getPreprocessActionForHiveTable(getHiveTableQualifiedName(getQualifiedName(entity))); + + if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) { + context.addToIgnoredEntities(entity.getGuid()); + } + } + } + + public static String getHiveTableQualifiedName(String columnQualifiedName) { + String dbTableName = null; + String clusterName = null; + + int sepPos = columnQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME); + + if (sepPos != -1 && columnQualifiedName.length() > (sepPos + 1)) { + clusterName = columnQualifiedName.substring(sepPos + 1); + } + + sepPos = columnQualifiedName.lastIndexOf(QNAME_SEP_ENTITY_NAME); + + if (sepPos != -1) { + dbTableName = columnQualifiedName.substring(0, sepPos); + } + + return clusterName != null ? (dbTableName + QNAME_SEP_CLUSTER_NAME + clusterName) : dbTableName; + } + } + + + static class HiveStorageDescPreprocessor extends EntityPreprocessor { + public HiveStorageDescPreprocessor() { + super(TYPE_HIVE_STORAGEDESC); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (!context.isIgnoredEntity(entity.getGuid())) { + PreprocessAction action = context.getPreprocessActionForHiveTable(getHiveTableQualifiedName(getQualifiedName(entity))); + + if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) { + context.addToIgnoredEntities(entity.getGuid()); + } + } + } + + public static String getHiveTableQualifiedName(String sdQualifiedName) { + int sepPos = sdQualifiedName.lastIndexOf(QNAME_SD_SUFFIX); + + return sepPos != -1 ? sdQualifiedName.substring(0, sepPos) : sdQualifiedName; + } + } + + + static class HiveProcessPreprocessor extends EntityPreprocessor { + public HiveProcessPreprocessor() { + super(TYPE_HIVE_PROCESS); + } + + public HiveProcessPreprocessor(String typeName) { + super(typeName); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (context.isIgnoredEntity(entity.getGuid())) { + context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName + } else { + Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS); + Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS); + + removeIgnoredObjectIds(inputs, context); + removeIgnoredObjectIds(outputs, context); + + boolean isInputsEmpty = isEmpty(inputs); + boolean isOutputsEmpty = isEmpty(outputs); + + if (isInputsEmpty || isOutputsEmpty) { + context.addToIgnoredEntities(entity); + + // since the process entity is ignored, entities referenced by inputs/outputs of this process entity + // may not be processed by Atlas, if they are present in referredEntities. So, move them from + // 'referredEntities' to 'entities'. However, this is not necessary for hive_column entities, + // as these entities would be referenced by hive_table entities + if (!StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { + if (!isInputsEmpty) { + for (Object obj : (Collection) inputs) { + String guid = context.getGuid(obj); + + context.addToReferredEntitiesToMove(guid); + } + } else if (!isOutputsEmpty) { + for (Object obj : (Collection) outputs) { + String guid = context.getGuid(obj); + + context.addToReferredEntitiesToMove(guid); + } + } + } + } + } + } + + private void removeIgnoredObjectIds(Object obj, PreprocessorContext context) { + if (obj == null || !(obj instanceof Collection)) { + return; + } + + Collection objList = (Collection) obj; + List<Object> toRemove = null; + + for (Object objElem : objList) { + boolean removeEntry = false; + String guid = context.getGuid(objElem); + + if (guid != null) { + removeEntry = context.isIgnoredEntity(guid); + + if (!removeEntry) { // perhaps entity hasn't been preprocessed yet + AtlasEntity entity = context.getEntity(guid); + + if (entity != null) { + switch (entity.getTypeName()) { + case TYPE_HIVE_TABLE: { + PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(entity)); + + removeEntry = (action == PreprocessAction.IGNORE); + } + break; + + case TYPE_HIVE_COLUMN: { + PreprocessAction action = context.getPreprocessActionForHiveTable(HiveColumnPreprocessor.getHiveTableQualifiedName(getQualifiedName(entity))); + + // if the table is ignored or pruned, remove the column + removeEntry = (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE); + } + break; + } + } + } + } else { + String typeName = getTypeName(objElem); + + if (typeName != null) { + switch (typeName) { + case TYPE_HIVE_TABLE: { + PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(objElem)); + + removeEntry = (action == PreprocessAction.IGNORE); + } + break; + + case TYPE_HIVE_COLUMN: { + PreprocessAction action = context.getPreprocessActionForHiveTable(HiveColumnPreprocessor.getHiveTableQualifiedName(getQualifiedName(objElem))); + + // if the table is ignored or pruned, remove the column + removeEntry = (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE); + } + break; + } + } + } + + if (removeEntry) { + if (toRemove == null) { + toRemove = new ArrayList(); + } + + toRemove.add(objElem); + } + } + + if (toRemove != null) { + objList.removeAll(toRemove); + } + } + } + + static class HiveColumnLineageProcessPreprocessor extends HiveProcessPreprocessor { + public HiveColumnLineageProcessPreprocessor() { + super(TYPE_HIVE_COLUMN_LINEAGE); + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java new file mode 100644 index 0000000..8f62768 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + + +public class PreprocessorContext { + private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class); + + public enum PreprocessAction { NONE, IGNORE, PRUNE } + + private final AtlasKafkaMessage<HookNotificationMessage> kafkaMessage; + private final AtlasEntitiesWithExtInfo entitiesWithExtInfo; + private final List<Pattern> hiveTablesToIgnore; + private final List<Pattern> hiveTablesToPrune; + private final Map<String, PreprocessAction> hiveTablesCache; + private final Set<String> ignoredEntities = new HashSet<>(); + private final Set<String> prunedEntities = new HashSet<>(); + private final Set<String> referredEntitiesToMove = new HashSet<>(); + + public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) { + this.kafkaMessage = kafkaMessage; + this.hiveTablesToIgnore = hiveTablesToIgnore; + this.hiveTablesToPrune = hiveTablesToPrune; + this.hiveTablesCache = hiveTablesCache; + + final HookNotificationMessage message = kafkaMessage.getMessage(); + + switch (message.getType()) { + case ENTITY_CREATE_V2: + entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) message).getEntities(); + break; + + case ENTITY_FULL_UPDATE_V2: + entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) message).getEntities(); + break; + + default: + entitiesWithExtInfo = null; + break; + } + } + + public AtlasKafkaMessage<HookNotificationMessage> getKafkaMessage() { + return kafkaMessage; + } + + public long getKafkaMessageOffset() { + return kafkaMessage.getOffset(); + } + + public int getKafkaPartition() { + return kafkaMessage.getPartition(); + } + + public List<AtlasEntity> getEntities() { + return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null; + } + + public Map<String, AtlasEntity> getReferredEntities() { + return entitiesWithExtInfo != null ? entitiesWithExtInfo.getReferredEntities() : null; + } + + public AtlasEntity getEntity(String guid) { + return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null; + } + + public Set<String> getIgnoredEntities() { return ignoredEntities; } + + public Set<String> getPrunedEntities() { return prunedEntities; } + + public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; } + + public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) { + PreprocessAction ret = PreprocessAction.NONE; + + if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) { + ret = hiveTablesCache.get(qualifiedName); + + if (ret == null) { + if (isMatch(qualifiedName, hiveTablesToIgnore)) { + ret = PreprocessAction.IGNORE; + } else if (isMatch(qualifiedName, hiveTablesToPrune)) { + ret = PreprocessAction.PRUNE; + } else { + ret = PreprocessAction.NONE; + } + + hiveTablesCache.put(qualifiedName, ret); + } + } + + return ret; + } + + public boolean isIgnoredEntity(String guid) { + return guid != null ? ignoredEntities.contains(guid) : false; + } + + public boolean isPrunedEntity(String guid) { + return guid != null ? prunedEntities.contains(guid) : false; + } + + public void addToIgnoredEntities(AtlasEntity entity) { + if (!ignoredEntities.contains(entity.getGuid())) { + ignoredEntities.add(entity.getGuid()); + + LOG.info("ignored entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition()); + } + } + + public void addToPrunedEntities(AtlasEntity entity) { + if (!prunedEntities.contains(entity.getGuid())) { + prunedEntities.add(entity.getGuid()); + + LOG.info("pruned entity: typeName={}, qualifiedName={} topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition()); + } + } + + public void addToIgnoredEntities(String guid) { + if (guid != null) { + ignoredEntities.add(guid); + } + } + + public void addToPrunedEntities(String guid) { + if (guid != null) { + prunedEntities.add(guid); + } + } + + public void addToReferredEntitiesToMove(String guid) { + if (guid != null) { + referredEntitiesToMove.add(guid); + } + } + + public void addToIgnoredEntities(Object obj) { + collectGuids(obj, ignoredEntities); + } + + public void addToPrunedEntities(Object obj) { + collectGuids(obj, prunedEntities); + } + + public String getGuid(Object obj) { + Object ret = null; + + if (obj instanceof AtlasObjectId) { + ret = ((AtlasObjectId) obj).getGuid(); + } else if (obj instanceof Map) { + ret = ((Map) obj).get(AtlasObjectId.KEY_GUID); + } else if (obj instanceof AtlasEntity) { + ret = ((AtlasEntity) obj).getGuid(); + } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) { + ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid(); + } + + return ret != null ? ret.toString() : null; + } + + + private boolean isMatch(String name, List<Pattern> patterns) { + boolean ret = false; + + for (Pattern p : patterns) { + if (p.matcher(name).matches()) { + ret = true; + + break; + } + } + + return ret; + } + + private void collectGuids(Object obj, Set<String> guids) { + if (obj != null) { + if (obj instanceof Collection) { + Collection objList = (Collection) obj; + + for (Object objElem : objList) { + collectGuid(objElem, guids); + } + } else { + collectGuid(obj, guids); + } + } + } + + private void collectGuid(Object obj, Set<String> guids) { + String guid = getGuid(obj); + + if (guid != null) { + guids.add(guid); + } + } +}