This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 03334df ATLAS-3333: updated notification pre-process with an option
to ignore dummy Hive database/table
03334df is described below
commit 03334dfe56fe03fea8858f99140c2a8bc85ef2bc
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Fri Jul 12 00:56:49 2019 -0700
ATLAS-3333: updated notification pre-process with an option to ignore dummy
Hive database/table
(cherry picked from commit 23eacbafcea2e58378271fa6dc7b56be08b7cac7)
(cherry picked from commit 727b5bebf4cbd4bf52822f10b94127d0306aa94e)
---
.../notification/NotificationHookConsumer.java | 125 +++++++++++++--
.../preprocessor/EntityPreprocessor.java | 13 +-
.../preprocessor/HivePreprocessor.java | 36 ++++-
.../preprocessor/PreprocessorContext.java | 178 +++++++++++++++++++--
4 files changed, 320 insertions(+), 32 deletions(-)
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index dec6860..0a41c95 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -91,6 +91,7 @@ import java.util.regex.Pattern;
import static org.apache.atlas.AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY;
import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE;
+import static
org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
/**
* Consumer of notifications from hooks e.g., hive hook etc.
@@ -108,6 +109,11 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
+ public static final String DUMMY_DATABASE =
"_dummy_database";
+ public static final String DUMMY_TABLE = "_dummy_table";
+ public static final String VALUES_TMP_TABLE_NAME_PREFIX =
"Values__Tmp__Table__";
+
private static final String THREADNAME_PREFIX =
NotificationHookConsumer.class.getSimpleName();
private static final String ATLAS_HOOK_TOPIC =
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
@@ -125,6 +131,12 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN
=
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN
=
"atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE
= "atlas.notification.consumer.preprocess.hive_table.cache.size";
+ public static final String
CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED =
"atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled";
+ public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES
=
"atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names";
+ public static final String
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED =
"atlas.notification.consumer.preprocess.hive_table.ignore.dummy.enabled";
+ public static final String
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES =
"atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
+ public static final String
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED =
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
+ public static final String
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES =
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
private final AtlasEntityStore atlasEntityStore;
private final ServiceState serviceState;
@@ -138,6 +150,9 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private final boolean consumerDisabled;
private final List<Pattern> hiveTablesToIgnore = new
ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new
ArrayList<>();
+ private final List<String> hiveDummyDatabasesToIgnore;
+ private final List<String> hiveDummyTablesToIgnore;
+ private final List<String> hiveTablePrefixesToIgnore;
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean preprocessEnabled;
@@ -211,7 +226,45 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
hiveTablesCache = Collections.emptyMap();
}
- preprocessEnabled = !hiveTablesToIgnore.isEmpty() ||
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
+ boolean hiveDbIgnoreDummyEnabled =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED,
true);
+ boolean hiveTableIgnoreDummyEnabled =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED,
true);
+ boolean hiveTableIgnoreNamePrefixEnabled =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED,
true);
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED,
hiveDbIgnoreDummyEnabled);
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED,
hiveTableIgnoreDummyEnabled);
+ LOG.info("{}={}",
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED,
hiveTableIgnoreNamePrefixEnabled);
+
+ if (hiveDbIgnoreDummyEnabled) {
+ String[] dummyDatabaseNames =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES);
+
+ hiveDummyDatabasesToIgnore = trimAndPurge(dummyDatabaseNames,
DUMMY_DATABASE);
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES,
StringUtils.join(hiveDummyDatabasesToIgnore, ','));
+ } else {
+ hiveDummyDatabasesToIgnore = Collections.emptyList();
+ }
+
+ if (hiveTableIgnoreDummyEnabled) {
+ String[] dummyTableNames =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES);
+
+ hiveDummyTablesToIgnore = trimAndPurge(dummyTableNames,
DUMMY_TABLE);
+
+ LOG.info("{}={}",
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES,
StringUtils.join(hiveDummyTablesToIgnore, ','));
+ } else {
+ hiveDummyTablesToIgnore = Collections.emptyList();
+ }
+
+ if (hiveTableIgnoreNamePrefixEnabled) {
+ String[] ignoreNamePrefixes =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES);
+
+ hiveTablePrefixesToIgnore = trimAndPurge(ignoreNamePrefixes,
VALUES_TMP_TABLE_NAME_PREFIX);
+
+ LOG.info("{}={}",
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES,
StringUtils.join(hiveTablePrefixesToIgnore, ','));
+ } else {
+ hiveTablePrefixesToIgnore = Collections.emptyList();
+ }
+
+ preprocessEnabled = skipHiveColumnLineageHive20633 ||
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() ||
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() ||
!hiveTablePrefixesToIgnore.isEmpty();
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633,
skipHiveColumnLineageHive20633);
LOG.info("{}={}",
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
skipHiveColumnLineageHive20633InputsThreshold);
@@ -334,6 +387,26 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
}
}
+    /**
+     * Trims the configured values and drops blank ones.
+     * When no values are configured (null or empty array), falls back to a single-element
+     * list holding the trimmed default value, or an empty list when the default is blank.
+     * Note: if values are configured but all are blank, the result is empty (the default is not used).
+     */
+    private List<String> trimAndPurge(String[] values, String defaultValue) {
+        if (values == null || values.length == 0) {
+            return StringUtils.isNotBlank(defaultValue) ? Collections.singletonList(defaultValue.trim()) : Collections.<String>emptyList();
+        }
+
+        List<String> purged = new ArrayList<>(values.length);
+
+        for (int i = 0; i < values.length; i++) {
+            String candidate = values[i];
+
+            if (StringUtils.isNotBlank(candidate)) {
+                purged.add(candidate.trim());
+            }
+        }
+
+        return purged;
+    }
+
static class AdaptiveWaiter {
private final long increment;
private final long maxDuration;
@@ -724,23 +797,51 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
}
}
- private void
preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage>
kafkaMsg) {
- if (!preprocessEnabled) {
- return;
- }
+ /**
+ * Applies the enabled pre-processing steps to the entities in the given message.
+ * Returns the PreprocessorContext that was used, or null when preprocessEnabled is false.
+ */
+ private PreprocessorContext
preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage>
kafkaMsg) {
+ PreprocessorContext context = null;
- PreprocessorContext context = new PreprocessorContext(kafkaMsg,
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
+ if (preprocessEnabled) {
+ context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore,
hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore,
hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore);
- if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
- ignoreOrPruneHiveTables(context);
- }
+ // hive ignore/prune/dummy-name handling; only when at least one such setting is non-empty
+ if (context.isHivePreprocessEnabled()) {
+ preprocessHiveTypes(context);
+ }
+
+ if (skipHiveColumnLineageHive20633) {
+ skipHiveColumnLineage(context);
+ }
- if (skipHiveColumnLineageHive20633) {
- skipHiveColumnLineage(context);
+
+ // append the referred entities whose guids are registered in referredEntitiesToMove to the entities list
+ context.moveRegisteredReferredEntities();
+
+ if (context.isHivePreprocessEnabled() &&
CollectionUtils.isNotEmpty(context.getEntities())) {
+ // move hive_process and hive_column_lineage entities to end
of the list
+ List<AtlasEntity> entities = context.getEntities();
+ int count = entities.size();
+
+ for (int i = 0; i < count; i++) {
+ AtlasEntity entity = entities.get(i);
+
+ // NOTE(review): a switch on a String throws NullPointerException if getTypeName() returns null
+ switch (entity.getTypeName()) {
+ case TYPE_HIVE_PROCESS:
+ case TYPE_HIVE_COLUMN_LINEAGE:
+ // remove() shifts the following entries left, so index i is revisited;
+ // shrinking count keeps the loop from reaching the entries appended at the tail
+ entities.remove(i--);
+ entities.add(entity);
+ count--;
+ break;
+ }
+ }
+
+ // entities.size() is unchanged by remove+add, so the difference is the number of entities moved
+ if (entities.size() - count > 0) {
+ LOG.info("preprocess: moved {}
hive_process/hive_column_lineage entities to end of list (listSize={}).
topic-offset={}, partition={}", entities.size() - count, entities.size(),
kafkaMsg.getOffset(), kafkaMsg.getPartition());
+ }
+ }
}
+
+ return context;
}
- private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+ private void preprocessHiveTypes(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index bdea14a..22bb127 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -31,10 +31,12 @@ public abstract class EntityPreprocessor {
public static final String TYPE_HIVE_COLUMN_LINEAGE =
"hive_column_lineage";
public static final String TYPE_HIVE_PROCESS = "hive_process";
public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
+ public static final String TYPE_HIVE_DB = "hive_db";
public static final String TYPE_HIVE_TABLE = "hive_table";
public static final String ATTRIBUTE_COLUMNS = "columns";
public static final String ATTRIBUTE_INPUTS = "inputs";
+ public static final String ATTRIBUTE_NAME = "name";
public static final String ATTRIBUTE_OUTPUTS = "outputs";
public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
@@ -50,7 +52,8 @@ public abstract class EntityPreprocessor {
static {
- EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
+ EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
+ new
HivePreprocessor.HiveDbPreprocessor(),
new
HivePreprocessor.HiveTablePreprocessor(),
new
HivePreprocessor.HiveColumnPreprocessor(),
new
HivePreprocessor.HiveProcessPreprocessor(),
@@ -58,7 +61,7 @@ public abstract class EntityPreprocessor {
new
HivePreprocessor.HiveStorageDescPreprocessor()
};
- for (EntityPreprocessor preprocessor : preprocessors) {
+ for (EntityPreprocessor preprocessor : hivePreprocessors) {
PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
}
}
@@ -84,6 +87,12 @@ public abstract class EntityPreprocessor {
return obj != null ? obj.toString() : null;
}
+    /**
+     * Returns the string form of the entity's "name" attribute, or null when the entity
+     * is null or the attribute is not set.
+     */
+    public static String getName(AtlasEntity entity) {
+        if (entity == null) {
+            return null;
+        }
+
+        Object name = entity.getAttribute(ATTRIBUTE_NAME);
+
+        return name == null ? null : name.toString();
+    }
+
public String getTypeName(Object obj) {
Object ret = null;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index d54c88d..1127af8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -21,11 +21,34 @@ import org.apache.atlas.model.instance.AtlasEntity;
import
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class HivePreprocessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(HivePreprocessor.class);
+
+
+    /**
+     * Marks hive_db entities as ignored when the context reports IGNORE for the database name
+     * (i.e. the name matches one of the configured dummy database names).
+     */
+    static class HiveDbPreprocessor extends EntityPreprocessor {
+        public HiveDbPreprocessor() {
+            super(TYPE_HIVE_DB);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isIgnoredEntity(entity.getGuid())) {
+                return; // already ignored; nothing more to decide
+            }
+
+            if (context.getPreprocessActionForHiveDb(getName(entity)) == PreprocessAction.IGNORE) {
+                context.addToIgnoredEntities(entity);
+            }
+        }
+    }
+
static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() {
super(TYPE_HIVE_TABLE);
@@ -135,16 +158,19 @@ public class HivePreprocessor {
if (context.isIgnoredEntity(entity.getGuid())) {
context.addToIgnoredEntities(entity); // so that this will be
logged with typeName and qualifiedName
} else {
- Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
- Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+ Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+ Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+ int inputsCount = getCollectionSize(inputs);
+ int outputsCount = getCollectionSize(outputs);
removeIgnoredObjectIds(inputs, context);
removeIgnoredObjectIds(outputs, context);
boolean isInputsEmpty = isEmpty(inputs);
boolean isOutputsEmpty = isEmpty(outputs);
+ boolean isAnyRemoved = inputsCount >
getCollectionSize(inputs) || outputsCount > getCollectionSize(outputs);
- if (isInputsEmpty || isOutputsEmpty) {
+ if (isAnyRemoved && (isInputsEmpty || isOutputsEmpty)) {
context.addToIgnoredEntities(entity);
// since the process entity is ignored, entities
referenced by inputs/outputs of this process entity
@@ -170,6 +196,10 @@ public class HivePreprocessor {
}
}
+        /**
+         * Returns the element count when obj is a Collection; 0 for null or any other type.
+         */
+        private int getCollectionSize(Object obj) {
+            if (obj instanceof Collection) {
+                return ((Collection<?>) obj).size();
+            }
+
+            return 0;
+        }
+
private void removeIgnoredObjectIds(Object obj, PreprocessorContext
context) {
if (obj == null || !(obj instanceof Collection)) {
return;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 8f62768..6e42655 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.notification.hook.HookNotification;
import
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +35,9 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import static
org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_CLUSTER_NAME;
+import static
org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_ENTITY_NAME;
+
public class PreprocessorContext {
private static final Logger LOG =
LoggerFactory.getLogger(PreprocessorContext.class);
@@ -45,15 +49,22 @@ public class PreprocessorContext {
private final List<Pattern>
hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
+ private final List<String>
hiveDummyDatabasesToIgnore;
+ private final List<String>
hiveDummyTablesToIgnore;
+ private final List<String>
hiveTablePrefixesToIgnore;
+ private final boolean
isHivePreProcessEnabled;
private final Set<String> ignoredEntities
= new HashSet<>();
private final Set<String> prunedEntities
= new HashSet<>();
private final Set<String>
referredEntitiesToMove = new HashSet<>();
- public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage>
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern>
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
- this.kafkaMessage = kafkaMessage;
- this.hiveTablesToIgnore = hiveTablesToIgnore;
- this.hiveTablesToPrune = hiveTablesToPrune;
- this.hiveTablesCache = hiveTablesCache;
+ public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage>
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern>
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String>
hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String>
hiveTablePrefixesToIgnore) {
+ this.kafkaMessage = kafkaMessage;
+ this.hiveTablesToIgnore = hiveTablesToIgnore;
+ this.hiveTablesToPrune = hiveTablesToPrune;
+ this.hiveTablesCache = hiveTablesCache;
+ this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore;
+ this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
+ this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
final HookNotificationMessage message = kafkaMessage.getMessage();
@@ -70,6 +81,8 @@ public class PreprocessorContext {
entitiesWithExtInfo = null;
break;
}
+
+ this.isHivePreProcessEnabled = !hiveTablesToIgnore.isEmpty() ||
!hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() ||
!hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
}
public AtlasKafkaMessage<HookNotificationMessage> getKafkaMessage() {
@@ -84,6 +97,10 @@ public class PreprocessorContext {
return kafkaMessage.getPartition();
}
+    /**
+     * @return true when at least one of the hive ignore/prune patterns or dummy db/table/prefix
+     *         lists passed to the constructor is non-empty
+     */
+    public boolean isHivePreprocessEnabled() {
+        return this.isHivePreProcessEnabled;
+    }
+
public List<AtlasEntity> getEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities()
: null;
}
@@ -102,22 +119,78 @@ public class PreprocessorContext {
public Set<String> getReferredEntitiesToMove() { return
referredEntitiesToMove; }
+    /**
+     * Returns IGNORE when dbName case-insensitively equals one of the configured dummy
+     * database names; NONE otherwise (including when dbName is null).
+     */
+    public PreprocessAction getPreprocessActionForHiveDb(String dbName) {
+        if (dbName == null) {
+            return PreprocessAction.NONE;
+        }
+
+        for (String dummyDbName : hiveDummyDatabasesToIgnore) {
+            if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
+                return PreprocessAction.IGNORE;
+            }
+        }
+
+        return PreprocessAction.NONE;
+    }
+
public PreprocessAction getPreprocessActionForHiveTable(String
qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE;
- if (qualifiedName != null &&
(CollectionUtils.isNotEmpty(hiveTablesToIgnore) ||
CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
- ret = hiveTablesCache.get(qualifiedName);
+ if (qualifiedName != null) {
+ if (CollectionUtils.isNotEmpty(hiveTablesToIgnore) ||
CollectionUtils.isNotEmpty(hiveTablesToPrune)) {
+ ret = hiveTablesCache.get(qualifiedName);
- if (ret == null) {
- if (isMatch(qualifiedName, hiveTablesToIgnore)) {
- ret = PreprocessAction.IGNORE;
- } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
- ret = PreprocessAction.PRUNE;
- } else {
- ret = PreprocessAction.NONE;
+ if (ret == null) {
+ if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+ ret = PreprocessAction.IGNORE;
+ } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+ ret = PreprocessAction.PRUNE;
+ } else {
+ ret = PreprocessAction.NONE;
+ }
+
+ hiveTablesCache.put(qualifiedName, ret);
}
+ }
+
+ if (ret != PreprocessAction.IGNORE &&
(CollectionUtils.isNotEmpty(hiveDummyTablesToIgnore) ||
CollectionUtils.isNotEmpty(hiveTablePrefixesToIgnore))) {
+ String tblName =
getHiveTableNameFromQualifiedName(qualifiedName);
+
+ if (tblName != null) {
+ for (String dummyTblName : hiveDummyTablesToIgnore) {
+ if (StringUtils.equalsIgnoreCase(tblName,
dummyTblName)) {
+ ret = PreprocessAction.IGNORE;
+
+ break;
+ }
+ }
+
+ if (ret != PreprocessAction.IGNORE) {
+ for (String tableNamePrefix :
hiveTablePrefixesToIgnore) {
+ if (StringUtils.startsWithIgnoreCase(tblName,
tableNamePrefix)) {
+ ret = PreprocessAction.IGNORE;
+
+ break;
+ }
+ }
+ }
+ }
+
+ if (ret != PreprocessAction.IGNORE &&
CollectionUtils.isNotEmpty(hiveDummyDatabasesToIgnore)) {
+ String dbName =
getHiveDbNameFromQualifiedName(qualifiedName);
+
+ if (dbName != null) {
+ for (String dummyDbName : hiveDummyDatabasesToIgnore) {
+ if (StringUtils.equalsIgnoreCase(dbName,
dummyDbName)) {
+ ret = PreprocessAction.IGNORE;
- hiveTablesCache.put(qualifiedName, ret);
+ break;
+ }
+ }
+ }
+ }
}
}
@@ -174,6 +247,81 @@ public class PreprocessorContext {
collectGuids(obj, prunedEntities);
}
+    /**
+     * Moves the referred entities whose guids were registered in referredEntitiesToMove from the
+     * referred-entities map to the end of the entities list, then clears the registrations.
+     * Registered guids that are not present in the referred-entities map are skipped.
+     */
+    public void moveRegisteredReferredEntities() {
+        List<AtlasEntity>        entities         = getEntities();
+        Map<String, AtlasEntity> referredEntities = getReferredEntities();
+
+        if (entities != null && referredEntities != null && !referredEntitiesToMove.isEmpty()) {
+            AtlasEntity firstEntity = entities.isEmpty() ? null : entities.get(0);
+            int         movedCount  = 0;
+
+            for (String guid : referredEntitiesToMove) {
+                AtlasEntity entity = referredEntities.remove(guid);
+
+                if (entity != null) {
+                    entities.add(entity);
+
+                    movedCount++;
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                    }
+                }
+            }
+
+            // report what was actually moved: registered guids may be absent from referredEntities
+            if (movedCount > 0) {
+                if (firstEntity != null) {
+                    LOG.info("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", movedCount, firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                } else {
+                    LOG.info("moved {} referred-entities to entities-list. topic-offset={}, partition={}", movedCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                }
+            }
+
+            referredEntitiesToMove.clear();
+        }
+    }
+
+    /**
+     * Extracts the part between the first entity-name separator and the following cluster-name
+     * separator, e.g. "db.table@cluster" -> "table". Returns null when qualifiedName is null or
+     * either separator is missing. For "db.table.column@cluster" the result is "table.column".
+     */
+    public String getHiveTableNameFromQualifiedName(String qualifiedName) {
+        if (qualifiedName == null) {
+            return null;
+        }
+
+        int idxStart = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME) + 1;
+
+        // no entity-name separator, or nothing after it
+        if (idxStart == 0 || idxStart >= qualifiedName.length()) {
+            return null;
+        }
+
+        int idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME, idxStart);
+
+        return idxEnd == -1 ? null : qualifiedName.substring(idxStart, idxEnd);
+    }
+
+    /**
+     * Extracts the database name from a hive qualifiedName: the prefix up to whichever of the
+     * entity-name or cluster-name separators occurs first. Returns null when qualifiedName is
+     * null or contains neither separator.
+     */
+    public String getHiveDbNameFromQualifiedName(String qualifiedName) {
+        if (qualifiedName == null) {
+            return null;
+        }
+
+        // db.table@cluster, db.table.column@cluster -> entity-name separator comes first
+        // db@cluster -> cluster-name separator comes first; the cluster name itself may contain
+        // the entity-name separator, so taking the first entity-name separator alone is wrong here
+        int idxEntitySep  = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME);
+        int idxClusterSep = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME);
+        int idxEnd;
+
+        if (idxEntitySep == -1) {
+            idxEnd = idxClusterSep;
+        } else if (idxClusterSep == -1) {
+            idxEnd = idxEntitySep;
+        } else {
+            idxEnd = Math.min(idxEntitySep, idxClusterSep);
+        }
+
+        return idxEnd == -1 ? null : qualifiedName.substring(0, idxEnd);
+    }
+
+    /**
+     * Returns the type name of an AtlasObjectId, an object-id Map (KEY_TYPENAME entry),
+     * an AtlasEntity or an AtlasEntityWithExtInfo; null for any other object or a missing type name.
+     */
+    public String getTypeName(Object obj) {
+        final Object typeName;
+
+        if (obj instanceof AtlasObjectId) {
+            typeName = ((AtlasObjectId) obj).getTypeName();
+        } else if (obj instanceof Map) {
+            typeName = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_TYPENAME);
+        } else if (obj instanceof AtlasEntity) {
+            typeName = ((AtlasEntity) obj).getTypeName();
+        } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+            typeName = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getTypeName();
+        } else {
+            typeName = null;
+        }
+
+        return typeName == null ? null : typeName.toString();
+    }
+
public String getGuid(Object obj) {
Object ret = null;