This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 47c9e42db1ca8785c764302e497c24cb8cac6cc7 Author: Mandar Ambawane <[email protected]> AuthorDate: Mon Jul 8 20:26:20 2019 +0530 ATLAS-3211 :- Update Hive hook with Relationship Attributes. Signed-off-by: nixonrodrigues <[email protected]> (cherry picked from commit 5c0025177e6520564921c46d9a442e7da6f162c5) --- .../atlas/hive/hook/events/AlterTableRename.java | 32 ++------ .../atlas/hive/hook/events/BaseHiveEvent.java | 89 +++++++++++++++++----- .../atlas/hive/hook/events/CreateHiveProcess.java | 6 +- .../atlas/model/instance/AtlasRelatedObjectId.java | 6 ++ .../java/org/apache/atlas/type/AtlasTypeUtil.java | 4 + 5 files changed, 89 insertions(+), 48 deletions(-) diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java index 927ddc0..c59ff7f 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java @@ -129,17 +129,13 @@ public class AlterTableRename extends BaseHiveEvent { // update qualifiedName for all columns, partitionKeys, storageDesc String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME); - renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret); - renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret); + renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret); + renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret); renameStorageDesc(oldTableEntity, renamedTableEntity, ret); - // remove columns, partitionKeys and storageDesc - as they have already been updated above - removeAttribute(renamedTableEntity, ATTRIBUTE_COLUMNS); - removeAttribute(renamedTableEntity, ATTRIBUTE_PARTITION_KEYS); - removeAttribute(renamedTableEntity, ATTRIBUTE_STORAGEDESC); - // set previous name as the alias renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName())); + renamedTableEntity.getEntity().setRelationshipAttributes(null); String oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME); AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName); @@ -179,35 +175,17 @@ public class AlterTableRename extends BaseHiveEvent { AtlasObjectId oldSdId = new AtlasObjectId(oldSd.getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldSd.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); newSd.removeAttribute(ATTRIBUTE_TABLE); + newSd.setRelationshipAttributes(null); notifications.add(new EntityPartialUpdateRequestV2(getUserName(), oldSdId, new AtlasEntityWithExtInfo(newSd))); } } - private void removeAttribute(AtlasEntityWithExtInfo entity, String attributeName) { - Object attributeValue = entity.getEntity().getAttribute(attributeName); - - entity.getEntity().getAttributes().remove(attributeName); - - if (attributeValue instanceof AtlasObjectId) { - AtlasObjectId objectId = (AtlasObjectId) attributeValue; - - entity.removeReferredEntity(objectId.getGuid()); - } else if (attributeValue instanceof Collection) { - for (Object item : (Collection) attributeValue) - if (item instanceof AtlasObjectId) { - AtlasObjectId objectId = (AtlasObjectId) item; - - entity.removeReferredEntity(objectId.getGuid()); - } - } - } - private AtlasEntity getStorageDescEntity(AtlasEntityWithExtInfo tableEntity) { AtlasEntity ret = null; if (tableEntity != null && tableEntity.getEntity() != null) { - Object attrSdId = tableEntity.getEntity().getAttribute(ATTRIBUTE_STORAGEDESC); + Object attrSdId = tableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_STORAGEDESC); if (attrSdId instanceof AtlasObjectId) { ret = tableEntity.getReferredEntity(((AtlasObjectId) attrSdId).getGuid()); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index c15bbfa..0bf3ce2 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.repository.Constants; @@ -161,6 +162,20 @@ public abstract class BaseHiveEvent { public static final String HDFS_PATH_PREFIX = "hdfs://"; public static final String EMPTY_ATTRIBUTE_VALUE = ""; + public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs"; + public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs"; + public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE = "hive_process_column_lineage"; + public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db"; + public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS = "hive_table_partitionkeys"; + public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns"; + public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc"; + public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs"; + public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions"; + public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries"; + public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries"; + public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace"; + + public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>(); @@ -193,6 +208,28 @@ public abstract class BaseHiveEvent { return table.getTTable() != null ? (table.getOwner()): ""; } + + public static AtlasRelatedObjectId getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) { + AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(getObjectId(entity), relationShipType); + return atlasRelatedObjectId; + } + + + + public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) { + final List<AtlasRelatedObjectId> ret; + if (CollectionUtils.isNotEmpty(entities)) { + ret = new ArrayList<>(entities.size()); + for (AtlasEntity entity : entities) { + ret.add(getObjectIdWithRelationshipType(entity, relationshipType)); + } + } else { + ret = Collections.emptyList(); + } + return ret; + } + + public static AtlasObjectId getObjectId(AtlasEntity entity) { String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); @@ -393,7 +430,9 @@ public abstract class BaseHiveEvent { long createTime = getTableCreateTime(table); long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime; - ret.setAttribute(ATTRIBUTE_DB, dbId); + AtlasRelatedObjectId dbRelatedObject = new AtlasRelatedObjectId(dbId, RELATIONSHIP_HIVE_TABLE_DB); + + ret.setRelationshipAttribute(ATTRIBUTE_DB, dbRelatedObject ); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase()); ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner()); @@ -420,8 +459,10 @@ public abstract class BaseHiveEvent { } else { AtlasObjectId tableId = getObjectId(ret); AtlasEntity sd = getStorageDescEntity(tableId, table); - List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys()); - List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols()); + List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys(), RELATIONSHIP_HIVE_TABLE_PART_KEYS); + List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols(), RELATIONSHIP_HIVE_TABLE_COLUMNS); + + if (entityExtInfo != null) { entityExtInfo.addReferredEntity(sd); @@ -439,9 +480,10 @@ public abstract class BaseHiveEvent { } } - ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd)); - ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys)); - ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns)); + + ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); + ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); } context.putEntity(tblQualifiedName, ret); @@ -469,7 +511,9 @@ public abstract class BaseHiveEvent { StorageDescriptor sd = table.getSd(); - ret.setAttribute(ATTRIBUTE_TABLE, tableId); + AtlasRelatedObjectId tableRelatedObject = new AtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC); + + ret.setRelationshipAttribute(ATTRIBUTE_TABLE, tableRelatedObject); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName); ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters()); ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(sd.getLocation())); @@ -515,7 +559,7 @@ public abstract class BaseHiveEvent { return ret; } - protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas) { + protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas, String relationshipType) { List<AtlasEntity> ret = new ArrayList<>(); boolean isKnownTable = tableId.getGuid() == null; int columnPosition = 0; @@ -534,8 +578,8 @@ public abstract class BaseHiveEvent { if (isKnownTable) { column.setGuid(null); } - - column.setAttribute(ATTRIBUTE_TABLE, tableId); + AtlasRelatedObjectId relatedObjectId = new AtlasRelatedObjectId(tableId, relationshipType); + column.setRelationshipAttribute(ATTRIBUTE_TABLE, (relatedObjectId)); column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName); column.setAttribute(ATTRIBUTE_NAME, fieldSchema.getName()); column.setAttribute(ATTRIBUTE_OWNER, table.getOwner()); @@ -583,7 +627,7 @@ public abstract class BaseHiveEvent { ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); - ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity)); + ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getObjectIdWithRelationshipType(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); @@ -631,8 +675,8 @@ public abstract class BaseHiveEvent { } ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs)); - ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs)); - ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs)); + ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS)); + ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, getObjectIdsWithRelationshipType(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)); ret.setAttribute(ATTRIBUTE_NAME, queryStr); ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName()); @@ -668,8 +712,9 @@ public abstract class BaseHiveEvent { ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr); ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId()); ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported"); - ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName()); - ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess)); + ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName()); // + AtlasRelatedObjectId hiveProcessRelationObjectId = AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess, RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE); + ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, hiveProcessRelationObjectId); return ret; } @@ -684,11 +729,16 @@ public abstract class BaseHiveEvent { if (excludeEntityGuid) { objId.setGuid(null); } + AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(objId); if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) { - hiveDDL = new AtlasEntity(HIVE_DB_DDL, ATTRIBUTE_DB, objId); + hiveDDL = new AtlasEntity(HIVE_DB_DDL); + objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_DB_DDL_QUERIES); + hiveDDL.setRelationshipAttribute(ATTRIBUTE_DB, objIdRelatedObject); } else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) { - hiveDDL = new AtlasEntity(HIVE_TABLE_DDL, ATTRIBUTE_TABLE, objId); + hiveDDL = new AtlasEntity(HIVE_TABLE_DDL); + objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_TABLE_DDL_QUERIES); + hiveDDL.setRelationshipAttribute( ATTRIBUTE_TABLE, objIdRelatedObject); } if (hiveDDL != null) { @@ -948,7 +998,10 @@ public abstract class BaseHiveEvent { ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName); ret.setAttribute(ATTRIBUTE_URI, hbaseTableName); - ret.setAttribute(ATTRIBUTE_NAMESPACE, getObjectId(nsEntity)); + + AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE); + + ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName)); entities.addReferredEntity(nsEntity); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 7791fb4..91e063e 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -207,9 +207,9 @@ public class CreateHiveProcess extends BaseHiveEvent { columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); - columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns)); - columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn))); - columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(hiveProcess)); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS)); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectIdWithRelationshipType(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getObjectIdWithRelationshipType(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE)); columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr()); diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java index 7c57ccf..ae6932d 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java @@ -83,6 +83,12 @@ public class AtlasRelatedObjectId extends AtlasObjectId implements Serializable super(other); } + public AtlasRelatedObjectId(AtlasObjectId objId, String relationshipType) { + this(objId); + + setRelationshipType(relationshipType); + } + public AtlasRelatedObjectId(Map objIdMap) { super(objIdMap); diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java index d74c7e3..6ac176d 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java @@ -414,6 +414,10 @@ public class AtlasTypeUtil { return new AtlasRelatedObjectId(getAtlasObjectId(entity)); } + public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, String relationshipType){ + return new AtlasRelatedObjectId(getAtlasObjectId(entity), relationshipType); + } + public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) { return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry)); }
