This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-1.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-1.0 by this push: new be98213 ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef be98213 is described below commit be98213f5d60d6af7d485782dc077e06e92b0089 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Thu Feb 21 18:16:34 2019 -0800 ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef (cherry picked from commit f103e1438bbfc0c61912ded3551e22578876fbbe) (cherry picked from commit 5d99c40a51d910d3858d4d25e321afad5f12996d) --- addons/models/2000-RDBMS/2010-rdbms_model.json | 6 +- .../003-remove-rdbms-legacy-attributes.json | 90 +++++++++++++ .../apache/atlas/model/instance/AtlasStruct.java | 6 +- .../bootstrap/AtlasTypeDefStoreInitializer.java | 25 +++- .../graph/v2/AtlasRelationshipDefStoreV2.java | 11 +- .../notification/NotificationHookConsumer.java | 30 ++++- .../preprocessor/EntityPreprocessor.java | 39 +++++- .../preprocessor/PreprocessorContext.java | 86 +++++++++---- .../preprocessor/RdbmsPreprocessor.java | 139 +++++++++++++++++++++ 9 files changed, 384 insertions(+), 48 deletions(-) diff --git a/addons/models/2000-RDBMS/2010-rdbms_model.json b/addons/models/2000-RDBMS/2010-rdbms_model.json index 386446c..e72e13a 100644 --- a/addons/models/2000-RDBMS/2010-rdbms_model.json +++ b/addons/models/2000-RDBMS/2010-rdbms_model.json @@ -21,7 +21,7 @@ { "name": "platform", "typeName": "string", - "isOptional": false, + "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true @@ -99,7 +99,7 @@ { "name": "prodOrOther", "typeName": "string", - "isOptional": false, + "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true @@ -259,7 +259,7 @@ { "name": "data_type", "typeName": "string", - "isOptional": false, + "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true diff --git a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json new file mode 100644 index 0000000..d087c66 --- /dev/null +++ b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json @@ -0,0 +1,90 @@ +{ + "patches": [ + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_instance_databases", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_instance.databases", + "relationshipCategory": "COMPOSITION" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_db_tables", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_db.tables", + "relationshipCategory": "COMPOSITION" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_table_columns", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_table.columns", + "relationshipCategory": "COMPOSITION" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_table_indexes", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_table.indexes", + "relationshipCategory": "COMPOSITION" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_table_foreign_key", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_table.foreign_keys", + "relationshipCategory": "COMPOSITION" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_index_columns", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_index.columns" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_foreign_key_key_columns", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_foreign_key.key_columns" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_foreign_key_table_references", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_foreign_key.references_table" + } + }, + { + "action": "REMOVE_LEGACY_ATTRIBUTES", + "typeName": "rdbms_foreign_key_column_references", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": { + "relationshipLabel": "__rdbms_foreign_key.references_columns" + } + } + ] +} diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java index 18e7407..027b160 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java @@ -147,12 +147,10 @@ public class AtlasStruct implements Serializable { } } - public void removeAttribute(String name) { + public Object removeAttribute(String name) { Map<String, Object> a = this.attributes; - if (a != null && a.containsKey(name)) { - a.remove(name); - } + return a != null ? a.remove(name) : null; } public StringBuilder toString(StringBuilder sb) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index a86282e..d12284e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -35,6 +35,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEnumDef; import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef; import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory; import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; @@ -77,8 +78,9 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ @Service public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class); - public static final String PATCHES_FOLDER_NAME = "patches"; - public static final String RELATIONSHIP_LABEL = "relationshipLabel"; + public static final String PATCHES_FOLDER_NAME = "patches"; + public static final String RELATIONSHIP_LABEL = "relationshipLabel"; + public static final String RELATIONSHIP_CATEGORY = "relationshipCategory"; private final AtlasTypeDefStore atlasTypeDefStore; private final AtlasTypeRegistry atlasTypeRegistry; @@ -730,13 +732,19 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { AtlasEntityType end1Type = typeRegistry.getEntityTypeByName(end1Def.getType()); AtlasEntityType end2Type = typeRegistry.getEntityTypeByName(end2Def.getType()); - String newRelationshipLabel = null; + String newRelationshipLabel = null; + RelationshipCategory newRelationshipCategory = null; if (patch.getParams() != null) { - Object val = patch.getParams().get(RELATIONSHIP_LABEL); + Object relLabel = patch.getParams().get(RELATIONSHIP_LABEL); + Object relCategory = patch.getParams().get(RELATIONSHIP_CATEGORY); - if (val != null) { - newRelationshipLabel = val.toString(); + if (relLabel != null) { + newRelationshipLabel = relLabel.toString(); + } + + if (relCategory != null) { + newRelationshipCategory = RelationshipCategory.valueOf(relCategory.toString()); } } @@ -763,6 +771,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef()); updatedDef.setRelationshipLabel(newRelationshipLabel); + + if (newRelationshipCategory != null) { + updatedDef.setRelationshipCategory(newRelationshipCategory); + } + updatedDef.getEndDef1().setIsLegacyAttribute(false); updatedDef.getEndDef2().setIsLegacyAttribute(false); updatedDef.setTypeVersion(patch.getUpdateToVersion()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java index bfee34e..35d0577 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.typedef.AtlasRelationshipDef; import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory; @@ -425,9 +426,13 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe RelationshipCategory newRelationshipCategory = newRelationshipDef.getRelationshipCategory(); if ( !existingRelationshipCategory.equals(newRelationshipCategory)){ - throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE, - newRelationshipDef.getName(),newRelationshipCategory.name(), - existingRelationshipCategory.name() ); + if (!RequestContext.get().isInTypePatching()) { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE, + newRelationshipDef.getName(), newRelationshipCategory.name(), + existingRelationshipCategory.name()); + } else { + LOG.warn("RELATIONSHIP UPDATE: relationship category from {} to {} for {}", existingRelationshipCategory.name(), newRelationshipCategory.name(), newRelationshipDef.getName()); + } } AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1(); diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 53fd117..eab5d51 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size"; + public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; @@ -142,6 +143,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List<Pattern> hiveTablesToIgnore = new ArrayList<>(); private final List<Pattern> hiveTablesToPrune = new ArrayList<>(); private final Map<String, PreprocessAction> hiveTablesCache; + private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final boolean preprocessEnabled; private NotificationInterface notificationInterface; @@ -212,7 +214,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveTablesCache = Collections.emptyMap(); } - preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633; + rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true); + preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs; LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); @@ -778,7 +781,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return; } - PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, rdbmsTypesRemoveOwnedRefAttrs); if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) { ignoreOrPruneHiveTables(context); @@ -788,16 +791,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl skipHiveColumnLineage(context); } + if (rdbmsTypesRemoveOwnedRefAttrs) { + rdbmsTypeRemoveOwnedRefAttrs(context); + } + context.moveRegisteredReferredEntities(); } + private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) { + List<AtlasEntity> entities = context.getEntities(); + + if (entities != null) { + for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next(); + EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName()); + + if (preprocessor != null) { + preprocessor.preprocess(entity, context); + } + } + } + } + private void ignoreOrPruneHiveTables(PreprocessorContext context) { List<AtlasEntity> entities = context.getEntities(); if (entities != null) { for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next(); - EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName()); + EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName()); if (preprocessor != null) { preprocessor.preprocess(entity, context); @@ -813,7 +835,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (referredEntities != null) { for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next().getValue(); - EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName()); + EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName()); if (preprocessor != null) { preprocessor.preprocess(entity, context); diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java index bdea14a..7eba27a 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -32,25 +32,38 @@ public abstract class EntityPreprocessor { public static final String TYPE_HIVE_PROCESS = "hive_process"; public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc"; public static final String TYPE_HIVE_TABLE = "hive_table"; + public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance"; + public static final String TYPE_RDBMS_DB = "rdbms_db"; + public static final String TYPE_RDBMS_TABLE = "rdbms_table"; + public static final String TYPE_RDBMS_COLUMN = "rdbms_column"; + public static final String TYPE_RDBMS_INDEX = "rdbms_index"; + public static final String TYPE_RDBMS_FOREIGN_KEY = "rdbms_foreign_key"; public static final String ATTRIBUTE_COLUMNS = "columns"; public static final String ATTRIBUTE_INPUTS = "inputs"; public static final String ATTRIBUTE_OUTPUTS = "outputs"; public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys"; public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + public static final String ATTRIBUTE_NAME = "name"; public static final String ATTRIBUTE_SD = "sd"; + public static final String ATTRIBUTE_DB = "db"; + public static final String ATTRIBUTE_DATABASES = "databases"; + public static final String ATTRIBUTE_TABLES = "tables"; + public static final String ATTRIBUTE_INDEXES = "indexes"; + public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys"; public static final char QNAME_SEP_CLUSTER_NAME = '@'; public static final char QNAME_SEP_ENTITY_NAME = '.'; public static final String QNAME_SD_SUFFIX = "_storage"; - private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>(); + private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>(); + private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>(); private final String typeName; static { - EntityPreprocessor[] preprocessors = new EntityPreprocessor[] { + EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] { new HivePreprocessor.HiveTablePreprocessor(), new HivePreprocessor.HiveColumnPreprocessor(), new HivePreprocessor.HiveProcessPreprocessor(), @@ -58,8 +71,18 @@ public abstract class EntityPreprocessor { new HivePreprocessor.HiveStorageDescPreprocessor() }; - for (EntityPreprocessor preprocessor : preprocessors) { - PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); + EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] { + new RdbmsPreprocessor.RdbmsInstancePreprocessor(), + new RdbmsPreprocessor.RdbmsDbPreprocessor(), + new RdbmsPreprocessor.RdbmsTablePreprocessor() + }; + + for (EntityPreprocessor preprocessor : hivePreprocessors) { + HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); + } + + for (EntityPreprocessor preprocessor : rdbmsPreprocessors) { + RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); } } @@ -74,8 +97,12 @@ public abstract class EntityPreprocessor { public abstract void preprocess(AtlasEntity entity, PreprocessorContext context); - public static EntityPreprocessor getPreprocessor(String typeName) { - return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null; + public static EntityPreprocessor getHivePreprocessor(String typeName) { + return typeName != null ? HIVE_PREPROCESSOR_MAP.get(typeName) : null; + } + + public static EntityPreprocessor getRdbmsPreprocessor(String typeName) { + return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null; } public static String getQualifiedName(AtlasEntity entity) { diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index 2d2c09a..0f95fba 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -44,15 +44,17 @@ public class PreprocessorContext { private final List<Pattern> hiveTablesToIgnore; private final List<Pattern> hiveTablesToPrune; private final Map<String, PreprocessAction> hiveTablesCache; + private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final Set<String> ignoredEntities = new HashSet<>(); private final Set<String> prunedEntities = new HashSet<>(); private final Set<String> referredEntitiesToMove = new HashSet<>(); - public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) { - this.kafkaMessage = kafkaMessage; - this.hiveTablesToIgnore = hiveTablesToIgnore; - this.hiveTablesToPrune = hiveTablesToPrune; - this.hiveTablesCache = hiveTablesCache; + public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean rdbmsTypesRemoveOwnedRefAttrs) { + this.kafkaMessage = kafkaMessage; + this.hiveTablesToIgnore = hiveTablesToIgnore; + this.hiveTablesToPrune = hiveTablesToPrune; + this.hiveTablesCache = hiveTablesCache; + this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs; final HookNotification message = kafkaMessage.getMessage(); @@ -83,6 +85,8 @@ public class PreprocessorContext { return kafkaMessage.getPartition(); } + public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; } + public List<AtlasEntity> getEntities() { return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null; } @@ -95,6 +99,12 @@ public class PreprocessorContext { return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null; } + public AtlasEntity removeReferredEntity(String guid) { + Map<String, AtlasEntity> referredEntities = getReferredEntities(); + + return referredEntities != null && guid != null ? referredEntities.remove(guid) : null; + } + public Set<String> getIgnoredEntities() { return ignoredEntities; } public Set<String> getPrunedEntities() { return prunedEntities; } @@ -165,6 +175,14 @@ public class PreprocessorContext { } } + public void addToReferredEntitiesToMove(Collection<String> guids) { + if (guids != null) { + for (String guid : guids) { + addToReferredEntitiesToMove(guid); + } + } + } + public void addToIgnoredEntities(Object obj) { collectGuids(obj, ignoredEntities); } @@ -173,6 +191,14 @@ public class PreprocessorContext { collectGuids(obj, prunedEntities); } + public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName) { + Set<String> guidsToMove = new HashSet<>(); + + collectGuids(entity.removeAttribute(attrName), guidsToMove); + + addToReferredEntitiesToMove(guidsToMove); + } + public void moveRegisteredReferredEntities() { List<AtlasEntity> entities = getEntities(); Map<String, AtlasEntity> referredEntities = getReferredEntities(); @@ -202,38 +228,39 @@ public class PreprocessorContext { } } - public String getGuid(Object obj) { + public String getTypeName(Object obj) { Object ret = null; if (obj instanceof AtlasObjectId) { - ret = ((AtlasObjectId) obj).getGuid(); + ret = ((AtlasObjectId) obj).getTypeName(); } else if (obj instanceof Map) { - ret = ((Map) obj).get(AtlasObjectId.KEY_GUID); + ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME); } else if (obj instanceof AtlasEntity) { - ret = ((AtlasEntity) obj).getGuid(); + ret = ((AtlasEntity) obj).getTypeName(); } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) { - ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid(); + ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getTypeName(); } return ret != null ? ret.toString() : null; } + public String getGuid(Object obj) { + Object ret = null; - private boolean isMatch(String name, List<Pattern> patterns) { - boolean ret = false; - - for (Pattern p : patterns) { - if (p.matcher(name).matches()) { - ret = true; - - break; - } + if (obj instanceof AtlasObjectId) { + ret = ((AtlasObjectId) obj).getGuid(); + } else if (obj instanceof Map) { + ret = ((Map) obj).get(AtlasObjectId.KEY_GUID); + } else if (obj instanceof AtlasEntity) { + ret = ((AtlasEntity) obj).getGuid(); + } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) { + ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid(); } - return ret; + return ret != null ? ret.toString() : null; } - private void collectGuids(Object obj, Set<String> guids) { + public void collectGuids(Object obj, Set<String> guids) { if (obj != null) { if (obj instanceof Collection) { Collection objList = (Collection) obj; @@ -247,11 +274,26 @@ public class PreprocessorContext { } } - private void collectGuid(Object obj, Set<String> guids) { + public void collectGuid(Object obj, Set<String> guids) { String guid = getGuid(obj); if (guid != null) { guids.add(guid); } } + + + private boolean isMatch(String name, List<Pattern> patterns) { + boolean ret = false; + + for (Pattern p : patterns) { + if (p.matcher(name).matches()) { + ret = true; + + break; + } + } + + return ret; + } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java new file mode 100644 index 0000000..adc1983 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + + +public class RdbmsPreprocessor { + private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class); + + static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor { + public RdbmsInstancePreprocessor() { + super(TYPE_RDBMS_INSTANCE); + } + } + + static class RdbmsDbPreprocessor extends RdbmsTypePreprocessor { + public RdbmsDbPreprocessor() { + super(TYPE_RDBMS_DB); + } + } + + static class RdbmsTablePreprocessor extends RdbmsTypePreprocessor { + public RdbmsTablePreprocessor() { + super(TYPE_RDBMS_TABLE); + } + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + super.preprocess(entity, context); + + // try auto-fix when 'db' attribute is not present in relationshipAttribute & attributes + Object db = entity.getRelationshipAttribute(ATTRIBUTE_DB); + + if (db == null) { + db = entity.getAttribute(ATTRIBUTE_DB); + } + + if (db == null) { + String dbQualifiedName = getDbQualifiedName(entity); + + if (dbQualifiedName != null) { + AtlasObjectId dbId = new AtlasObjectId(TYPE_RDBMS_DB, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName)); + + LOG.info("missing attribute {}.{} is set to {}", TYPE_RDBMS_TABLE, ATTRIBUTE_DB, dbId); + + entity.setRelationshipAttribute(ATTRIBUTE_DB, dbId); + } + } + } + + private String getDbQualifiedName(AtlasEntity tableEntity) { + String ret = null; + Object tblQualifiedName = tableEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); // dbName.tblName@clusterName + Object tblName = tableEntity.getAttribute(ATTRIBUTE_NAME); // tblName + + if (tblQualifiedName != null && tblName != null) { + ret = tblQualifiedName.toString().replace("." + tblName.toString() + "@", "@"); // dbName@clusterName + } + + return ret; + } + + } + + static class RdbmsTypePreprocessor extends EntityPreprocessor { + private static final Set<String> entityTypesToMove = new HashSet<>(); + + static { + entityTypesToMove.add(TYPE_RDBMS_DB); + entityTypesToMove.add(TYPE_RDBMS_TABLE); + entityTypesToMove.add(TYPE_RDBMS_COLUMN); + entityTypesToMove.add(TYPE_RDBMS_INDEX); + entityTypesToMove.add(TYPE_RDBMS_FOREIGN_KEY); + } + + protected RdbmsTypePreprocessor(String typeName) { + super(typeName); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (context.getRdbmsTypesRemoveOwnedRefAttrs()) { + clearRefAttributesAndMove(entity, context); + + Map<String, AtlasEntity> referredEntities = context.getReferredEntities(); + + if (MapUtils.isNotEmpty(referredEntities)) { + for (AtlasEntity referredEntity : referredEntities.values()) { + if (entityTypesToMove.contains(referredEntity.getTypeName())) { + clearRefAttributesAndMove(referredEntity, context); + } + } + } + } + } + + private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) { + switch (entity.getTypeName()) { + case TYPE_RDBMS_INSTANCE: + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES); + break; + + case TYPE_RDBMS_DB: + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES); + break; + + case TYPE_RDBMS_TABLE: + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES); + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS); + break; + } + } + } +}