Repository: atlas
Updated Branches:
  refs/heads/master 400385891 -> 1123f512c


ATLAS-3006: Option to ignore/prune metadata for temporary/staging Hive tables

Signed-off-by: Sarath Subramanian <ssubraman...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/1123f512
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/1123f512
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/1123f512

Branch: refs/heads/master
Commit: 1123f512cd4ba98f8e1cfecb7f9deff23cbb4949
Parents: 4003858
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Fri Jan 4 14:00:51 2019 -0800
Committer: Sarath Subramanian <ssubraman...@hortonworks.com>
Committed: Fri Jan 4 14:00:51 2019 -0800

----------------------------------------------------------------------
 .../atlas/hive/hook/AtlasHiveHookContext.java   |   5 +
 .../org/apache/atlas/hive/hook/HiveHook.java    |  88 ++++++-
 .../hive/hook/events/AlterTableRename.java      |   9 +-
 .../atlas/hive/hook/events/BaseHiveEvent.java   | 119 +++++----
 .../atlas/hive/hook/events/CreateTable.java     |  38 +--
 .../notification/NotificationHookConsumer.java  | 198 ++++++++++++---
 .../preprocessor/EntityPreprocessor.java        | 126 +++++++++
 .../preprocessor/HivePreprocessor.java          | 253 +++++++++++++++++++
 .../preprocessor/PreprocessorContext.java       | 228 +++++++++++++++++
 9 files changed, 957 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index 23cb853..44c6437 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.hive.hook;
 
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
 import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.RandomStringUtils;
@@ -97,6 +98,10 @@ public class AtlasHiveHookContext {
         return hook.getSkipHiveColumnLineageHive20633InputsThreshold();
     }
 
+    public PreprocessAction getPreprocessActionForHiveTable(String 
qualifiedName) {
+        return hook.getPreprocessActionForHiveTable(qualifiedName);
+    }
+
     public String getQualifiedName(Database db) {
         return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + 
getClusterName();
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 0f48578..ee02285 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -21,6 +21,8 @@ package org.apache.atlas.hive.hook;
 import org.apache.atlas.hive.hook.events.*;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.utils.LruCache;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
@@ -30,12 +32,15 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static 
org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
@@ -45,6 +50,8 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
 public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
+    public enum PreprocessAction { NONE, IGNORE, PRUNE }
+
     public static final String CONF_PREFIX                         = 
"atlas.hook.hive.";
     public static final String CONF_CLUSTER_NAME                   = 
"atlas.cluster.name";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE     = 
CONF_PREFIX + "hdfs_path.convert_to_lowercase";
@@ -54,6 +61,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = 
CONF_PREFIX + "name.cache.rebuild.interval.seconds";
     public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633        
          = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
     public static final String 
HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + 
"skip.hive_column_lineage.hive-20633.inputs.threshold";
+    public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN                  
          = CONF_PREFIX + "hive_table.ignore.pattern";
+    public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN                   
          = CONF_PREFIX + "hive_table.prune.pattern";
+    public static final String HOOK_HIVE_TABLE_CACHE_SIZE                      
          = CONF_PREFIX + "hive_table.cache.size";
 
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
@@ -66,8 +76,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final int     nameCacheTableMaxCount;
     private static final int     nameCacheRebuildIntervalSeconds;
 
-    private static final boolean skipHiveColumnLineageHive20633;
-    private static final int     skipHiveColumnLineageHive20633InputsThreshold;
+    private static final boolean                       
skipHiveColumnLineageHive20633;
+    private static final int                           
skipHiveColumnLineageHive20633InputsThreshold;
+    private static final List<Pattern>                 hiveTablesToIgnore = 
new ArrayList<>();
+    private static final List<Pattern>                 hiveTablesToPrune  = 
new ArrayList<>();
+    private static final Map<String, PreprocessAction> hiveTablesCache;
 
     private static HiveHookObjectNamesCache knownObjects = null;
 
@@ -85,6 +98,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         skipHiveColumnLineageHive20633                = 
atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
         skipHiveColumnLineageHive20633InputsThreshold = 
atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
 
+        String[] patternHiveTablesToIgnore = 
atlasProperties.getStringArray(HOOK_HIVE_TABLE_IGNORE_PATTERN);
+        String[] patternHiveTablesToPrune  = 
atlasProperties.getStringArray(HOOK_HIVE_TABLE_PRUNE_PATTERN);
+
+        if (patternHiveTablesToIgnore != null) {
+            for (String pattern : patternHiveTablesToIgnore) {
+                try {
+                    hiveTablesToIgnore.add(Pattern.compile(pattern));
+
+                    LOG.info("{}={}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: 
{}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (patternHiveTablesToPrune != null) {
+            for (String pattern : patternHiveTablesToPrune) {
+                try {
+                    hiveTablesToPrune.add(Pattern.compile(pattern));
+
+                    LOG.info("{}={}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: 
{}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+            hiveTablesCache = new 
LruCache<>(atlasProperties.getInt(HOOK_HIVE_TABLE_CACHE_SIZE, 10000), 0);
+        } else {
+            hiveTablesCache = Collections.emptyMap();
+        }
+
         knownObjects = nameCacheEnabled ? new 
HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, 
nameCacheRebuildIntervalSeconds) : null;
     }
 
@@ -204,6 +252,42 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return skipHiveColumnLineageHive20633InputsThreshold;
     }
 
+    public PreprocessAction getPreprocessActionForHiveTable(String 
qualifiedName) {
+        PreprocessAction ret = PreprocessAction.NONE;
+
+        if (qualifiedName != null && 
(CollectionUtils.isNotEmpty(hiveTablesToIgnore) || 
CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
+            ret = hiveTablesCache.get(qualifiedName);
+
+            if (ret == null) {
+                if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+                    ret = PreprocessAction.IGNORE;
+                } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+                    ret = PreprocessAction.PRUNE;
+                } else {
+                    ret = PreprocessAction.NONE;
+                }
+
+                hiveTablesCache.put(qualifiedName, ret);
+            }
+        }
+
+        return ret;
+    }
+
+    private boolean isMatch(String name, List<Pattern> patterns) {
+        boolean ret = false;
+
+        for (Pattern p : patterns) {
+            if (p.matcher(name).matches()) {
+                ret = true;
+
+                break;
+            }
+        }
+
+        return ret;
+    }
+
 
     public static class HiveHookObjectNamesCache {
         private final int         dbMaxCacheCount;

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
index 35b0586..7e15d0e 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
@@ -85,13 +85,16 @@ public class AlterTableRename extends BaseHiveEvent {
             return ret;
         }
 
-        AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable);
+        AtlasEntityWithExtInfo oldTableEntity     = toTableEntity(oldTable);
+        AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
+
+        if (oldTableEntity == null || renamedTableEntity == null) {
+            return ret;
+        }
 
         // first update with oldTable info, so that the table will be created 
if it is not present in Atlas
         ret.add(new EntityUpdateRequestV2(getUserName(), new 
AtlasEntitiesWithExtInfo(oldTableEntity)));
 
-        AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
-
         // update qualifiedName for all columns, partitionKeys, storageDesc
         String renamedTableQualifiedName = (String) 
renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index e4537b4..31346d0 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.hive.hook.events;
 
 import org.apache.atlas.hive.hook.AtlasHiveHookContext;
+import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -307,7 +308,11 @@ public abstract class BaseHiveEvent {
 
         AtlasEntity entity = toTableEntity(table, ret);
 
-        ret.setEntity(entity);
+        if (entity != null) {
+            ret.setEntity(entity);
+        } else {
+            ret = null;
+        }
 
         return ret;
     }
@@ -315,7 +320,9 @@ public abstract class BaseHiveEvent {
     protected AtlasEntity toTableEntity(Table table, AtlasEntitiesWithExtInfo 
entities) throws Exception {
         AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);
 
-        entities.addEntity(ret);
+        if (ret != null) {
+            entities.addEntity(ret);
+        }
 
         return ret;
     }
@@ -341,64 +348,76 @@ public abstract class BaseHiveEvent {
         AtlasEntity ret = context.getEntity(tblQualifiedName);
 
         if (ret == null) {
-            ret = new AtlasEntity(HIVE_TYPE_TABLE);
+            PreprocessAction action = 
context.getPreprocessActionForHiveTable(tblQualifiedName);
 
-            // if this table was sent in an earlier notification, set 'guid' 
to null - which will:
-            //  - result in this entity to be not included in 
'referredEntities'
-            //  - cause Atlas server to resolve the entity by its qualifiedName
-            if (isKnownTable && !isAlterTableOperation()) {
-                ret.setGuid(null);
-            }
+            if (action == PreprocessAction.IGNORE) {
+                LOG.info("ignoring table {}", tblQualifiedName);
+            } else {
+                ret = new AtlasEntity(HIVE_TYPE_TABLE);
 
-            long createTime     = getTableCreateTime(table);
-            long lastAccessTime = table.getLastAccessTime() > 0 ? 
(table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
-
-            ret.setAttribute(ATTRIBUTE_DB, dbId);
-            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
-            ret.setAttribute(ATTRIBUTE_NAME, 
table.getTableName().toLowerCase());
-            ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
-            ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
-            ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
-            ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
-            ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters());
-            ret.setAttribute(ATTRIBUTE_COMMENT, 
table.getParameters().get(ATTRIBUTE_COMMENT));
-            ret.setAttribute(ATTRIBUTE_TABLE_TYPE, 
table.getTableType().name());
-            ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());
-
-            if (table.getViewOriginalText() != null) {
-                ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, 
table.getViewOriginalText());
-            }
+                // if this table was sent in an earlier notification, set 
'guid' to null - which will:
+                //  - result in this entity to be not included in 
'referredEntities'
+                //  - cause Atlas server to resolve the entity by its 
qualifiedName
+                if (isKnownTable && !isAlterTableOperation()) {
+                    ret.setGuid(null);
+                }
 
-            if (table.getViewExpandedText() != null) {
-                ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, 
table.getViewExpandedText());
-            }
+                long createTime     = getTableCreateTime(table);
+                long lastAccessTime = table.getLastAccessTime() > 0 ? 
(table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
+
+                ret.setAttribute(ATTRIBUTE_DB, dbId);
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
+                ret.setAttribute(ATTRIBUTE_NAME, 
table.getTableName().toLowerCase());
+                ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
+                ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+                ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+                ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
+                ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters());
+                ret.setAttribute(ATTRIBUTE_COMMENT, 
table.getParameters().get(ATTRIBUTE_COMMENT));
+                ret.setAttribute(ATTRIBUTE_TABLE_TYPE, 
table.getTableType().name());
+                ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());
+
+                if (table.getViewOriginalText() != null) {
+                    ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, 
table.getViewOriginalText());
+                }
 
-            AtlasObjectId     tableId       = getObjectId(ret);
-            AtlasEntity       sd            = getStorageDescEntity(tableId, 
table);
-            List<AtlasEntity> partitionKeys = getColumnEntities(tableId, 
table, table.getPartitionKeys());
-            List<AtlasEntity> columns       = getColumnEntities(tableId, 
table, table.getCols());
+                if (table.getViewExpandedText() != null) {
+                    ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, 
table.getViewExpandedText());
+                }
 
-            if (entityExtInfo != null) {
-                entityExtInfo.addReferredEntity(sd);
+                boolean pruneTable = table.isTemporary() || action == 
PreprocessAction.PRUNE;
 
-                if (partitionKeys != null) {
-                    for (AtlasEntity partitionKey : partitionKeys) {
-                        entityExtInfo.addReferredEntity(partitionKey);
-                    }
-                }
+                if (pruneTable) {
+                    LOG.info("ignoring details of table {}", tblQualifiedName);
+                } else {
+                    AtlasObjectId     tableId       = getObjectId(ret);
+                    AtlasEntity       sd            = 
getStorageDescEntity(tableId, table);
+                    List<AtlasEntity> partitionKeys = 
getColumnEntities(tableId, table, table.getPartitionKeys());
+                    List<AtlasEntity> columns       = 
getColumnEntities(tableId, table, table.getCols());
+
+                    if (entityExtInfo != null) {
+                        entityExtInfo.addReferredEntity(sd);
+
+                        if (partitionKeys != null) {
+                            for (AtlasEntity partitionKey : partitionKeys) {
+                                entityExtInfo.addReferredEntity(partitionKey);
+                            }
+                        }
 
-                if (columns != null) {
-                    for (AtlasEntity column : columns) {
-                        entityExtInfo.addReferredEntity(column);
+                        if (columns != null) {
+                            for (AtlasEntity column : columns) {
+                                entityExtInfo.addReferredEntity(column);
+                            }
+                        }
                     }
-                }
-            }
 
-            ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
-            ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, 
getObjectIds(partitionKeys));
-            ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
+                    ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
+                    ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, 
getObjectIds(partitionKeys));
+                    ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
+                }
 
-            context.putEntity(tblQualifiedName, ret);
+                context.putEntity(tblQualifiedName, ret);
+            }
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
index 442a0a0..674a89f 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
@@ -81,28 +81,30 @@ public class CreateTable extends BaseHiveEvent {
         if (table != null) {
             AtlasEntity tblEntity = toTableEntity(table, ret);
 
-            if (isHBaseStore(table)) {
-                // This create lineage to HBase table in case of Hive on HBase
-                AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, 
ret);
+            if (tblEntity != null) {
+                if (isHBaseStore(table)) {
+                    // This create lineage to HBase table in case of Hive on 
HBase
+                    AtlasEntity hbaseTableEntity = 
toReferencedHBaseTable(table, ret);
 
-                if (hbaseTableEntity != null) {
-                    final AtlasEntity processEntity;
+                    if (hbaseTableEntity != null) {
+                        final AtlasEntity processEntity;
 
-                    if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) 
{
-                        processEntity = 
getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), 
Collections.singletonList(tblEntity));
-                    } else {
-                        processEntity = 
getHiveProcessEntity(Collections.singletonList(tblEntity), 
Collections.singletonList(hbaseTableEntity));
-                    }
+                        if 
(TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+                            processEntity = 
getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), 
Collections.singletonList(tblEntity));
+                        } else {
+                            processEntity = 
getHiveProcessEntity(Collections.singletonList(tblEntity), 
Collections.singletonList(hbaseTableEntity));
+                        }
 
-                    ret.addEntity(processEntity);
-                }
-            } else {
-                if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-                    AtlasEntity hdfsPathEntity = 
getPathEntity(table.getDataLocation(), ret);
-                    AtlasEntity processEntity  = 
getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), 
Collections.singletonList(tblEntity));
+                        ret.addEntity(processEntity);
+                    }
+                } else {
+                    if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) 
{
+                        AtlasEntity hdfsPathEntity = 
getPathEntity(table.getDataLocation(), ret);
+                        AtlasEntity processEntity  = 
getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), 
Collections.singletonList(tblEntity));
 
-                    ret.addEntity(processEntity);
-                    ret.addReferredEntity(hdfsPathEntity);
+                        ret.addEntity(processEntity);
+                        ret.addReferredEntity(hdfsPathEntity);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 003e5b0..bd54770 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -40,6 +40,10 @@ import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV
 import 
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
 import 
org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
 import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
+import org.apache.atlas.notification.preprocessor.PreprocessorContext;
+import 
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
+import org.apache.atlas.utils.LruCache;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
@@ -56,6 +60,7 @@ import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.filters.AuditFilter.AuditLog;
 import org.apache.atlas.web.service.ServiceState;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.common.TopicPartition;
@@ -70,13 +75,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -109,21 +117,28 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633    
              = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
     public static final String 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
+    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN   
              = 
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
+    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN    
              = 
"atlas.notification.consumer.preprocess.hive_table.prune.pattern";
+    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE       
              = "atlas.notification.consumer.preprocess.hive_table.cache.size";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
-    private final AtlasEntityStore       atlasEntityStore;
-    private final ServiceState           serviceState;
-    private final AtlasInstanceConverter instanceConverter;
-    private final AtlasTypeRegistry      typeRegistry;
-    private final int                    maxRetries;
-    private final int                    failedMsgCacheSize;
-    private final int                    minWaitDuration;
-    private final int                    maxWaitDuration;
-    private final boolean                skipHiveColumnLineageHive20633;
-    private final int                    
skipHiveColumnLineageHive20633InputsThreshold;
-    private final int                    largeMessageProcessingTimeThresholdMs;
-    private final boolean                consumerDisabled;
+    private final AtlasEntityStore              atlasEntityStore;
+    private final ServiceState                  serviceState;
+    private final AtlasInstanceConverter        instanceConverter;
+    private final AtlasTypeRegistry             typeRegistry;
+    private final int                           maxRetries;
+    private final int                           failedMsgCacheSize;
+    private final int                           minWaitDuration;
+    private final int                           maxWaitDuration;
+    private final boolean                       skipHiveColumnLineageHive20633;
+    private final int                           
skipHiveColumnLineageHive20633InputsThreshold;
+    private final int                           
largeMessageProcessingTimeThresholdMs;
+    private final boolean                       consumerDisabled;
+    private final List<Pattern>                 hiveTablesToIgnore = new 
ArrayList<>();
+    private final List<Pattern>                 hiveTablesToPrune  = new 
ArrayList<>();
+    private final Map<String, PreprocessAction> hiveTablesCache;
+    private final boolean                       preprocessEnabled;
 
     private NotificationInterface notificationInterface;
     private ExecutorService       executors;
@@ -157,6 +172,43 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         consumerDisabled                                                       
  = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
         largeMessageProcessingTimeThresholdMs         = 
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
 60 * 1000);  //  60 sec by default
 
+        String[] patternHiveTablesToIgnore = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
+        String[] patternHiveTablesToPrune  = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
+
+        if (patternHiveTablesToIgnore != null) {
+            for (String pattern : patternHiveTablesToIgnore) {
+                try {
+                    hiveTablesToIgnore.add(Pattern.compile(pattern));
+
+                    LOG.info("{}={}", 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: 
{}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (patternHiveTablesToPrune != null) {
+            for (String pattern : patternHiveTablesToPrune) {
+                try {
+                    hiveTablesToPrune.add(Pattern.compile(pattern));
+
+                    LOG.info("{}={}", 
CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: 
{}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+            hiveTablesCache = new 
LruCache<>(applicationProperties.getInt(CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE,
 10000), 0);
+        } else {
+            hiveTablesCache = Collections.emptyMap();
+        }
+
+        preprocessEnabled = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
+
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
         LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
     }
@@ -417,6 +469,11 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
                 preProcessNotificationMessage(kafkaMsg);
 
+                if (isEmptyMessage(kafkaMsg)) {
+                    commit(kafkaMsg);
+                    return;
+                }
+
                 // Used for intermediate conversions during create and update
                 for (int numRetries = 0; numRetries < maxRetries; 
numRetries++) {
                     if (LOG.isDebugEnabled()) {
@@ -696,39 +753,85 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     }
 
     private void 
preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
-        skipHiveColumnLineage(kafkaMsg);
-    }
-
-    private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotification> 
kafkaMessage) {
-        if (!skipHiveColumnLineageHive20633) {
+        if (!preprocessEnabled) {
             return;
         }
 
-        final HookNotification         message = kafkaMessage.getMessage();
-        final AtlasEntitiesWithExtInfo entities;
+        PreprocessorContext context = new PreprocessorContext(kafkaMsg, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
 
-        switch (message.getType()) {
-            case ENTITY_CREATE_V2:
-                entities = ((EntityCreateRequestV2) message).getEntities();
-            break;
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+            ignoreOrPruneHiveTables(context);
+        }
 
-            case ENTITY_FULL_UPDATE_V2:
-                entities = ((EntityUpdateRequestV2) message).getEntities();
-                break;
+        if (skipHiveColumnLineageHive20633) {
+            skipHiveColumnLineage(context);
+        }
+    }
 
-            default:
-                entities = null;
-            break;
+    /**
+     * Runs the registered {@link EntityPreprocessor}s over the entities and referred entities of the
+     * message held by the given context, dropping every entity the context reports as ignored
+     * after its preprocessor has run.
+     *
+     * Referred entities whose guid is listed in {@code context.getReferredEntitiesToMove()} are
+     * removed from the referred-entities map and appended to the entities list.
+     *
+     * @param context preprocessing state for the notification message being handled
+     */
+    private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+        List<AtlasEntity> entities = context.getEntities();
+
+        if (entities != null) {
+            for (ListIterator<AtlasEntity> entityIter = entities.listIterator(); entityIter.hasNext(); ) {
+                if (preprocessAndCheckIgnored(entityIter.next(), context)) {
+                    entityIter.remove();
+                }
+            }
+
+            Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
+
+            if (referredEntities != null) {
+                Iterator<Map.Entry<String, AtlasEntity>> referredIter = referredEntities.entrySet().iterator();
+
+                while (referredIter.hasNext()) {
+                    if (preprocessAndCheckIgnored(referredIter.next().getValue(), context)) {
+                        referredIter.remove();
+                    }
+                }
+
+                // promote the referred entities that preprocessors asked to be moved
+                for (String guid : context.getReferredEntitiesToMove()) {
+                    AtlasEntity moved = referredEntities.remove(guid);
+
+                    if (moved == null) {
+                        continue;
+                    }
+
+                    entities.add(moved);
+
+                    LOG.info("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", moved.getTypeName(), EntityPreprocessor.getQualifiedName(moved), context.getKafkaMessageOffset(), context.getKafkaPartition());
+                }
+            }
+
+            int numIgnored = context.getIgnoredEntities().size();
+            int numPruned  = context.getPrunedEntities().size();
+
+            if (numIgnored > 0 || numPruned > 0) {
+                LOG.info("preprocess: ignored entities={}; pruned entities={}. topic-offset={}, partition={}", numIgnored, numPruned, context.getKafkaMessageOffset(), context.getKafkaPartition());
+            }
         }
+    }
+
+    /**
+     * Preprocesses a single entity with the preprocessor registered for its type, if any.
+     *
+     * @return true only when a preprocessor exists for the entity's type and, after it ran,
+     *         the context reports the entity's guid as ignored
+     */
+    private boolean preprocessAndCheckIgnored(AtlasEntity entity, PreprocessorContext context) {
+        EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
+
+        if (preprocessor == null) {
+            return false;
+        }
+
+        preprocessor.preprocess(entity, context);
+
+        return context.isIgnoredEntity(entity.getGuid());
+    }
 
-        if (entities != null && entities.getEntities() != null) {
+    private void skipHiveColumnLineage(PreprocessorContext context) {
+        List<AtlasEntity> entities = context.getEntities();
+
+        if (entities != null) {
             int         lineageCount       = 0;
             int         lineageInputsCount = 0;
             int         numRemovedEntities = 0;
             Set<String> lineageQNames      = new HashSet<>();
 
             // find if all hive_column_lineage entities have same number of 
inputs, which is likely to be caused by HIVE-20633 that results in incorrect 
lineage in some cases
-            for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
+            for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
                 AtlasEntity entity = iter.next();
 
                 if (StringUtils.equals(entity.getTypeName(), 
TYPE_HIVE_COLUMN_LINEAGE)) {
@@ -740,7 +843,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                         if (lineageQNames.contains(qualifiedName)) {
                             iter.remove();
 
-                            LOG.warn("removed duplicate hive_column_lineage 
entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, 
lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                            // one argument per '{}' placeholder: a stray lineageInputsCount argument used to be
+                            // printed as the topic-offset, shifting the real offset into the partition slot
+                            LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
 
                             numRemovedEntities++;
 
@@ -765,7 +868,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             float avgInputsCount = lineageCount > 0 ? (((float) 
lineageInputsCount) / lineageCount) : 0;
 
             if (avgInputsCount > 
skipHiveColumnLineageHive20633InputsThreshold) {
-                for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
+                for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
                     AtlasEntity entity = iter.next();
 
                     if (StringUtils.equals(entity.getTypeName(), 
TYPE_HIVE_COLUMN_LINEAGE)) {
@@ -777,11 +880,38 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             }
 
             if (numRemovedEntities > 0) {
-                LOG.warn("removed {} hive_column_lineage entities. Average # 
of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, 
partition={}", numRemovedEntities, avgInputsCount, 
skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, 
kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                LOG.warn("removed {} hive_column_lineage entities. Average # 
of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, 
partition={}", numRemovedEntities, avgInputsCount, 
skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, 
context.getKafkaMessageOffset(), context.getKafkaPartition());
             }
         }
     }
 
+    /**
+     * Tells whether a V2 create / full-update notification carries no entities.
+     *
+     * @param kafkaMsg the message to inspect
+     * @return true for ENTITY_CREATE_V2 and ENTITY_FULL_UPDATE_V2 messages whose entities holder is
+     *         null or has an empty entity list; false for every other message type
+     */
+    private boolean isEmptyMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
+        final HookNotification         message = kafkaMsg.getMessage();
+        final AtlasEntitiesWithExtInfo entities;
+
+        switch (message.getType()) {
+            case ENTITY_CREATE_V2:
+                entities = ((EntityCreateRequestV2) message).getEntities();
+                break;
+
+            case ENTITY_FULL_UPDATE_V2:
+                entities = ((EntityUpdateRequestV2) message).getEntities();
+                break;
+
+            default:
+                // other message types are never treated as empty
+                return false;
+        }
+
+        return entities == null || CollectionUtils.isEmpty(entities.getEntities());
+    }
+
     static class FailedCommitOffsetRecorder {
         private Long currentOffset;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
new file mode 100644
index 0000000..bdea14a
--- /dev/null
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Base class for preprocessors that inspect or modify a single entity of a hook notification
+ * before the notification is processed. One preprocessor is registered per entity type name and
+ * looked up through {@link #getPreprocessor(String)}.
+ */
+public abstract class EntityPreprocessor {
+    public static final String TYPE_HIVE_COLUMN         = "hive_column";
+    public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
+    public static final String TYPE_HIVE_PROCESS        = "hive_process";
+    public static final String TYPE_HIVE_STORAGEDESC    = "hive_storagedesc";
+    public static final String TYPE_HIVE_TABLE          = "hive_table";
+
+    public static final String ATTRIBUTE_COLUMNS        = "columns";
+    public static final String ATTRIBUTE_INPUTS         = "inputs";
+    public static final String ATTRIBUTE_OUTPUTS        = "outputs";
+    public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
+    public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+    public static final String ATTRIBUTE_SD             = "sd";
+
+    public static final char   QNAME_SEP_CLUSTER_NAME = '@';
+    public static final char   QNAME_SEP_ENTITY_NAME  = '.';
+    public static final String QNAME_SD_SUFFIX        = "_storage";
+
+    // populated once in the static initializer below and only read afterwards
+    private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>();
+
+    private final String typeName;
+
+
+    static {
+        EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
+                                                                    new HivePreprocessor.HiveTablePreprocessor(),
+                                                                    new HivePreprocessor.HiveColumnPreprocessor(),
+                                                                    new HivePreprocessor.HiveProcessPreprocessor(),
+                                                                    new HivePreprocessor.HiveColumnLineageProcessPreprocessor(),
+                                                                    new HivePreprocessor.HiveStorageDescPreprocessor()
+        };
+
+        for (EntityPreprocessor preprocessor : preprocessors) {
+            PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+        }
+    }
+
+    /**
+     * @param typeName entity type name this preprocessor is registered under
+     */
+    protected EntityPreprocessor(String typeName) {
+        this.typeName = typeName;
+    }
+
+    /**
+     * @return the entity type name this preprocessor handles
+     */
+    public String getTypeName() {
+        return typeName;
+    }
+
+    /**
+     * Inspects the given entity and records in the context any resulting decision.
+     *
+     * @param entity  entity to preprocess
+     * @param context state shared by all preprocessors handling the same notification message
+     */
+    public abstract void preprocess(AtlasEntity entity, PreprocessorContext context);
+
+
+    /**
+     * @param typeName entity type name; may be null
+     * @return the preprocessor registered for the type, or null if there is none
+     */
+    public static EntityPreprocessor getPreprocessor(String typeName) {
+        return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null;
+    }
+
+    /**
+     * @param entity entity to read; may be null
+     * @return string form of the entity's "qualifiedName" attribute, or null if absent
+     */
+    public static String getQualifiedName(AtlasEntity entity) {
+        Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+        return obj != null ? obj.toString() : null;
+    }
+
+    /**
+     * Extracts the type name from any of the shapes an entity reference is handled in here:
+     * {@link AtlasObjectId}, a Map keyed by {@link AtlasObjectId#KEY_TYPENAME},
+     * {@link AtlasEntity} or {@link AtlasEntityWithExtInfo}.
+     *
+     * @param obj entity reference; may be null or of an unsupported type
+     * @return the type name, or null if it cannot be determined
+     */
+    public String getTypeName(Object obj) {
+        Object ret = null;
+
+        if (obj instanceof AtlasObjectId) {
+            ret = ((AtlasObjectId) obj).getTypeName();
+        } else if (obj instanceof Map) {
+            ret = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_TYPENAME);
+        } else if (obj instanceof AtlasEntity) {
+            ret = ((AtlasEntity) obj).getTypeName();
+        } else if (obj instanceof AtlasEntityWithExtInfo) {
+            // nothing visible here guarantees the wrapped entity is set, so do not dereference it blindly
+            AtlasEntity entity = ((AtlasEntityWithExtInfo) obj).getEntity();
+
+            ret = entity != null ? entity.getTypeName() : null;
+        }
+
+        return ret != null ? ret.toString() : null;
+    }
+
+    /**
+     * Extracts the "qualifiedName" value from any of the entity reference shapes supported by
+     * {@link #getTypeName(Object)}. For {@link AtlasObjectId} and Map references the value is
+     * read from the unique attributes; for entities it is read from the entity attributes.
+     *
+     * @param obj entity reference; may be null or of an unsupported type
+     * @return the qualified name, or null if it cannot be determined
+     */
+    public String getQualifiedName(Object obj) {
+        Map<?, ?> attributes = null;
+
+        if (obj instanceof AtlasObjectId) {
+            attributes = ((AtlasObjectId) obj).getUniqueAttributes();
+        } else if (obj instanceof Map) {
+            Object uniqueAttributes = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES);
+
+            // only a Map can hold the qualifiedName; anything else is treated as "no attributes"
+            // instead of failing with a ClassCastException
+            if (uniqueAttributes instanceof Map) {
+                attributes = (Map<?, ?>) uniqueAttributes;
+            }
+        } else if (obj instanceof AtlasEntity) {
+            attributes = ((AtlasEntity) obj).getAttributes();
+        } else if (obj instanceof AtlasEntityWithExtInfo) {
+            AtlasEntity entity = ((AtlasEntityWithExtInfo) obj).getEntity();
+
+            attributes = entity != null ? entity.getAttributes() : null;
+        }
+
+        Object ret = attributes != null ? attributes.get(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+        return ret != null ? ret.toString() : null;
+    }
+
+    /**
+     * @param obj value to test
+     * @return true if the value is null or an empty Collection; false for anything else,
+     *         including non-null values that are not collections
+     */
+    protected boolean isEmpty(Object obj) {
+        return obj == null || ((obj instanceof Collection) && ((Collection<?>) obj).isEmpty());
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
new file mode 100644
index 0000000..d54c88d
--- /dev/null
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import 
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class HivePreprocessor {
+    /**
+     * Preprocessor for hive_table entities: applies the ignore/prune action the context returns
+     * for the table's qualified name.
+     */
+    static class HiveTablePreprocessor extends EntityPreprocessor {
+        // attributes whose values are marked as ignored whenever the table is ignored or pruned
+        private static final String[] TABLE_CHILD_ATTRIBUTES = { ATTRIBUTE_SD, ATTRIBUTE_COLUMNS, ATTRIBUTE_PARTITION_KEYS };
+
+        public HiveTablePreprocessor() {
+            super(TYPE_HIVE_TABLE);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isIgnoredEntity(entity.getGuid())) {
+                context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
+
+                return;
+            }
+
+            PreprocessAction action   = context.getPreprocessActionForHiveTable(getQualifiedName(entity));
+            boolean          isIgnore = action == PreprocessAction.IGNORE;
+            boolean          isPrune  = action == PreprocessAction.PRUNE;
+
+            if (!isIgnore && !isPrune) {
+                return;
+            }
+
+            if (isIgnore) {
+                context.addToIgnoredEntities(entity);
+            } else {
+                context.addToPrunedEntities(entity);
+            }
+
+            for (String attrName : TABLE_CHILD_ATTRIBUTES) {
+                context.addToIgnoredEntities(entity.getAttribute(attrName));
+            }
+
+            if (isPrune) {
+                // a pruned table stays in the message, but without its sd/columns/partitionKeys
+                for (String attrName : TABLE_CHILD_ATTRIBUTES) {
+                    entity.setAttribute(attrName, null);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Preprocessor for hive_column entities: a column is marked as ignored when the context
+     * returns IGNORE or PRUNE for the table derived from the column's qualified name.
+     */
+    static class HiveColumnPreprocessor extends EntityPreprocessor {
+        public HiveColumnPreprocessor() {
+            super(TYPE_HIVE_COLUMN);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isIgnoredEntity(entity.getGuid())) {
+                return;
+            }
+
+            String tableQualifiedName = getHiveTableQualifiedName(getQualifiedName(entity));
+
+            if (tableQualifiedName == null) {
+                // no table name could be derived, so there is nothing to look up
+                return;
+            }
+
+            PreprocessAction action = context.getPreprocessActionForHiveTable(tableQualifiedName);
+
+            if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) {
+                context.addToIgnoredEntities(entity.getGuid());
+            }
+        }
+
+        /**
+         * Derives the table qualified name from a column qualified name by dropping the last
+         * '.'-separated segment that precedes the '@' cluster separator; for example
+         * {@code db.tbl.col@cl} becomes {@code db.tbl@cl} and {@code db.tbl.col} becomes {@code db.tbl}.
+         *
+         * @param columnQualifiedName column qualified name; may be null
+         * @return the table qualified name, or null when the input is null or has no '.' before
+         *         the cluster separator (previously: NullPointerException and a literal
+         *         "null@cluster" string, respectively)
+         */
+        public static String getHiveTableQualifiedName(String columnQualifiedName) {
+            if (columnQualifiedName == null) {
+                return null;
+            }
+
+            int    clusterSepPos = columnQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);
+            String clusterName   = null;
+
+            if (clusterSepPos != -1 && columnQualifiedName.length() > (clusterSepPos + 1)) {
+                clusterName = columnQualifiedName.substring(clusterSepPos + 1);
+            }
+
+            // look for the column separator only in the part before '@', so that a '.' inside the
+            // cluster name is not mistaken for the table/column separator
+            int searchFrom   = clusterSepPos != -1 ? (clusterSepPos - 1) : (columnQualifiedName.length() - 1);
+            int entitySepPos = columnQualifiedName.lastIndexOf(QNAME_SEP_ENTITY_NAME, searchFrom);
+
+            if (entitySepPos == -1) {
+                return null;
+            }
+
+            String dbTableName = columnQualifiedName.substring(0, entitySepPos);
+
+            return clusterName != null ? (dbTableName + QNAME_SEP_CLUSTER_NAME + clusterName) : dbTableName;
+        }
+    }
+
+
+    /**
+     * Preprocessor for hive_storagedesc entities: a storage descriptor is marked as ignored when
+     * the context returns IGNORE or PRUNE for the table derived from its qualified name.
+     */
+    static class HiveStorageDescPreprocessor extends EntityPreprocessor {
+        public HiveStorageDescPreprocessor() {
+            super(TYPE_HIVE_STORAGEDESC);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isIgnoredEntity(entity.getGuid())) {
+                return;
+            }
+
+            String tableQualifiedName = getHiveTableQualifiedName(getQualifiedName(entity));
+
+            if (tableQualifiedName == null) {
+                // entity has no qualifiedName attribute, so there is nothing to look up
+                return;
+            }
+
+            PreprocessAction action = context.getPreprocessActionForHiveTable(tableQualifiedName);
+
+            if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) {
+                context.addToIgnoredEntities(entity.getGuid());
+            }
+        }
+
+        /**
+         * Derives the table qualified name from a storage-descriptor qualified name by stripping
+         * a trailing {@code QNAME_SD_SUFFIX}.
+         *
+         * @param sdQualifiedName storage descriptor qualified name; may be null
+         * @return the name without the trailing suffix; the input unchanged when it does not end
+         *         with the suffix; null when the input is null (previously NullPointerException)
+         */
+        public static String getHiveTableQualifiedName(String sdQualifiedName) {
+            if (sdQualifiedName == null) {
+                return null;
+            }
+
+            // strip the suffix only when it really is a suffix; an occurrence in the middle of the
+            // name must not truncate it
+            if (sdQualifiedName.endsWith(QNAME_SD_SUFFIX)) {
+                return sdQualifiedName.substring(0, sdQualifiedName.length() - QNAME_SD_SUFFIX.length());
+            }
+
+            return sdQualifiedName;
+        }
+    }
+
+
+    /**
+     * Preprocessor for hive_process entities (and, through the typeName constructor, subclasses
+     * such as the column-lineage preprocessor). References to ignored tables/columns are removed
+     * from the process inputs and outputs; a process left without inputs or without outputs is
+     * itself marked as ignored.
+     */
+    static class HiveProcessPreprocessor extends EntityPreprocessor {
+        public HiveProcessPreprocessor() {
+            super(TYPE_HIVE_PROCESS);
+        }
+
+        public HiveProcessPreprocessor(String typeName) {
+            super(typeName);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isIgnoredEntity(entity.getGuid())) {
+                context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
+
+                return;
+            }
+
+            Object inputs  = entity.getAttribute(ATTRIBUTE_INPUTS);
+            Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+
+            removeIgnoredObjectIds(inputs, context);
+            removeIgnoredObjectIds(outputs, context);
+
+            boolean isInputsEmpty  = isEmpty(inputs);
+            boolean isOutputsEmpty = isEmpty(outputs);
+
+            if (!isInputsEmpty && !isOutputsEmpty) {
+                return; // process still has both sides; keep it
+            }
+
+            context.addToIgnoredEntities(entity);
+
+            // since the process entity is ignored, entities referenced by inputs/outputs of this process entity
+            // may not be processed by Atlas, if they are present in referredEntities. So, move them from
+            // 'referredEntities' to 'entities'. However, this is not necessary for hive_column entities,
+            // as these entities would be referenced by hive_table entities
+            if (!StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
+                if (!isInputsEmpty) {
+                    markReferredEntitiesToMove(inputs, context);
+                } else if (!isOutputsEmpty) {
+                    markReferredEntitiesToMove(outputs, context);
+                }
+            }
+        }
+
+        /**
+         * Registers the guid of every element of the given inputs/outputs value for a move out of
+         * the referred entities. A value that is not a Collection is skipped: isEmpty() reports
+         * such a value as non-empty, and casting it used to fail with a ClassCastException.
+         */
+        private void markReferredEntitiesToMove(Object refs, PreprocessorContext context) {
+            if (!(refs instanceof Collection)) {
+                return;
+            }
+
+            for (Object ref : (Collection<?>) refs) {
+                String guid = context.getGuid(ref);
+
+                // without a guid there is no referred entity to look up, so there is nothing to move
+                if (guid != null) {
+                    context.addToReferredEntitiesToMove(guid);
+                }
+            }
+        }
+
+        /**
+         * Removes from the given inputs/outputs value every element that refers to an ignored
+         * entity. Values that are null or not a Collection are left untouched.
+         */
+        private void removeIgnoredObjectIds(Object obj, PreprocessorContext context) {
+            if (!(obj instanceof Collection)) {
+                return;
+            }
+
+            Collection<?> objList  = (Collection<?>) obj;
+            List<Object>  toRemove = new ArrayList<>();
+
+            // collect first, remove afterwards: the collection must not be modified while iterating
+            for (Object objElem : objList) {
+                if (isIgnoredReference(objElem, context)) {
+                    toRemove.add(objElem);
+                }
+            }
+
+            if (!toRemove.isEmpty()) {
+                objList.removeAll(toRemove);
+            }
+        }
+
+        /**
+         * Decides whether a single inputs/outputs element refers to an ignored entity.
+         *
+         * @return true if the context already lists the element's guid as ignored, or if the
+         *         referenced table/column matches an ignore (or, for columns, prune) action
+         */
+        private boolean isIgnoredReference(Object objElem, PreprocessorContext context) {
+            String guid = context.getGuid(objElem);
+
+            if (guid == null) {
+                // no guid: decide from the type name and qualified name carried by the reference itself
+                return isIgnoredByTableAction(getTypeName(objElem), objElem, context);
+            }
+
+            if (context.isIgnoredEntity(guid)) {
+                return true;
+            }
+
+            // perhaps entity hasn't been preprocessed yet
+            AtlasEntity entity = context.getEntity(guid);
+
+            return entity != null && isIgnoredByTableAction(entity.getTypeName(), entity, context);
+        }
+
+        /**
+         * Looks up the action for the table a hive_table / hive_column reference belongs to.
+         *
+         * @param typeName    type name of the referenced entity; null (which a switch on a String
+         *                    cannot handle) and any other type yield false
+         * @param qNameSource entity or reference to read the qualified name from; it is read
+         *                    only for table and column types
+         * @return true for a table whose action is IGNORE, or a column whose table action is
+         *         IGNORE or PRUNE
+         */
+        private boolean isIgnoredByTableAction(String typeName, Object qNameSource, PreprocessorContext context) {
+            if (typeName == null) {
+                return false;
+            }
+
+            switch (typeName) {
+                case TYPE_HIVE_TABLE: {
+                    PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(qNameSource));
+
+                    return action == PreprocessAction.IGNORE;
+                }
+
+                case TYPE_HIVE_COLUMN: {
+                    String columnQualifiedName = getQualifiedName(qNameSource);
+
+                    if (columnQualifiedName == null) {
+                        return false; // table cannot be derived from a missing column name
+                    }
+
+                    PreprocessAction action = context.getPreprocessActionForHiveTable(HiveColumnPreprocessor.getHiveTableQualifiedName(columnQualifiedName));
+
+                    // if the table is ignored or pruned, remove the column
+                    return action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE;
+                }
+
+                default:
+                    return false;
+            }
+        }
+    }
+
+    /**
+     * Preprocessor registered for hive_column_lineage entities. It adds no logic of its own: it
+     * reuses HiveProcessPreprocessor under the TYPE_HIVE_COLUMN_LINEAGE type name.
+     */
+    static class HiveColumnLineageProcessPreprocessor extends 
HiveProcessPreprocessor {
+        public HiveColumnLineageProcessPreprocessor() {
+            super(TYPE_HIVE_COLUMN_LINEAGE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/1123f512/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
new file mode 100644
index 0000000..7b4229b
--- /dev/null
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+
+/**
+ * State used while preprocessing a single hook notification read from Kafka.
+ *
+ * <p>The context gives access to the entities carried by {@code ENTITY_CREATE_V2} and
+ * {@code ENTITY_FULL_UPDATE_V2} messages (for any other message type the entity accessors
+ * return {@code null}), decides which {@link PreprocessAction} applies to a Hive table by
+ * matching its qualified name against the supplied patterns, and collects the guids that
+ * callers mark as ignored, pruned, or "referred entity to move".</p>
+ */
+public class PreprocessorContext {
+    private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
+
+    /** Action selected for a Hive table by {@link #getPreprocessActionForHiveTable(String)}. */
+    public enum PreprocessAction { NONE, IGNORE, PRUNE }
+
+    private final AtlasKafkaMessage<HookNotification> kafkaMessage;
+    private final AtlasEntitiesWithExtInfo            entitiesWithExtInfo;
+    private final List<Pattern>                       hiveTablesToIgnore;
+    private final List<Pattern>                       hiveTablesToPrune;
+    private final Map<String, PreprocessAction>       hiveTablesCache;
+    private final Set<String>                         ignoredEntities        = new HashSet<>();
+    private final Set<String>                         prunedEntities         = new HashSet<>();
+    private final Set<String>                         referredEntitiesToMove = new HashSet<>();
+
+    /**
+     * Creates a context for one Kafka message.
+     *
+     * @param kafkaMessage       message being processed; its payload determines which entities are exposed
+     * @param hiveTablesToIgnore patterns matched against table qualified names to select {@link PreprocessAction#IGNORE}
+     * @param hiveTablesToPrune  patterns matched against table qualified names to select {@link PreprocessAction#PRUNE}
+     * @param hiveTablesCache    cache of already computed actions keyed by table qualified name; this
+     *                           context reads from it and adds newly computed entries to it
+     */
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
+        this.kafkaMessage       = kafkaMessage;
+        this.hiveTablesToIgnore = hiveTablesToIgnore;
+        this.hiveTablesToPrune  = hiveTablesToPrune;
+        this.hiveTablesCache    = hiveTablesCache;
+
+        final HookNotification message = kafkaMessage.getMessage();
+
+        // only the V2 create / full-update requests expose entities through this context
+        switch (message.getType()) {
+            case ENTITY_CREATE_V2:
+                entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) message).getEntities();
+            break;
+
+            case ENTITY_FULL_UPDATE_V2:
+                entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
+            break;
+
+            default:
+                entitiesWithExtInfo = null;
+            break;
+        }
+    }
+
+    /** @return the Kafka message this context was created for */
+    public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
+        return kafkaMessage;
+    }
+
+    /** @return the offset reported by the Kafka message */
+    public long getKafkaMessageOffset() {
+        return kafkaMessage.getOffset();
+    }
+
+    /** @return the partition reported by the Kafka message */
+    public int getKafkaPartition() {
+        return kafkaMessage.getPartition();
+    }
+
+    /** @return the entities of the message, or {@code null} when the message type carries none */
+    public List<AtlasEntity> getEntities() {
+        return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
+    }
+
+    /** @return the referred entities of the message, or {@code null} when the message type carries none */
+    public Map<String, AtlasEntity> getReferredEntities() {
+        return entitiesWithExtInfo != null ? entitiesWithExtInfo.getReferredEntities() : null;
+    }
+
+    /**
+     * Looks up an entity of the message by guid.
+     *
+     * @param guid guid to look up; may be {@code null}
+     * @return the entity, or {@code null} when the guid is {@code null} or the message carries no entities
+     */
+    public AtlasEntity getEntity(String guid) {
+        return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null;
+    }
+
+    /** @return the live (mutable) set of guids marked as ignored */
+    public Set<String> getIgnoredEntities() { return ignoredEntities; }
+
+    /** @return the live (mutable) set of guids marked as pruned */
+    public Set<String> getPrunedEntities() { return prunedEntities; }
+
+    /** @return the live (mutable) set of guids recorded via {@link #addToReferredEntitiesToMove(String)} */
+    public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
+
+    /**
+     * Determines the action for a Hive table. Ignore-patterns take precedence over
+     * prune-patterns, and computed results are stored in the cache passed to the constructor.
+     *
+     * @param qualifiedName qualified name of the table; may be {@code null}
+     * @return the action to apply; {@link PreprocessAction#NONE} when the name is {@code null},
+     *         when no patterns are configured, or when no pattern matches
+     */
+    public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
+        PreprocessAction ret = PreprocessAction.NONE;
+
+        if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
+            ret = hiveTablesCache.get(qualifiedName);
+
+            if (ret == null) {
+                // either list may be null/empty here; isMatch() treats that as "no match"
+                if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+                    ret = PreprocessAction.IGNORE;
+                } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+                    ret = PreprocessAction.PRUNE;
+                } else {
+                    ret = PreprocessAction.NONE;
+                }
+
+                hiveTablesCache.put(qualifiedName, ret);
+            }
+        }
+
+        return ret;
+    }
+
+    /** @return {@code true} when the (non-null) guid has been marked as ignored */
+    public boolean isIgnoredEntity(String guid) {
+        return guid != null && ignoredEntities.contains(guid);
+    }
+
+    /** @return {@code true} when the (non-null) guid has been marked as pruned */
+    public boolean isPrunedEntity(String guid) {
+        return guid != null && prunedEntities.contains(guid);
+    }
+
+    /**
+     * Marks the entity as ignored and logs it the first time its guid is recorded.
+     * A {@code null} entity or an entity without a guid is skipped, in line with
+     * {@link #addToIgnoredEntities(String)}.
+     */
+    public void addToIgnoredEntities(AtlasEntity entity) {
+        String guid = entity != null ? entity.getGuid() : null;
+
+        // Set.add() returns true only for a newly recorded guid, so each entity is logged once
+        if (guid != null && ignoredEntities.add(guid)) {
+            LOG.info("ignored entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}",
+                     entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
+        }
+    }
+
+    /**
+     * Marks the entity as pruned and logs it the first time its guid is recorded.
+     * A {@code null} entity or an entity without a guid is skipped, in line with
+     * {@link #addToPrunedEntities(String)}.
+     */
+    public void addToPrunedEntities(AtlasEntity entity) {
+        String guid = entity != null ? entity.getGuid() : null;
+
+        // Set.add() returns true only for a newly recorded guid, so each entity is logged once
+        if (guid != null && prunedEntities.add(guid)) {
+            LOG.info("pruned entity: typeName={}, qualifiedName={} topic-offset={}, partition={}",
+                     entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
+        }
+    }
+
+    /** Marks the guid as ignored; a {@code null} guid is skipped. */
+    public void addToIgnoredEntities(String guid) {
+        if (guid != null) {
+            ignoredEntities.add(guid);
+        }
+    }
+
+    /** Marks the guid as pruned; a {@code null} guid is skipped. */
+    public void addToPrunedEntities(String guid) {
+        if (guid != null) {
+            prunedEntities.add(guid);
+        }
+    }
+
+    /** Records the guid in the referred-entities-to-move set; a {@code null} guid is skipped. */
+    public void addToReferredEntitiesToMove(String guid) {
+        if (guid != null) {
+            referredEntitiesToMove.add(guid);
+        }
+    }
+
+    /**
+     * Marks as ignored every guid that {@link #getGuid(Object)} can extract from {@code obj},
+     * which may be a single reference or a {@link Collection} of references.
+     */
+    public void addToIgnoredEntities(Object obj) {
+        collectGuids(obj, ignoredEntities);
+    }
+
+    /**
+     * Marks as pruned every guid that {@link #getGuid(Object)} can extract from {@code obj},
+     * which may be a single reference or a {@link Collection} of references.
+     */
+    public void addToPrunedEntities(Object obj) {
+        collectGuids(obj, prunedEntities);
+    }
+
+    /**
+     * Extracts a guid from the supported reference shapes: {@link AtlasObjectId}, a {@link Map}
+     * holding an {@code AtlasObjectId.KEY_GUID} entry, {@link AtlasEntity}, or
+     * {@link AtlasEntity.AtlasEntityWithExtInfo}.
+     *
+     * @param obj reference object; may be {@code null}
+     * @return the guid as a string, or {@code null} when none is available or the type is not supported
+     */
+    public String getGuid(Object obj) {
+        Object ret = null;
+
+        if (obj instanceof AtlasObjectId) {
+            ret = ((AtlasObjectId) obj).getGuid();
+        } else if (obj instanceof Map) {
+            ret = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_GUID);
+        } else if (obj instanceof AtlasEntity) {
+            ret = ((AtlasEntity) obj).getGuid();
+        } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+            // the wrapper may not hold an entity; treat that as "no guid" rather than failing
+            AtlasEntity entity = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity();
+
+            ret = entity != null ? entity.getGuid() : null;
+        }
+
+        return ret != null ? ret.toString() : null;
+    }
+
+
+    /**
+     * @return {@code true} when {@code name} fully matches at least one of the patterns;
+     *         {@code false} for a {@code null} or empty pattern list
+     */
+    private boolean isMatch(String name, List<Pattern> patterns) {
+        if (patterns != null) {
+            for (Pattern p : patterns) {
+                if (p.matcher(name).matches()) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    /** Adds to {@code guids} the guid of {@code obj}, or of each element when {@code obj} is a collection. */
+    private void collectGuids(Object obj, Set<String> guids) {
+        if (obj instanceof Collection) {
+            for (Object objElem : (Collection<?>) obj) {
+                collectGuid(objElem, guids);
+            }
+        } else if (obj != null) {
+            collectGuid(obj, guids);
+        }
+    }
+
+    /** Adds the guid of {@code obj} to {@code guids} when one can be extracted. */
+    private void collectGuid(Object obj, Set<String> guids) {
+        String guid = getGuid(obj);
+
+        if (guid != null) {
+            guids.add(guid);
+        }
+    }
+}

Reply via email to