This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 1b6a0c4f6347b36cbd6a7bbbf8586927b157481d Author: Mandar Ambawane <[email protected]> AuthorDate: Mon Jul 15 16:36:42 2019 +0530 ATLAS-3326 Update Hbase hook/bridge to use relationship attribute and move Conversion methods to AtlasTypeUtil Signed-off-by: nixonrodrigues <[email protected]> (cherry picked from commit ca62d8659db632954ed28e35f3a5e7d27d3fe8b4) --- .../apache/atlas/hbase/bridge/HBaseAtlasHook.java | 9 ++-- .../org/apache/atlas/hbase/bridge/HBaseBridge.java | 7 ++- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 19 ++++---- .../atlas/hive/hook/events/BaseHiveEvent.java | 52 +++++----------------- .../atlas/hive/hook/events/CreateHiveProcess.java | 7 +-- .../atlas/hive/bridge/HiveMetaStoreBridgeTest.java | 3 +- .../java/org/apache/atlas/type/AtlasTypeUtil.java | 44 ++++++++++++++++++ 7 files changed, 80 insertions(+), 61 deletions(-) diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java index 6d062e2..26be9b3 100644 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java @@ -103,6 +103,9 @@ public class HBaseAtlasHook extends AtlasHook { private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; + public static final String RELATIONSHIP_HBASE_TABLE_COLUMN_FAMILIES = "hbase_table_column_families"; + public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace"; + private static volatile HBaseAtlasHook me; public enum OPERATION { @@ -212,7 +215,7 @@ public class HBaseAtlasHook extends AtlasHook { AtlasEntity table = buildTable(hbaseOperationContext, nameSpace); List<AtlasEntity> columnFamilies = buildColumnFamilies(hbaseOperationContext, nameSpace, table); - table.setAttribute(ATTR_COLUMNFAMILIES, AtlasTypeUtil.getAtlasObjectIds(columnFamilies)); + table.setRelationshipAttribute(ATTR_COLUMNFAMILIES, AtlasTypeUtil.getAtlasRelatedObjectIds(columnFamilies, RELATIONSHIP_HBASE_TABLE_COLUMN_FAMILIES)); AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(table); @@ -392,7 +395,7 @@ public class HBaseAtlasHook extends AtlasHook { table.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner()); table.setAttribute(ATTR_DESCRIPTION, tableName); table.setAttribute(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf()); - table.setAttribute(ATTR_NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpace)); + table.setRelationshipAttribute(ATTR_NAMESPACE, AtlasTypeUtil.getAtlasRelatedObjectId(nameSpace, RELATIONSHIP_HBASE_TABLE_NAMESPACE)); TableDescriptor tableDescriptor = hbaseOperationContext.gethTableDescriptor(); if (tableDescriptor != null) { @@ -451,7 +454,7 @@ public class HBaseAtlasHook extends AtlasHook { columnFamily.setAttribute(ATTR_DESCRIPTION, columnFamilyName); columnFamily.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName); columnFamily.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner()); - columnFamily.setAttribute(ATTR_TABLE, AtlasTypeUtil.getAtlasObjectId(table)); + columnFamily.setRelationshipAttribute(ATTR_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(table, RELATIONSHIP_HBASE_TABLE_COLUMN_FAMILIES)); if (columnFamilyDescriptor!= null) { columnFamily.setAttribute(ATTR_CF_BLOCK_CACHE_ENABLED, columnFamilyDescriptor.isBlockCacheEnabled()); diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java index 4a4b4d9..fde70f1 100644 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java @@ -430,8 +430,7 @@ public class HBaseBridge { cfIDs.add(AtlasTypeUtil.getAtlasObjectId(cfEntity.getEntity())); } } - - tableEntity.setAttribute(COLUMN_FAMILIES, cfIDs); + tableEntity.setRelationshipAttribute(COLUMN_FAMILIES, AtlasTypeUtil.getAtlasRelatedObjectIdList(cfIDs, HBaseAtlasHook.RELATIONSHIP_HBASE_TABLE_COLUMN_FAMILIES)); } return ret; @@ -548,7 +547,7 @@ public class HBaseBridge { ret.setAttribute(QUALIFIED_NAME, tableQualifiedName); ret.setAttribute(CLUSTERNAME, metadataNamespace); - ret.setAttribute(NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity)); + ret.setRelationshipAttribute(NAMESPACE, AtlasTypeUtil.getAtlasRelatedObjectId(nameSpaceEntity, HBaseAtlasHook.RELATIONSHIP_HBASE_TABLE_NAMESPACE)); ret.setAttribute(NAME, tableName); ret.setAttribute(DESCRIPTION_ATTR, tableName); ret.setAttribute(OWNER, owner); @@ -577,7 +576,7 @@ public class HBaseBridge { ret.setAttribute(QUALIFIED_NAME, cfQualifiedName); ret.setAttribute(CLUSTERNAME, metadataNamespace); - ret.setAttribute(TABLE, tableId); + ret.setRelationshipAttribute(TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(tableId, HBaseAtlasHook.RELATIONSHIP_HBASE_TABLE_COLUMN_FAMILIES)); ret.setAttribute(NAME, cfName); ret.setAttribute(DESCRIPTION_ATTR, cfName); ret.setAttribute(OWNER, owner); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index a61a3e6..2f1bb0e 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -20,6 +20,7 @@ package org.apache.atlas.hive.bridge; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasServiceException; @@ -363,8 +364,8 @@ public class HiveMetaStoreBridge { processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName); processInst.setAttribute(ATTRIBUTE_NAME, query); processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); - processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS))); - processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); + processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS))); + processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner()); processInst.setAttribute(ATTRIBUTE_START_TIME, now); processInst.setAttribute(ATTRIBUTE_END_TIME, now); @@ -590,7 +591,7 @@ public class HiveMetaStoreBridge { long createTime = BaseHiveEvent.getTableCreateTime(hiveTable); long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime; - tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB)); tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase()); tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner()); @@ -611,13 +612,13 @@ public class HiveMetaStoreBridge { tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText()); } - AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity)); + AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), AtlasTypeUtil.getObjectId(tableEntity)); List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity, RELATIONSHIP_HIVE_TABLE_PART_KEYS); List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity, RELATIONSHIP_HIVE_TABLE_COLUMNS); - tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); - tableEntity.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); - tableEntity.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, AtlasTypeUtil.getAtlasRelatedObjectIds(partKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_COLUMNS, AtlasTypeUtil.getAtlasRelatedObjectIds(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); table.addReferredEntity(database); table.addReferredEntity(sdEntity); @@ -642,7 +643,7 @@ public class HiveMetaStoreBridge { private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException { AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName()); - ret.setRelationshipAttribute(ATTRIBUTE_TABLE, getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + ret.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName); ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters()); ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation())); @@ -698,7 +699,7 @@ public class HiveMetaStoreBridge { AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName()); - column.setRelationshipAttribute(ATTRIBUTE_TABLE, getAtlasRelatedObjectId(table, relationshipType)); + column.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(table, relationshipType)); column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName())); column.setAttribute(ATTRIBUTE_NAME, fs.getName()); column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER)); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index 1a6e070..fe0a140 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -209,36 +209,6 @@ public abstract class BaseHiveEvent { } - public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasEntity entity, String relationshipType) { - return getAtlasRelatedObjectId(getObjectId(entity), relationshipType); - } - - public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasObjectId objectId, String relationShipType) { - AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(objectId, relationShipType); - return atlasRelatedObjectId; - } - - public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) { - final List<AtlasRelatedObjectId> ret; - if (CollectionUtils.isNotEmpty(entities)) { - ret = new ArrayList<>(entities.size()); - for (AtlasEntity entity : entities) { - ret.add(getAtlasRelatedObjectId(entity, relationshipType)); - } - } else { - ret = Collections.emptyList(); - } - return ret; - } - - - public static AtlasObjectId getObjectId(AtlasEntity entity) { - String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); - AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); - - return ret; - } - public static List<AtlasObjectId> getObjectIds(List<AtlasEntity> entities) { final List<AtlasObjectId> ret; @@ -246,7 +216,7 @@ public abstract class BaseHiveEvent { ret = new ArrayList<>(entities.size()); for (AtlasEntity entity : entities) { - ret.add(getObjectId(entity)); + ret.add(AtlasTypeUtil.getObjectId(entity)); } } else { ret = Collections.emptyList(); @@ -403,7 +373,7 @@ public abstract class BaseHiveEvent { } } - AtlasEntity ret = toTableEntity(getObjectId(dbEntity), table, entityExtInfo); + AtlasEntity ret = toTableEntity(AtlasTypeUtil.getObjectId(dbEntity), table, entityExtInfo); return ret; } @@ -459,7 +429,7 @@ public abstract class BaseHiveEvent { if (pruneTable) { LOG.info("ignoring details of table {}", tblQualifiedName); } else { - AtlasObjectId tableId = getObjectId(ret); + AtlasObjectId tableId = AtlasTypeUtil.getObjectId(ret); AtlasEntity sd = getStorageDescEntity(tableId, table); List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys(), RELATIONSHIP_HIVE_TABLE_PART_KEYS); List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols(), RELATIONSHIP_HIVE_TABLE_COLUMNS); @@ -483,9 +453,9 @@ public abstract class BaseHiveEvent { } - ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getAtlasRelatedObjectId(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); - ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); - ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); + ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getAtlasRelatedObjectId(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, AtlasTypeUtil.getAtlasRelatedObjectIds(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); + ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, AtlasTypeUtil.getAtlasRelatedObjectIds(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); } context.putEntity(tblQualifiedName, ret); @@ -630,7 +600,7 @@ public abstract class BaseHiveEvent { ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); - ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); + ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); @@ -678,8 +648,8 @@ public abstract class BaseHiveEvent { } ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs)); - ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS)); - ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, getObjectIdsWithRelationshipType(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)); + ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS)); + ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)); ret.setAttribute(ATTRIBUTE_NAME, queryStr); ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName()); @@ -726,7 +696,7 @@ public abstract class BaseHiveEvent { } protected AtlasEntity createHiveDDLEntity(AtlasEntity dbOrTable, boolean excludeEntityGuid) { - AtlasObjectId objId = BaseHiveEvent.getObjectId(dbOrTable); + AtlasObjectId objId = AtlasTypeUtil.getObjectId(dbOrTable); AtlasEntity hiveDDL = null; if (excludeEntityGuid) { @@ -1004,7 +974,7 @@ public abstract class BaseHiveEvent { ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName); ret.setAttribute(ATTRIBUTE_URI, hbaseTableName); - AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE); + AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(AtlasTypeUtil.getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE); ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(metadataNamespace, hbaseNameSpace, hbaseTableName)); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 6b050d4..82df576 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -18,6 +18,7 @@ package org.apache.atlas.hive.hook.events; +import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.hive.hook.AtlasHiveHookContext; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; @@ -207,9 +208,9 @@ public class CreateHiveProcess extends BaseHiveEvent { columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); - columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS)); - columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getAtlasRelatedObjectId(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); - columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getAtlasRelatedObjectId(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE)); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS)); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, AtlasTypeUtil.getAtlasRelatedObjectId(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE)); columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr()); diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java index 4403aaf..bdf8578 100644 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java @@ -24,6 +24,7 @@ import org.apache.atlas.AtlasServiceException; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.type.AtlasTypeUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; @@ -293,7 +294,7 @@ public class HiveMetaStoreBridgeTest { private AtlasEntity createTableReference() { AtlasEntity tableEntity = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()); AtlasEntity sdEntity = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName()); - tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sdEntity)); + tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getObjectId(sdEntity)); return tableEntity; } diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java index 6ac176d..0883d54 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java @@ -67,6 +67,8 @@ public class AtlasTypeUtil { private static final String InvalidTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_' ] characters."; private static final String InvalidTraitTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_', '.' ] characters."; + public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + static { Collections.addAll(ATLAS_BUILTIN_TYPENAMES, AtlasBaseTypeDef.ATLAS_BUILTIN_TYPES); } @@ -468,6 +470,48 @@ public class AtlasTypeUtil { return ret; } + public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasEntity entity, String relationshipType) { + return getAtlasRelatedObjectId(getObjectId(entity), relationshipType); + } + + public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasObjectId objectId, String relationShipType) { + AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(objectId, relationShipType); + return atlasRelatedObjectId; + } + + public static List<AtlasRelatedObjectId> getAtlasRelatedObjectIds(List<AtlasEntity> entities, String relationshipType) { + final List<AtlasRelatedObjectId> ret; + if (CollectionUtils.isNotEmpty(entities)) { + ret = new ArrayList<>(entities.size()); + for (AtlasEntity entity : entities) { + ret.add(getAtlasRelatedObjectId(entity, relationshipType)); + } + } else { + ret = Collections.emptyList(); + } + return ret; + } + + public static List<AtlasRelatedObjectId> getAtlasRelatedObjectIdList(List<AtlasObjectId> atlasObjectIds, String relationshipType) { + final List<AtlasRelatedObjectId> ret; + if (CollectionUtils.isNotEmpty(atlasObjectIds)) { + ret = new ArrayList<>(atlasObjectIds.size()); + for (AtlasObjectId atlasObjectId : atlasObjectIds) { + ret.add(getAtlasRelatedObjectId(atlasObjectId, relationshipType)); + } + } else { + ret = Collections.emptyList(); + } + return ret; + } + + public static AtlasObjectId getObjectId(AtlasEntity entity) { + String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); + + return ret; + } + public static boolean isValidGuid(AtlasObjectId objId) { return isValidGuid(objId.getGuid()); }
