This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 549dee6 ATLAS-3325: update hive-bridge to use relationship attributes
549dee6 is described below
commit 549dee6b6a488da4e5aa75b0927855889e1eb59f
Author: Mandar Ambawane <[email protected]>
AuthorDate: Sat Jul 13 14:56:03 2019 +0530
ATLAS-3325: update hive-bridge to use relationship attributes
Signed-off-by: Madhan Neethiraj <[email protected]>
---
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 26 +++++++++++-----------
.../atlas/hive/hook/events/BaseHiveEvent.java | 16 +++++++------
.../atlas/hive/hook/events/CreateHiveProcess.java | 4 ++--
3 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 049112b..a61a3e6 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -363,8 +363,8 @@ public class HiveMetaStoreBridge {
processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
processQualifiedName);
processInst.setAttribute(ATTRIBUTE_NAME, query);
processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME,
metadataNamespace);
- processInst.setAttribute(ATTRIBUTE_INPUTS,
Collections.singletonList(BaseHiveEvent.getObjectId(pathInst)));
- processInst.setAttribute(ATTRIBUTE_OUTPUTS,
Collections.singletonList(BaseHiveEvent.getObjectId(tableInst)));
+ processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS,
Collections.singletonList(getAtlasRelatedObjectId(pathInst,
RELATIONSHIP_DATASET_PROCESS_INPUTS)));
+ processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS,
Collections.singletonList(getAtlasRelatedObjectId(tableInst,
RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
processInst.setAttribute(ATTRIBUTE_USER_NAME,
table.getOwner());
processInst.setAttribute(ATTRIBUTE_START_TIME, now);
processInst.setAttribute(ATTRIBUTE_END_TIME, now);
@@ -590,7 +590,7 @@ public class HiveMetaStoreBridge {
long createTime =
BaseHiveEvent.getTableCreateTime(hiveTable);
long lastAccessTime = hiveTable.getLastAccessTime() > 0 ?
hiveTable.getLastAccessTime() : createTime;
- tableEntity.setAttribute(ATTRIBUTE_DB,
BaseHiveEvent.getObjectId(database));
+ tableEntity.setRelationshipAttribute(ATTRIBUTE_DB,
getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB));
tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
tableEntity.setAttribute(ATTRIBUTE_NAME,
hiveTable.getTableName().toLowerCase());
tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
@@ -611,13 +611,13 @@ public class HiveMetaStoreBridge {
tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT,
hiveTable.getViewExpandedText());
}
- AtlasEntity sdEntity = toStroageDescEntity(hiveTable.getSd(),
tableQualifiedName, getStorageDescQFName(tableQualifiedName),
BaseHiveEvent.getObjectId(tableEntity));
- List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(),
tableEntity);
- List<AtlasEntity> columns = toColumns(hiveTable.getCols(),
tableEntity);
+ AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(),
tableQualifiedName, getStorageDescQFName(tableQualifiedName),
BaseHiveEvent.getObjectId(tableEntity));
+ List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(),
tableEntity, RELATIONSHIP_HIVE_TABLE_PART_KEYS);
+ List<AtlasEntity> columns = toColumns(hiveTable.getCols(),
tableEntity, RELATIONSHIP_HIVE_TABLE_COLUMNS);
- tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC,
BaseHiveEvent.getObjectId(sdEntity));
- tableEntity.setAttribute(ATTRIBUTE_PARTITION_KEYS,
BaseHiveEvent.getObjectIds(partKeys));
- tableEntity.setAttribute(ATTRIBUTE_COLUMNS,
BaseHiveEvent.getObjectIds(columns));
+ tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC,
getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+ tableEntity.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS,
getObjectIdsWithRelationshipType(partKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
+ tableEntity.setRelationshipAttribute(ATTRIBUTE_COLUMNS,
getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
table.addReferredEntity(database);
table.addReferredEntity(sdEntity);
@@ -639,10 +639,10 @@ public class HiveMetaStoreBridge {
return table;
}
- private AtlasEntity toStroageDescEntity(StorageDescriptor storageDesc,
String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId )
throws AtlasHookException {
+ private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc,
String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId )
throws AtlasHookException {
AtlasEntity ret = new
AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
- ret.setAttribute(ATTRIBUTE_TABLE, tableId);
+ ret.setRelationshipAttribute(ATTRIBUTE_TABLE,
getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION,
HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
@@ -689,7 +689,7 @@ public class HiveMetaStoreBridge {
return ret;
}
- private List<AtlasEntity> toColumns(List<FieldSchema> schemaList,
AtlasEntity table) throws AtlasHookException {
+ private List<AtlasEntity> toColumns(List<FieldSchema> schemaList,
AtlasEntity table, String relationshipType) throws AtlasHookException {
List<AtlasEntity> ret = new ArrayList<>();
int columnPosition = 0;
@@ -698,7 +698,7 @@ public class HiveMetaStoreBridge {
AtlasEntity column = new
AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
- column.setAttribute(ATTRIBUTE_TABLE,
BaseHiveEvent.getObjectId(table));
+ column.setRelationshipAttribute(ATTRIBUTE_TABLE,
getAtlasRelatedObjectId(table, relationshipType));
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
fs.getName()));
column.setAttribute(ATTRIBUTE_NAME, fs.getName());
column.setAttribute(ATTRIBUTE_OWNER,
table.getAttribute(ATTRIBUTE_OWNER));
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 98b4d4f..a74273a 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -206,19 +206,21 @@ public abstract class BaseHiveEvent {
}
- public static AtlasRelatedObjectId
getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) {
- AtlasRelatedObjectId atlasRelatedObjectId = new
AtlasRelatedObjectId(getObjectId(entity), relationShipType);
- return atlasRelatedObjectId;
+ public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasEntity
entity, String relationshipType) {
+ return getAtlasRelatedObjectId(getObjectId(entity), relationshipType);
}
-
+ public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasObjectId
objectId, String relationShipType) {
+ AtlasRelatedObjectId atlasRelatedObjectId = new
AtlasRelatedObjectId(objectId, relationShipType);
+ return atlasRelatedObjectId;
+ }
public static List<AtlasRelatedObjectId>
getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String
relationshipType) {
final List<AtlasRelatedObjectId> ret;
if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>(entities.size());
for (AtlasEntity entity : entities) {
- ret.add(getObjectIdWithRelationshipType(entity,
relationshipType));
+ ret.add(getAtlasRelatedObjectId(entity, relationshipType));
}
} else {
ret = Collections.emptyList();
@@ -478,7 +480,7 @@ public abstract class BaseHiveEvent {
}
- ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC,
getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+ ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC,
getAtlasRelatedObjectId(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS,
getObjectIdsWithRelationshipType(partitionKeys,
RELATIONSHIP_HIVE_TABLE_PART_KEYS));
ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS,
getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
}
@@ -625,7 +627,7 @@ public abstract class BaseHiveEvent {
ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
- ret.setRelationshipAttribute(ATTRIBUTE_BUCKET,
getObjectIdWithRelationshipType(bucketEntity,
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
+ ret.setRelationshipAttribute(ATTRIBUTE_BUCKET,
getAtlasRelatedObjectId(bucketEntity,
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 91e063e..6b050d4 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -208,8 +208,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
columnLineageProcess.setAttribute(ATTRIBUTE_NAME,
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" +
outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" +
outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS,
getObjectIdsWithRelationshipType(inputColumns,
BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
- columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS,
Collections.singletonList(getObjectIdWithRelationshipType(outputColumn,
BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
- columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY,
getObjectIdWithRelationshipType(hiveProcess,
BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
+ columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS,
Collections.singletonList(getAtlasRelatedObjectId(outputColumn,
BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
+ columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY,
getAtlasRelatedObjectId(hiveProcess,
BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE,
entry.getValue().getType());
columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION,
entry.getValue().getExpr());