This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 03334df  ATLAS-3333: updated notification pre-process with an option to ignore dummy Hive database/table
03334df is described below

commit 03334dfe56fe03fea8858f99140c2a8bc85ef2bc
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Fri Jul 12 00:56:49 2019 -0700

    ATLAS-3333: updated notification pre-process with an option to ignore dummy Hive database/table
    
    (cherry picked from commit 23eacbafcea2e58378271fa6dc7b56be08b7cac7)
    (cherry picked from commit 727b5bebf4cbd4bf52822f10b94127d0306aa94e)
---
 .../notification/NotificationHookConsumer.java     | 125 +++++++++++++--
 .../preprocessor/EntityPreprocessor.java           |  13 +-
 .../preprocessor/HivePreprocessor.java             |  36 ++++-
 .../preprocessor/PreprocessorContext.java          | 178 +++++++++++++++++++--
 4 files changed, 320 insertions(+), 32 deletions(-)

diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index dec6860..0a41c95 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -91,6 +91,7 @@ import java.util.regex.Pattern;
 import static org.apache.atlas.AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
 import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY;
 import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE;
+import static 
org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -108,6 +109,11 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private static final String ATTRIBUTE_INPUTS         = "inputs";
     private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
 
+    // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
+    public static final String DUMMY_DATABASE               = 
"_dummy_database";
+    public static final String DUMMY_TABLE                  = "_dummy_table";
+    public static final String VALUES_TMP_TABLE_NAME_PREFIX = 
"Values__Tmp__Table__";
+
     private static final String THREADNAME_PREFIX = 
NotificationHookConsumer.class.getSimpleName();
     private static final String ATLAS_HOOK_TOPIC  = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
 
@@ -125,6 +131,12 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN   
              = 
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN    
              = 
"atlas.notification.consumer.preprocess.hive_table.prune.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE       
              = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+    public static final String 
CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED              = 
"atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled";
+    public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES  
              = 
"atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names";
+    public static final String 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED           = 
"atlas.notification.consumer.preprocess.hive_table.ignore.dummy.enabled";
+    public static final String 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES             = 
"atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
+    public static final String 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED   = 
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
+    public static final String 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES           = 
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
 
     private final AtlasEntityStore              atlasEntityStore;
     private final ServiceState                  serviceState;
@@ -138,6 +150,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final boolean                       consumerDisabled;
     private final List<Pattern>                 hiveTablesToIgnore = new 
ArrayList<>();
     private final List<Pattern>                 hiveTablesToPrune  = new 
ArrayList<>();
+    private final List<String>                  hiveDummyDatabasesToIgnore;
+    private final List<String>                  hiveDummyTablesToIgnore;
+    private final List<String>                  hiveTablePrefixesToIgnore;
     private final Map<String, PreprocessAction> hiveTablesCache;
     private final boolean                       preprocessEnabled;
 
@@ -211,7 +226,45 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             hiveTablesCache = Collections.emptyMap();
         }
 
-        preprocessEnabled = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
+        boolean hiveDbIgnoreDummyEnabled         = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED,
 true);
+        boolean hiveTableIgnoreDummyEnabled      = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED,
 true);
+        boolean hiveTableIgnoreNamePrefixEnabled = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED,
 true);
+
+        LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, 
hiveDbIgnoreDummyEnabled);
+        LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, 
hiveTableIgnoreDummyEnabled);
+        LOG.info("{}={}", 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, 
hiveTableIgnoreNamePrefixEnabled);
+
+        if (hiveDbIgnoreDummyEnabled) {
+            String[] dummyDatabaseNames = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES);
+
+            hiveDummyDatabasesToIgnore = trimAndPurge(dummyDatabaseNames, 
DUMMY_DATABASE);
+
+            LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES, 
StringUtils.join(hiveDummyDatabasesToIgnore, ','));
+        } else {
+            hiveDummyDatabasesToIgnore = Collections.emptyList();
+        }
+
+        if (hiveTableIgnoreDummyEnabled) {
+            String[] dummyTableNames = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES);
+
+            hiveDummyTablesToIgnore = trimAndPurge(dummyTableNames, 
DUMMY_TABLE);
+
+            LOG.info("{}={}", 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES, 
StringUtils.join(hiveDummyTablesToIgnore, ','));
+        } else {
+            hiveDummyTablesToIgnore = Collections.emptyList();
+        }
+
+        if (hiveTableIgnoreNamePrefixEnabled) {
+            String[] ignoreNamePrefixes = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES);
+
+            hiveTablePrefixesToIgnore = trimAndPurge(ignoreNamePrefixes, 
VALUES_TMP_TABLE_NAME_PREFIX);
+
+            LOG.info("{}={}", 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES, 
StringUtils.join(hiveTablePrefixesToIgnore, ','));
+        } else {
+            hiveTablePrefixesToIgnore = Collections.emptyList();
+        }
+
+        preprocessEnabled = skipHiveColumnLineageHive20633 || 
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || 
!hiveTablePrefixesToIgnore.isEmpty();
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
         LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
@@ -334,6 +387,26 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
     }
 
+    private List<String> trimAndPurge(String[] values, String defaultValue) {
+        final List<String> ret;
+
+        if (values != null && values.length > 0) {
+            ret = new ArrayList<>(values.length);
+
+            for (String val : values) {
+                if (StringUtils.isNotBlank(val)) {
+                    ret.add(val.trim());
+                }
+            }
+        } else if (StringUtils.isNotBlank(defaultValue)) {
+            ret = Collections.singletonList(defaultValue.trim());
+        } else {
+            ret = Collections.emptyList();
+        }
+
+        return ret;
+    }
+
     static class AdaptiveWaiter {
         private final long increment;
         private final long maxDuration;
@@ -724,23 +797,51 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
     }
 
-    private void 
preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage> 
kafkaMsg) {
-        if (!preprocessEnabled) {
-            return;
-        }
+    private PreprocessorContext 
preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage> 
kafkaMsg) {
+        PreprocessorContext context = null;
 
-        PreprocessorContext context = new PreprocessorContext(kafkaMsg, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
+        if (preprocessEnabled) {
+            context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, 
hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, 
hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore);
 
-        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
-            ignoreOrPruneHiveTables(context);
-        }
+            if (context.isHivePreprocessEnabled()) {
+                preprocessHiveTypes(context);
+            }
+
+            if (skipHiveColumnLineageHive20633) {
+                skipHiveColumnLineage(context);
+            }
 
-        if (skipHiveColumnLineageHive20633) {
-            skipHiveColumnLineage(context);
+
+            context.moveRegisteredReferredEntities();
+
+            if (context.isHivePreprocessEnabled() && 
CollectionUtils.isNotEmpty(context.getEntities())) {
+                // move hive_process and hive_column_lineage entities to end 
of the list
+                List<AtlasEntity> entities = context.getEntities();
+                int               count    = entities.size();
+
+                for (int i = 0; i < count; i++) {
+                    AtlasEntity entity = entities.get(i);
+
+                    switch (entity.getTypeName()) {
+                        case TYPE_HIVE_PROCESS:
+                        case TYPE_HIVE_COLUMN_LINEAGE:
+                            entities.remove(i--);
+                            entities.add(entity);
+                            count--;
+                            break;
+                    }
+                }
+
+                if (entities.size() - count > 0) {
+                    LOG.info("preprocess: moved {} 
hive_process/hive_column_lineage entities to end of list (listSize={}). 
topic-offset={}, partition={}", entities.size() - count, entities.size(), 
kafkaMsg.getOffset(), kafkaMsg.getPartition());
+                }
+            }
         }
+
+        return context;
     }
 
-    private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+    private void preprocessHiveTypes(PreprocessorContext context) {
         List<AtlasEntity> entities = context.getEntities();
 
         if (entities != null) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index bdea14a..22bb127 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -31,10 +31,12 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_HIVE_COLUMN_LINEAGE = 
"hive_column_lineage";
     public static final String TYPE_HIVE_PROCESS        = "hive_process";
     public static final String TYPE_HIVE_STORAGEDESC    = "hive_storagedesc";
+    public static final String TYPE_HIVE_DB             = "hive_db";
     public static final String TYPE_HIVE_TABLE          = "hive_table";
 
     public static final String ATTRIBUTE_COLUMNS        = "columns";
     public static final String ATTRIBUTE_INPUTS         = "inputs";
+    public static final String ATTRIBUTE_NAME           = "name";
     public static final String ATTRIBUTE_OUTPUTS        = "outputs";
     public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
     public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
@@ -50,7 +52,8 @@ public abstract class EntityPreprocessor {
 
 
     static {
-        EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
+        EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
+                                                                    new 
HivePreprocessor.HiveDbPreprocessor(),
                                                                     new 
HivePreprocessor.HiveTablePreprocessor(),
                                                                     new 
HivePreprocessor.HiveColumnPreprocessor(),
                                                                     new 
HivePreprocessor.HiveProcessPreprocessor(),
@@ -58,7 +61,7 @@ public abstract class EntityPreprocessor {
                                                                     new 
HivePreprocessor.HiveStorageDescPreprocessor()
         };
 
-        for (EntityPreprocessor preprocessor : preprocessors) {
+        for (EntityPreprocessor preprocessor : hivePreprocessors) {
             PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
         }
     }
@@ -84,6 +87,12 @@ public abstract class EntityPreprocessor {
         return obj != null ? obj.toString() : null;
     }
 
+    public static String getName(AtlasEntity entity) {
+        Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_NAME) : 
null;
+
+        return obj != null ? obj.toString() : null;
+    }
+
     public String getTypeName(Object obj) {
         Object ret = null;
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index d54c88d..1127af8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -21,11 +21,34 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import 
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
 import org.apache.commons.lang.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 public class HivePreprocessor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HivePreprocessor.class);
+
+
+    static class HiveDbPreprocessor extends EntityPreprocessor {
+        public HiveDbPreprocessor() {
+            super(TYPE_HIVE_DB);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext 
context) {
+            if (!context.isIgnoredEntity(entity.getGuid())) {
+                PreprocessAction action = 
context.getPreprocessActionForHiveDb(getName(entity));
+
+                if (action == PreprocessAction.IGNORE) {
+                    context.addToIgnoredEntities(entity);
+                }
+            }
+        }
+    }
+
     static class HiveTablePreprocessor extends EntityPreprocessor {
         public HiveTablePreprocessor() {
             super(TYPE_HIVE_TABLE);
@@ -135,16 +158,19 @@ public class HivePreprocessor {
             if (context.isIgnoredEntity(entity.getGuid())) {
                 context.addToIgnoredEntities(entity); // so that this will be 
logged with typeName and qualifiedName
             } else {
-                Object inputs  = entity.getAttribute(ATTRIBUTE_INPUTS);
-                Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+                Object inputs       = entity.getAttribute(ATTRIBUTE_INPUTS);
+                Object outputs      = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+                int    inputsCount  = getCollectionSize(inputs);
+                int    outputsCount = getCollectionSize(outputs);
 
                 removeIgnoredObjectIds(inputs, context);
                 removeIgnoredObjectIds(outputs, context);
 
                 boolean isInputsEmpty  = isEmpty(inputs);
                 boolean isOutputsEmpty = isEmpty(outputs);
+                boolean isAnyRemoved   = inputsCount > 
getCollectionSize(inputs) || outputsCount > getCollectionSize(outputs);
 
-                if (isInputsEmpty || isOutputsEmpty) {
+                if (isAnyRemoved && (isInputsEmpty || isOutputsEmpty)) {
                     context.addToIgnoredEntities(entity);
 
                     // since the process entity is ignored, entities 
referenced by inputs/outputs of this process entity
@@ -170,6 +196,10 @@ public class HivePreprocessor {
             }
         }
 
+        private int getCollectionSize(Object obj) {
+            return (obj instanceof Collection) ? ((Collection) obj).size() : 0;
+        }
+
         private void removeIgnoredObjectIds(Object obj, PreprocessorContext 
context) {
             if (obj == null || !(obj instanceof Collection)) {
                 return;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 8f62768..6e42655 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.notification.hook.HookNotification;
 import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +35,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static 
org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_CLUSTER_NAME;
+import static 
org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_ENTITY_NAME;
+
 
 public class PreprocessorContext {
     private static final Logger LOG = 
LoggerFactory.getLogger(PreprocessorContext.class);
@@ -45,15 +49,22 @@ public class PreprocessorContext {
     private final List<Pattern>                              
hiveTablesToIgnore;
     private final List<Pattern>                              hiveTablesToPrune;
     private final Map<String, PreprocessAction>              hiveTablesCache;
+    private final List<String>                               
hiveDummyDatabasesToIgnore;
+    private final List<String>                               
hiveDummyTablesToIgnore;
+    private final List<String>                               
hiveTablePrefixesToIgnore;
+    private final boolean                                    
isHivePreProcessEnabled;
     private final Set<String>                                ignoredEntities   
     = new HashSet<>();
     private final Set<String>                                prunedEntities    
     = new HashSet<>();
     private final Set<String>                                
referredEntitiesToMove = new HashSet<>();
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage> 
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> 
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
-        this.kafkaMessage       = kafkaMessage;
-        this.hiveTablesToIgnore = hiveTablesToIgnore;
-        this.hiveTablesToPrune  = hiveTablesToPrune;
-        this.hiveTablesCache    = hiveTablesCache;
+    public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage> 
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> 
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> 
hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> 
hiveTablePrefixesToIgnore) {
+        this.kafkaMessage                  = kafkaMessage;
+        this.hiveTablesToIgnore            = hiveTablesToIgnore;
+        this.hiveTablesToPrune             = hiveTablesToPrune;
+        this.hiveTablesCache               = hiveTablesCache;
+        this.hiveDummyDatabasesToIgnore    = hiveDummyDatabasesToIgnore;
+        this.hiveDummyTablesToIgnore       = hiveDummyTablesToIgnore;
+        this.hiveTablePrefixesToIgnore     = hiveTablePrefixesToIgnore;
 
         final HookNotificationMessage  message = kafkaMessage.getMessage();
 
@@ -70,6 +81,8 @@ public class PreprocessorContext {
                 entitiesWithExtInfo = null;
             break;
         }
+
+        this.isHivePreProcessEnabled = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || 
!hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
     }
 
     public AtlasKafkaMessage<HookNotificationMessage> getKafkaMessage() {
@@ -84,6 +97,10 @@ public class PreprocessorContext {
         return kafkaMessage.getPartition();
     }
 
+    public boolean isHivePreprocessEnabled() {
+        return isHivePreProcessEnabled;
+    }
+
     public List<AtlasEntity> getEntities() {
         return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() 
: null;
     }
@@ -102,22 +119,78 @@ public class PreprocessorContext {
 
     public Set<String> getReferredEntitiesToMove() { return 
referredEntitiesToMove; }
 
+    public PreprocessAction getPreprocessActionForHiveDb(String dbName) {
+        PreprocessAction ret = PreprocessAction.NONE;
+
+        if (dbName != null) {
+            for (String dummyDbName : hiveDummyDatabasesToIgnore) {
+                if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
+                    ret = PreprocessAction.IGNORE;
+
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
     public PreprocessAction getPreprocessActionForHiveTable(String 
qualifiedName) {
         PreprocessAction ret = PreprocessAction.NONE;
 
-        if (qualifiedName != null && 
(CollectionUtils.isNotEmpty(hiveTablesToIgnore) || 
CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
-            ret = hiveTablesCache.get(qualifiedName);
+        if (qualifiedName != null) {
+            if (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || 
CollectionUtils.isNotEmpty(hiveTablesToPrune)) {
+                ret = hiveTablesCache.get(qualifiedName);
 
-            if (ret == null) {
-                if (isMatch(qualifiedName, hiveTablesToIgnore)) {
-                    ret = PreprocessAction.IGNORE;
-                } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
-                    ret = PreprocessAction.PRUNE;
-                } else {
-                    ret = PreprocessAction.NONE;
+                if (ret == null) {
+                    if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+                        ret = PreprocessAction.IGNORE;
+                    } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+                        ret = PreprocessAction.PRUNE;
+                    } else {
+                        ret = PreprocessAction.NONE;
+                    }
+
+                    hiveTablesCache.put(qualifiedName, ret);
                 }
+            }
+
+            if (ret != PreprocessAction.IGNORE && 
(CollectionUtils.isNotEmpty(hiveDummyTablesToIgnore) || 
CollectionUtils.isNotEmpty(hiveTablePrefixesToIgnore))) {
+                String tblName = 
getHiveTableNameFromQualifiedName(qualifiedName);
+
+                if (tblName != null) {
+                    for (String dummyTblName : hiveDummyTablesToIgnore) {
+                        if (StringUtils.equalsIgnoreCase(tblName, 
dummyTblName)) {
+                            ret = PreprocessAction.IGNORE;
+
+                            break;
+                        }
+                    }
+
+                    if (ret != PreprocessAction.IGNORE) {
+                        for (String tableNamePrefix : 
hiveTablePrefixesToIgnore) {
+                            if (StringUtils.startsWithIgnoreCase(tblName, 
tableNamePrefix)) {
+                                ret = PreprocessAction.IGNORE;
+
+                                break;
+                            }
+                        }
+                    }
+                }
+
+                if (ret != PreprocessAction.IGNORE && 
CollectionUtils.isNotEmpty(hiveDummyDatabasesToIgnore)) {
+                    String dbName = 
getHiveDbNameFromQualifiedName(qualifiedName);
+
+                    if (dbName != null) {
+                        for (String dummyDbName : hiveDummyDatabasesToIgnore) {
+                            if (StringUtils.equalsIgnoreCase(dbName, 
dummyDbName)) {
+                                ret = PreprocessAction.IGNORE;
 
-                hiveTablesCache.put(qualifiedName, ret);
+                                break;
+                            }
+                        }
+                    }
+                }
             }
         }
 
@@ -174,6 +247,81 @@ public class PreprocessorContext {
         collectGuids(obj, prunedEntities);
     }
 
+    public void moveRegisteredReferredEntities() {
+        List<AtlasEntity>        entities         = getEntities();
+        Map<String, AtlasEntity> referredEntities = getReferredEntities();
+
+        if (entities != null && referredEntities != null && 
!referredEntitiesToMove.isEmpty()) {
+            AtlasEntity firstEntity = entities.isEmpty() ? null : 
entities.get(0);
+
+            for (String guid : referredEntitiesToMove) {
+                AtlasEntity entity = referredEntities.remove(guid);
+
+                if (entity != null) {
+                    entities.add(entity);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("moved referred entity: typeName={}, 
qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), 
EntityPreprocessor.getQualifiedName(entity), kafkaMessage.getOffset(), 
kafkaMessage.getPartition());
+                    }
+                }
+            }
+
+            if (firstEntity != null) {
+                LOG.info("moved {} referred-entities to end of entities-list 
(firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", 
referredEntitiesToMove.size(), firstEntity.getTypeName(), 
EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), 
kafkaMessage.getPartition());
+            } else {
+                LOG.info("moved {} referred-entities to entities-list. 
topic-offset={}, partition={}", referredEntitiesToMove.size(), 
kafkaMessage.getOffset(), kafkaMessage.getPartition());
+            }
+
+            referredEntitiesToMove.clear();
+        }
+    }
+
+    public String getHiveTableNameFromQualifiedName(String qualifiedName) {
+        String ret      = null;
+        int    idxStart = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME) + 1;
+
+        if (idxStart != 0 && qualifiedName.length() > idxStart) {
+            int idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME, 
idxStart);
+
+            if (idxEnd != -1) {
+                ret = qualifiedName.substring(idxStart, idxEnd);
+            }
+        }
+
+        return ret;
+    }
+
+    public String getHiveDbNameFromQualifiedName(String qualifiedName) {
+        String ret    = null;
+        int    idxEnd = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME); // 
db.table@cluster, db.table.column@cluster
+
+        if (idxEnd == -1) {
+            idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME); // 
db@cluster
+        }
+
+        if (idxEnd != -1) {
+            ret = qualifiedName.substring(0, idxEnd);
+        }
+
+        return ret;
+    }
+
+    public String getTypeName(Object obj) {
+        Object ret = null;
+
+        if (obj instanceof AtlasObjectId) {
+            ret = ((AtlasObjectId) obj).getTypeName();
+        } else if (obj instanceof Map) {
+            ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME);
+        } else if (obj instanceof AtlasEntity) {
+            ret = ((AtlasEntity) obj).getTypeName();
+        } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+            ret = ((AtlasEntity.AtlasEntityWithExtInfo) 
obj).getEntity().getTypeName();
+        }
+
+        return ret != null ? ret.toString() : null;
+    }
+
     public String getGuid(Object obj) {
         Object ret = null;
 

Reply via email to