This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 2471525 ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes 2471525 is described below commit 24715256558bed6f89d4bec25c6eada0c9db2075 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Wed Mar 6 23:49:40 2019 -0800 ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes --- .../patches/008-remove-hive-legacy-attributes.json | 50 ++++++++++++++++ .../003-remove-rdbms-legacy-attributes.json | 54 ++++++++--------- .../bootstrap/AtlasTypeDefStoreInitializer.java | 44 ++++++++++---- .../graph/v2/AtlasRelationshipDefStoreV2.java | 20 ++++--- .../notification/NotificationHookConsumer.java | 47 ++++++++------- .../preprocessor/EntityPreprocessor.java | 2 + .../preprocessor/HivePreprocessor.java | 67 ++++++++++++++++++++++ .../preprocessor/PreprocessorContext.java | 6 +- 8 files changed, 223 insertions(+), 67 deletions(-) diff --git a/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json new file mode 100644 index 0000000..32a0876 --- /dev/null +++ b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json @@ -0,0 +1,50 @@ +{ + "patches": [ + { + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", + "typeName": "hive_table_db", + "applyToVersion": "1.1", + "updateToVersion": "1.2", + "params": { + "relationshipLabel": "__hive_table.db" + } + }, + { + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", + "typeName": "hive_table_columns", + "applyToVersion": "1.1", + "updateToVersion": "1.2", + "params": { + "relationshipLabel": "__hive_table.columns" + } + }, + { + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", + "typeName": "hive_table_partitionkeys", + "applyToVersion": "1.1", + "updateToVersion": "1.2", + "params": { + "relationshipLabel": "__hive_table.partitionkeys" + } + }, + { + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", + "typeName": "hive_table_storagedesc", + "applyToVersion": "1.1", + "updateToVersion": "1.2", + "params": { + "relationshipLabel": "__hive_table.sd" + } + }, + { + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", + "typeName": "hive_process_column_lineage", + "applyToVersion": "1.1", + "updateToVersion": "1.2", + "params": { + "relationshipLabel": "__hive_column_lineage.query", + "swapEnds": "true" + } + } + ] +} diff --git a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json index d087c66..5531bee 100644 --- a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json +++ b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json @@ -1,87 +1,87 @@ { "patches": [ { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_instance_databases", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_instance.databases", "relationshipCategory": "COMPOSITION" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_db_tables", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_db.tables", "relationshipCategory": "COMPOSITION" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_table_columns", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_table.columns", "relationshipCategory": "COMPOSITION" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_table_indexes", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_table.indexes", "relationshipCategory": "COMPOSITION" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_table_foreign_key", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_table.foreign_keys", "relationshipCategory": "COMPOSITION" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_index_columns", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_index.columns" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_foreign_key_key_columns", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_foreign_key.key_columns" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_foreign_key_table_references", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_foreign_key.references_table" } }, { - "action": "REMOVE_LEGACY_ATTRIBUTES", + "action": "REMOVE_LEGACY_REF_ATTRIBUTES", "typeName": "rdbms_foreign_key_column_references", - "applyToVersion": "1.0", - "updateToVersion": "1.1", + "applyToVersion": "1.1", + "updateToVersion": "1.2", "params": { "relationshipLabel": "__rdbms_foreign_key.references_columns" } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index d12284e..66162aa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -78,9 +78,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ @Service public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class); - public static final String PATCHES_FOLDER_NAME = "patches"; - public static final String RELATIONSHIP_LABEL = "relationshipLabel"; - public static final String RELATIONSHIP_CATEGORY = "relationshipCategory"; + public static final String PATCHES_FOLDER_NAME = "patches"; + public static final String RELATIONSHIP_LABEL = "relationshipLabel"; + public static final String RELATIONSHIP_CATEGORY = "relationshipCategory"; + public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds"; private final AtlasTypeDefStore atlasTypeDefStore; private final AtlasTypeRegistry atlasTypeRegistry; @@ -414,7 +415,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { PatchHandler[] patchHandlers = new PatchHandler[] { new AddAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry), new UpdateAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry), - new RemoveLegacyAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry), + new RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry), new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, atlasTypeRegistry), new SetServiceTypePatchHandler(atlasTypeDefStore, atlasTypeRegistry) }; @@ -710,9 +711,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { } } - class RemoveLegacyAttributesPatchHandler extends PatchHandler { - public RemoveLegacyAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) { - super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_ATTRIBUTES" }); + class RemoveLegacyRefAttributesPatchHandler extends PatchHandler { + public RemoveLegacyRefAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) { + super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_REF_ATTRIBUTES" }); } @Override @@ -734,10 +735,12 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { String newRelationshipLabel = null; RelationshipCategory newRelationshipCategory = null; + boolean swapEnds = false; if (patch.getParams() != null) { Object relLabel = patch.getParams().get(RELATIONSHIP_LABEL); Object relCategory = patch.getParams().get(RELATIONSHIP_CATEGORY); + Object relSwapEnds = patch.getParams().get(RELATIONSHIP_SWAP_ENDS); if (relLabel != null) { newRelationshipLabel = relLabel.toString(); @@ -746,6 +749,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { if (relCategory != null) { newRelationshipCategory = RelationshipCategory.valueOf(relCategory.toString()); } + + if (relSwapEnds != null) { + swapEnds = Boolean.valueOf(relSwapEnds.toString()); + } } if (StringUtils.isEmpty(newRelationshipLabel)) { @@ -766,9 +773,19 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { } } - AtlasRelationshipDef updatedDef = new AtlasRelationshipDef(relationshipDef); - AtlasEntityDef updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef()); - AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef()); + AtlasRelationshipDef updatedDef = new AtlasRelationshipDef(relationshipDef); + + if (swapEnds) { + AtlasRelationshipEndDef tmp = updatedDef.getEndDef1(); + + updatedDef.setEndDef1(updatedDef.getEndDef2()); + updatedDef.setEndDef2(tmp); + } + + end1Def = updatedDef.getEndDef1(); + end2Def = updatedDef.getEndDef2(); + end1Type = typeRegistry.getEntityTypeByName(end1Def.getType()); + end2Type = typeRegistry.getEntityTypeByName(end2Def.getType()); updatedDef.setRelationshipLabel(newRelationshipLabel); @@ -776,10 +793,13 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { updatedDef.setRelationshipCategory(newRelationshipCategory); } - updatedDef.getEndDef1().setIsLegacyAttribute(false); - updatedDef.getEndDef2().setIsLegacyAttribute(false); + end1Def.setIsLegacyAttribute(false); + end2Def.setIsLegacyAttribute(false); updatedDef.setTypeVersion(patch.getUpdateToVersion()); + AtlasEntityDef updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef()); + AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef()); + updatedEntityDef1.removeAttribute(end1Def.getName()); updatedEntityDef2.removeAttribute(end2Def.getName()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java index 35d0577..332c18a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java @@ -436,19 +436,25 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe } AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1(); + AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2(); AtlasRelationshipEndDef newEnd1 = newRelationshipDef.getEndDef1(); + AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2(); + boolean endsSwaped = false; if ( !isValidUpdate(existingEnd1, newEnd1) ) { - throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE, - newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString()); + if (RequestContext.get().isInTypePatching() && isValidUpdate(existingEnd1, newEnd2)) { // allow swap of ends during type-patch + endsSwaped = true; + } else { + throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE, + newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString()); + } } - AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2(); - AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2(); + AtlasRelationshipEndDef newEndToCompareWith = endsSwaped ? newEnd1 : newEnd2; - if ( !isValidUpdate(existingEnd2, newEnd2) ) { + if ( !isValidUpdate(existingEnd2, newEndToCompareWith) ) { throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE, - newRelationshipDef.getName(), newEnd2.toString(), existingEnd2.toString()); + newRelationshipDef.getName(), newEndToCompareWith.toString(), existingEnd2.toString()); } } @@ -520,7 +526,7 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe } private static boolean isValidUpdate(AtlasRelationshipEndDef currentDef, AtlasRelationshipEndDef updatedDef) { - // permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_ATTRIBUTES) + // permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_REF_ATTRIBUTES) return Objects.equals(currentDef.getType(), updatedDef.getType()) && Objects.equals(currentDef.getName(), updatedDef.getName()) && Objects.equals(currentDef.getIsContainer(), updatedDef.getIsContainer()) && diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 50559dd..d16d544 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -78,7 +78,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -87,6 +86,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; + /** * Consumer of notifications from hooks e.g., hive hook etc. */ @@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size"; + public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs"; public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; @@ -143,6 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List<Pattern> hiveTablesToIgnore = new ArrayList<>(); private final List<Pattern> hiveTablesToPrune = new ArrayList<>(); private final Map<String, PreprocessAction> hiveTablesCache; + private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final boolean preprocessEnabled; @@ -172,7 +174,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default - commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50); + commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0); skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 @@ -214,11 +216,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveTablesCache = Collections.emptyMap(); } - rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true); - preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs; + hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, false); + rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, false); + preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs; LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs); + LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs); + LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize); + LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled); } @Override @@ -694,7 +701,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl List<AtlasEntity> entitiesList = entities.getEntities(); AtlasEntityStream entityStream = new AtlasEntityStream(entities); - if (entitiesList.size() <= commitBatchSize) { + if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) { atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate); } else { for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) { @@ -792,10 +799,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return; } - PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, rdbmsTypesRemoveOwnedRefAttrs); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs); - if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) { - ignoreOrPruneHiveTables(context); + if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs) { + preprocessHiveTypes(context); } if (skipHiveColumnLineageHive20633) { @@ -813,8 +820,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl List<AtlasEntity> entities = context.getEntities(); if (entities != null) { - for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { - AtlasEntity entity = iter.next(); + for (int i = 0; i < entities.size(); i++) { + AtlasEntity entity = entities.get(i); EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName()); if (preprocessor != null) { @@ -824,19 +831,19 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } - private void ignoreOrPruneHiveTables(PreprocessorContext context) { + private void preprocessHiveTypes(PreprocessorContext context) { List<AtlasEntity> entities = context.getEntities(); if (entities != null) { - for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { - AtlasEntity entity = iter.next(); + for (int i = 0; i < entities.size(); i++) { + AtlasEntity entity = entities.get(i); EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName()); if (preprocessor != null) { preprocessor.preprocess(entity, context); if (context.isIgnoredEntity(entity.getGuid())) { - iter.remove(); + entities.remove(i--); } } } @@ -877,8 +884,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl Set<String> lineageQNames = new HashSet<>(); // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases - for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { - AtlasEntity entity = iter.next(); + for (int i = 0; i < entities.size(); i++) { + AtlasEntity entity = entities.get(i); if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { final Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); @@ -887,7 +894,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl final String qualifiedName = qName.toString(); if (lineageQNames.contains(qualifiedName)) { - iter.remove(); + entities.remove(i--); LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition()); @@ -914,11 +921,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0; if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) { - for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { - AtlasEntity entity = iter.next(); + for (int i = 0; i < entities.size(); i++) { + AtlasEntity entity = entities.get(i); if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) { - iter.remove(); + entities.remove(i--); numRemovedEntities++; } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java index 7eba27a..085e746 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -48,6 +48,8 @@ public abstract class EntityPreprocessor { public static final String ATTRIBUTE_SD = "sd"; public static final String ATTRIBUTE_DB = "db"; public static final String ATTRIBUTE_DATABASES = "databases"; + public static final String ATTRIBUTE_QUERY = "query"; + public static final String ATTRIBUTE_TABLE = "table"; public static final String ATTRIBUTE_TABLES = "tables"; public static final String ATTRIBUTE_INDEXES = "indexes"; public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys"; diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java index d54c88d..ff9c9cb 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java @@ -18,14 +18,26 @@ package org.apache.atlas.notification.preprocessor; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; public class HivePreprocessor { + private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class); + + private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns"; + private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionKeys"; + static class HiveTablePreprocessor extends EntityPreprocessor { public HiveTablePreprocessor() { super(TYPE_HIVE_TABLE); @@ -54,9 +66,64 @@ public class HivePreprocessor { entity.setAttribute(ATTRIBUTE_SD, null); entity.setAttribute(ATTRIBUTE_COLUMNS, null); entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null); + } else if (context.getHiveTypesRemoveOwnedRefAttrs()) { + context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD); + + removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context); + removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context); } } } + + private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) { + Object attrVal = entity.getAttribute(attrName); + + if (attrVal != null) { + Set<String> guids = new HashSet<>(); + + context.collectGuids(attrVal, guids); + + for (String guid : guids) { + AtlasEntity colEntity = context.getEntity(guid); + + if (colEntity != null) { + Object attrTable = null; + + if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) { + attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE); + } else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) { + attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE); + } + + attrTable = setRelationshipType(attrTable, relationshipType); + + if (attrTable != null) { + colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable); + } + + context.addToReferredEntitiesToMove(guid); + } + } + } + } + + private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) { + AtlasRelatedObjectId ret = null; + + if (attr instanceof AtlasRelatedObjectId) { + ret = (AtlasRelatedObjectId) attr; + } else if (attr instanceof AtlasObjectId) { + ret = new AtlasRelatedObjectId((AtlasObjectId) attr); + } else if (attr instanceof Map) { + ret = new AtlasRelatedObjectId((Map) attr); + } + + if (ret != null) { + ret.setRelationshipType(relationshipType); + } + + return ret; + } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index 0f95fba..94e0993 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -44,16 +44,18 @@ public class PreprocessorContext { private final List<Pattern> hiveTablesToIgnore; private final List<Pattern> hiveTablesToPrune; private final Map<String, PreprocessAction> hiveTablesCache; + private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final Set<String> ignoredEntities = new HashSet<>(); private final Set<String> prunedEntities = new HashSet<>(); private final Set<String> referredEntitiesToMove = new HashSet<>(); - public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean rdbmsTypesRemoveOwnedRefAttrs) { + public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) { this.kafkaMessage = kafkaMessage; this.hiveTablesToIgnore = hiveTablesToIgnore; this.hiveTablesToPrune = hiveTablesToPrune; this.hiveTablesCache = hiveTablesCache; + this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs; this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs; final HookNotification message = kafkaMessage.getMessage(); @@ -85,6 +87,8 @@ public class PreprocessorContext { return kafkaMessage.getPartition(); } + public boolean getHiveTypesRemoveOwnedRefAttrs() { return hiveTypesRemoveOwnedRefAttrs; } + public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; } public List<AtlasEntity> getEntities() {