This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 5c00251 ATLAS-3211 :- Update Hive hook with Relationship Attributes.
5c00251 is described below
commit 5c0025177e6520564921c46d9a442e7da6f162c5
Author: Mandar Ambawane <[email protected]>
AuthorDate: Mon Jul 8 20:26:20 2019 +0530
ATLAS-3211 :- Update Hive hook with Relationship Attributes.
Signed-off-by: nixonrodrigues <[email protected]>
---
.../atlas/hive/hook/events/AlterTableRename.java | 32 ++------
.../atlas/hive/hook/events/BaseHiveEvent.java | 89 +++++++++++++++++-----
.../atlas/hive/hook/events/CreateHiveProcess.java | 6 +-
.../atlas/model/instance/AtlasRelatedObjectId.java | 6 ++
.../java/org/apache/atlas/type/AtlasTypeUtil.java | 4 +
5 files changed, 89 insertions(+), 48 deletions(-)
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
index 927ddc0..c59ff7f 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
@@ -129,17 +129,13 @@ public class AlterTableRename extends BaseHiveEvent {
// update qualifiedName for all columns, partitionKeys, storageDesc
String renamedTableQualifiedName = (String)
renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
- renameColumns((List<AtlasObjectId>)
oldTableEntity.getEntity().getAttribute(ATTRIBUTE_COLUMNS), oldTableEntity,
renamedTableQualifiedName, ret);
- renameColumns((List<AtlasObjectId>)
oldTableEntity.getEntity().getAttribute(ATTRIBUTE_PARTITION_KEYS),
oldTableEntity, renamedTableQualifiedName, ret);
+ renameColumns((List<AtlasObjectId>)
oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_COLUMNS),
oldTableEntity, renamedTableQualifiedName, ret);
+ renameColumns((List<AtlasObjectId>)
oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS),
oldTableEntity, renamedTableQualifiedName, ret);
renameStorageDesc(oldTableEntity, renamedTableEntity, ret);
- // remove columns, partitionKeys and storageDesc - as they have
already been updated above
- removeAttribute(renamedTableEntity, ATTRIBUTE_COLUMNS);
- removeAttribute(renamedTableEntity, ATTRIBUTE_PARTITION_KEYS);
- removeAttribute(renamedTableEntity, ATTRIBUTE_STORAGEDESC);
-
// set previous name as the alias
renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES,
Collections.singletonList(oldTable.getTableName()));
+ renamedTableEntity.getEntity().setRelationshipAttributes(null);
String oldTableQualifiedName = (String)
oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId oldTableId = new
AtlasObjectId(oldTableEntity.getEntity().getTypeName(),
ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName);
@@ -179,35 +175,17 @@ public class AlterTableRename extends BaseHiveEvent {
AtlasObjectId oldSdId = new AtlasObjectId(oldSd.getTypeName(),
ATTRIBUTE_QUALIFIED_NAME, oldSd.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
newSd.removeAttribute(ATTRIBUTE_TABLE);
+ newSd.setRelationshipAttributes(null);
notifications.add(new EntityPartialUpdateRequestV2(getUserName(),
oldSdId, new AtlasEntityWithExtInfo(newSd)));
}
}
- private void removeAttribute(AtlasEntityWithExtInfo entity, String
attributeName) {
- Object attributeValue = entity.getEntity().getAttribute(attributeName);
-
- entity.getEntity().getAttributes().remove(attributeName);
-
- if (attributeValue instanceof AtlasObjectId) {
- AtlasObjectId objectId = (AtlasObjectId) attributeValue;
-
- entity.removeReferredEntity(objectId.getGuid());
- } else if (attributeValue instanceof Collection) {
- for (Object item : (Collection) attributeValue)
- if (item instanceof AtlasObjectId) {
- AtlasObjectId objectId = (AtlasObjectId) item;
-
- entity.removeReferredEntity(objectId.getGuid());
- }
- }
- }
-
private AtlasEntity getStorageDescEntity(AtlasEntityWithExtInfo
tableEntity) {
AtlasEntity ret = null;
if (tableEntity != null && tableEntity.getEntity() != null) {
- Object attrSdId =
tableEntity.getEntity().getAttribute(ATTRIBUTE_STORAGEDESC);
+ Object attrSdId =
tableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_STORAGEDESC);
if (attrSdId instanceof AtlasObjectId) {
ret = tableEntity.getReferredEntity(((AtlasObjectId)
attrSdId).getGuid());
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 05d79df..b3663da 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -25,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
@@ -158,6 +159,20 @@ public abstract class BaseHiveEvent {
public static final String HDFS_PATH_PREFIX = "hdfs://";
public static final String EMPTY_ATTRIBUTE_VALUE = "";
+ public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS =
"dataset_process_inputs";
+ public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS =
"process_dataset_outputs";
+ public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE =
"hive_process_column_lineage";
+ public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
+ public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS =
"hive_table_partitionkeys";
+ public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS =
"hive_table_columns";
+ public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC =
"hive_table_storagedesc";
+ public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS =
"aws_s3_bucket_aws_s3_pseudo_dirs";
+ public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE =
"hive_process_process_executions";
+ public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES =
"hive_db_ddl_queries";
+ public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES =
"hive_table_ddl_queries";
+ public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE =
"hbase_table_namespace";
+
+
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new
HashMap<>();
@@ -190,6 +205,28 @@ public abstract class BaseHiveEvent {
return table.getTTable() != null ? (table.getOwner()): "";
}
+
+ public static AtlasRelatedObjectId
getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) {
+ AtlasRelatedObjectId atlasRelatedObjectId = new
AtlasRelatedObjectId(getObjectId(entity), relationShipType);
+ return atlasRelatedObjectId;
+ }
+
+
+
+ public static List<AtlasRelatedObjectId>
getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String
relationshipType) {
+ final List<AtlasRelatedObjectId> ret;
+ if (CollectionUtils.isNotEmpty(entities)) {
+ ret = new ArrayList<>(entities.size());
+ for (AtlasEntity entity : entities) {
+ ret.add(getObjectIdWithRelationshipType(entity,
relationshipType));
+ }
+ } else {
+ ret = Collections.emptyList();
+ }
+ return ret;
+ }
+
+
public static AtlasObjectId getObjectId(AtlasEntity entity) {
String qualifiedName = (String)
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId ret = new AtlasObjectId(entity.getGuid(),
entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME,
qualifiedName));
@@ -390,7 +427,9 @@ public abstract class BaseHiveEvent {
long createTime = getTableCreateTime(table);
long lastAccessTime = table.getLastAccessTime() > 0 ?
(table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
- ret.setAttribute(ATTRIBUTE_DB, dbId);
+ AtlasRelatedObjectId dbRelatedObject = new
AtlasRelatedObjectId(dbId, RELATIONSHIP_HIVE_TABLE_DB);
+
+ ret.setRelationshipAttribute(ATTRIBUTE_DB, dbRelatedObject );
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME,
table.getTableName().toLowerCase());
ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
@@ -417,8 +456,10 @@ public abstract class BaseHiveEvent {
} else {
AtlasObjectId tableId = getObjectId(ret);
AtlasEntity sd =
getStorageDescEntity(tableId, table);
- List<AtlasEntity> partitionKeys =
getColumnEntities(tableId, table, table.getPartitionKeys());
- List<AtlasEntity> columns =
getColumnEntities(tableId, table, table.getCols());
+ List<AtlasEntity> partitionKeys =
getColumnEntities(tableId, table, table.getPartitionKeys(),
RELATIONSHIP_HIVE_TABLE_PART_KEYS);
+ List<AtlasEntity> columns =
getColumnEntities(tableId, table, table.getCols(),
RELATIONSHIP_HIVE_TABLE_COLUMNS);
+
+
if (entityExtInfo != null) {
entityExtInfo.addReferredEntity(sd);
@@ -436,9 +477,10 @@ public abstract class BaseHiveEvent {
}
}
- ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
- ret.setAttribute(ATTRIBUTE_PARTITION_KEYS,
getObjectIds(partitionKeys));
- ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
+
+ ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC,
getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+ ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS,
getObjectIdsWithRelationshipType(partitionKeys,
RELATIONSHIP_HIVE_TABLE_PART_KEYS));
+ ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS,
getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
}
context.putEntity(tblQualifiedName, ret);
@@ -466,7 +508,9 @@ public abstract class BaseHiveEvent {
StorageDescriptor sd = table.getSd();
- ret.setAttribute(ATTRIBUTE_TABLE, tableId);
+ AtlasRelatedObjectId tableRelatedObject = new
AtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC);
+
+ ret.setRelationshipAttribute(ATTRIBUTE_TABLE, tableRelatedObject);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION,
HdfsNameServiceResolver.getPathWithNameServiceID(sd.getLocation()));
@@ -512,7 +556,7 @@ public abstract class BaseHiveEvent {
return ret;
}
- protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table
table, List<FieldSchema> fieldSchemas) {
+ protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table
table, List<FieldSchema> fieldSchemas, String relationshipType) {
List<AtlasEntity> ret = new ArrayList<>();
boolean isKnownTable = tableId.getGuid() == null;
int columnPosition = 0;
@@ -531,8 +575,8 @@ public abstract class BaseHiveEvent {
if (isKnownTable) {
column.setGuid(null);
}
-
- column.setAttribute(ATTRIBUTE_TABLE, tableId);
+ AtlasRelatedObjectId relatedObjectId = new
AtlasRelatedObjectId(tableId, relationshipType);
+ column.setRelationshipAttribute(ATTRIBUTE_TABLE,
(relatedObjectId));
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
colQualifiedName);
column.setAttribute(ATTRIBUTE_NAME, fieldSchema.getName());
column.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
@@ -580,7 +624,7 @@ public abstract class BaseHiveEvent {
ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
- ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity));
+ ret.setRelationshipAttribute(ATTRIBUTE_BUCKET,
getObjectIdWithRelationshipType(bucketEntity,
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
@@ -628,8 +672,8 @@ public abstract class BaseHiveEvent {
}
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs,
outputs));
- ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
- ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs));
+ ret.setRelationshipAttribute(ATTRIBUTE_INPUTS,
getObjectIdsWithRelationshipType(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
+ ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS,
getObjectIdsWithRelationshipType(outputs,
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
@@ -665,8 +709,9 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId());
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
- ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName());
- ret.setRelationshipAttribute(ATTRIBUTE_PROCESS,
AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess));
+ ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName()); //
+ AtlasRelatedObjectId hiveProcessRelationObjectId =
AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess,
RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE);
+ ret.setRelationshipAttribute(ATTRIBUTE_PROCESS,
hiveProcessRelationObjectId);
return ret;
}
@@ -681,11 +726,16 @@ public abstract class BaseHiveEvent {
if (excludeEntityGuid) {
objId.setGuid(null);
}
+ AtlasRelatedObjectId objIdRelatedObject = new
AtlasRelatedObjectId(objId);
if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) {
- hiveDDL = new AtlasEntity(HIVE_DB_DDL, ATTRIBUTE_DB, objId);
+ hiveDDL = new AtlasEntity(HIVE_DB_DDL);
+
objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_DB_DDL_QUERIES);
+ hiveDDL.setRelationshipAttribute(ATTRIBUTE_DB, objIdRelatedObject);
} else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) {
- hiveDDL = new AtlasEntity(HIVE_TABLE_DDL, ATTRIBUTE_TABLE, objId);
+ hiveDDL = new AtlasEntity(HIVE_TABLE_DDL);
+
objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_TABLE_DDL_QUERIES);
+ hiveDDL.setRelationshipAttribute( ATTRIBUTE_TABLE,
objIdRelatedObject);
}
if (hiveDDL != null) {
@@ -945,7 +995,10 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName);
ret.setAttribute(ATTRIBUTE_URI, hbaseTableName);
- ret.setAttribute(ATTRIBUTE_NAMESPACE, getObjectId(nsEntity));
+
+ AtlasRelatedObjectId objIdRelatedObject = new
AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
+
+ ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE,
objIdRelatedObject);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName));
entities.addReferredEntity(nsEntity);
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 7791fb4..91e063e 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -207,9 +207,9 @@ public class CreateHiveProcess extends BaseHiveEvent {
columnLineageProcess.setAttribute(ATTRIBUTE_NAME,
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" +
outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" +
outputColumn.getAttribute(ATTRIBUTE_NAME));
- columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS,
getObjectIds(inputColumns));
- columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS,
Collections.singletonList(getObjectId(outputColumn)));
- columnLineageProcess.setAttribute(ATTRIBUTE_QUERY,
getObjectId(hiveProcess));
+ columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS,
getObjectIdsWithRelationshipType(inputColumns,
BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
+ columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS,
Collections.singletonList(getObjectIdWithRelationshipType(outputColumn,
BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
+ columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY,
getObjectIdWithRelationshipType(hiveProcess,
BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE,
entry.getValue().getType());
columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION,
entry.getValue().getExpr());
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
index 7c57ccf..ae6932d 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
@@ -83,6 +83,12 @@ public class AtlasRelatedObjectId extends AtlasObjectId implements Serializable
super(other);
}
+ public AtlasRelatedObjectId(AtlasObjectId objId, String relationshipType) {
+ this(objId);
+
+ setRelationshipType(relationshipType);
+ }
+
public AtlasRelatedObjectId(Map objIdMap) {
super(objIdMap);
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index d74c7e3..6ac176d 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -414,6 +414,10 @@ public class AtlasTypeUtil {
return new AtlasRelatedObjectId(getAtlasObjectId(entity));
}
+ public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity
entity, String relationshipType){
+ return new AtlasRelatedObjectId(getAtlasObjectId(entity),
relationshipType);
+ }
+
public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity
entity, AtlasTypeRegistry typeRegistry) {
return new AtlasRelatedObjectId(getAtlasObjectId(entity,
typeRegistry));
}