This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new d485860 ATLAS-3708: Update Hive hook to create ADLS-Gen2 entities for
ABFS path references
d485860 is described below
commit d485860937e0fb40dfd69566754a637fb1c0c132
Author: Sarath Subramanian <[email protected]>
AuthorDate: Mon Apr 6 14:00:34 2020 -0700
ATLAS-3708: Update Hive hook to create ADLS-Gen2 entities for ABFS path
references
---
.../atlas/hive/hook/events/BaseHiveEvent.java | 173 ++++++++++++++++++---
1 file changed, 154 insertions(+), 19 deletions(-)
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index f45f240..6986ba1 100644
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -81,10 +81,16 @@ public abstract class BaseHiveEvent {
public static final String AWS_S3_OBJECT =
"aws_s3_object";
public static final String AWS_S3_V2_BUCKET =
"aws_s3_v2_bucket";
public static final String AWS_S3_V2_PSEUDO_DIR =
"aws_s3_v2_directory";
+    public static final String ADLS_GEN2_ACCOUNT             = "adls_gen2_account";
+    public static final String ADLS_GEN2_CONTAINER           = "adls_gen2_container";
+    public static final String ADLS_GEN2_DIRECTORY           = "adls_gen2_directory";
+    public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX = ".dfs.core.windows.net";
     public static final String SCHEME_SEPARATOR              = "://";
     public static final String S3_SCHEME                     = "s3" + SCHEME_SEPARATOR;
     public static final String S3A_SCHEME                    = "s3a" + SCHEME_SEPARATOR;
+    public static final String ABFS_SCHEME                   = "abfs" + SCHEME_SEPARATOR;
+    public static final String ABFSS_SCHEME                  = "abfss" + SCHEME_SEPARATOR;
public static final String ATTRIBUTE_QUALIFIED_NAME =
"qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
@@ -146,6 +152,8 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_EXEC_TIME =
"execTime";
public static final String ATTRIBUTE_DDL_QUERIES =
"ddlQueries";
public static final String ATTRIBUTE_SERVICE_TYPE =
"serviceType";
+ public static final String ATTRIBUTE_ACCOUNT = "account";
+ public static final String ATTRIBUTE_PARENT = "parent";
public static final String HBASE_STORAGE_HANDLER_CLASS =
"org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
@@ -155,19 +163,21 @@ public abstract class BaseHiveEvent {
public static final String HDFS_PATH_PREFIX = "hdfs://";
public static final String EMPTY_ATTRIBUTE_VALUE = "";
- public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS =
"dataset_process_inputs";
- public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS =
"process_dataset_outputs";
- public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE =
"hive_process_column_lineage";
- public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
- public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS =
"hive_table_partitionkeys";
- public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS =
"hive_table_columns";
- public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC =
"hive_table_storagedesc";
- public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS =
"aws_s3_bucket_aws_s3_pseudo_dirs";
+ public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS =
"dataset_process_inputs";
+ public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS =
"process_dataset_outputs";
+ public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE =
"hive_process_column_lineage";
+ public static final String RELATIONSHIP_HIVE_TABLE_DB =
"hive_table_db";
+ public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS =
"hive_table_partitionkeys";
+ public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS =
"hive_table_columns";
+ public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC =
"hive_table_storagedesc";
+ public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS =
"aws_s3_bucket_aws_s3_pseudo_dirs";
public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED =
"aws_s3_v2_container_contained";
- public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE =
"hive_process_process_executions";
- public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES =
"hive_db_ddl_queries";
- public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES =
"hive_table_ddl_queries";
- public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE =
"hbase_table_namespace";
+ public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE =
"hive_process_process_executions";
+ public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES =
"hive_db_ddl_queries";
+ public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES =
"hive_table_ddl_queries";
+ public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE =
"hbase_table_namespace";
+    public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS = "adls_gen2_account_containers";
+    public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN    = "adls_gen2_parent_children";
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new
HashMap<>();
@@ -574,7 +584,7 @@ public abstract class BaseHiveEvent {
}
protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo)
{
- AtlasEntity ret;
+ AtlasEntity ret;
String strPath = path.toString();
String metadataNamespace = getMetadataNamespace();
@@ -588,6 +598,8 @@ public abstract class BaseHiveEvent {
} else {
ret = addS3PathEntityV1(path, strPath, extInfo);
}
+ } else if (isAbfsPath(strPath)) {
+ ret = addAbfsPathEntity(path, strPath, extInfo);
} else {
String nameServiceID =
HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
String attrPath = StringUtils.isEmpty(nameServiceID) ?
strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
@@ -621,25 +633,29 @@ public abstract class BaseHiveEvent {
}
protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs,
List<AtlasEntity> outputs) throws Exception {
- AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
- String queryStr = getQueryString();
+ AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
+ String queryStr = getQueryString();
+ String qualifiedName = getQualifiedName(inputs, outputs);
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
}
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
- String qualifiedName = getQualifiedName(inputs, outputs);
+
if (context.isMetastoreHook()) {
HiveOperation operation = context.getHiveOperation();
+
if (operation == HiveOperation.CREATETABLE || operation ==
HiveOperation.CREATETABLE_AS_SELECT) {
- AtlasEntity table = outputs.get(0);
- long createTime =
Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
- qualifiedName = (String)
table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+ AtlasEntity table = outputs.get(0);
+ long createTime =
Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
+ qualifiedName = (String)
table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+
ret.setAttribute(ATTRIBUTE_NAME, "dummyProcess:" +
UUID.randomUUID());
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE,
operation.getOperationName());
}
}
+
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, qualifiedName);
ret.setRelationshipAttribute(ATTRIBUTE_INPUTS,
AtlasTypeUtil.getAtlasRelatedObjectIds(inputs,
RELATIONSHIP_DATASET_PROCESS_INPUTS));
@@ -650,6 +666,7 @@ public abstract class BaseHiveEvent {
// mandatory attributes for hive process entity type.
ret.setAttribute(ATTRIBUTE_START_TIME, System.currentTimeMillis());
ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
+
if (context.isHiveProcessPopulateDeprecatedAttributes()) {
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
@@ -659,6 +676,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, EMPTY_ATTRIBUTE_VALUE);
ret.setAttribute(ATTRIBUTE_QUERY_ID, EMPTY_ATTRIBUTE_VALUE);
}
+
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
ret.setAttribute(ATTRIBUTE_RECENT_QUERIES,
Collections.singletonList(queryStr));
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getMetadataNamespace());
@@ -1131,6 +1149,10 @@ public abstract class BaseHiveEvent {
return strPath != null && (strPath.startsWith(S3_SCHEME) ||
strPath.startsWith(S3A_SCHEME));
}
+    private boolean isAbfsPath(String strPath) {
+        return strPath != null && (strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME));
+    }
+
private AtlasEntity addS3PathEntityV1(Path path, String strPath,
AtlasEntityExtInfo extInfo) {
String metadataNamespace = getMetadataNamespace();
String bucketName = path.toUri().getAuthority();
@@ -1239,6 +1261,119 @@ public abstract class BaseHiveEvent {
return ret;
}
+    private AtlasEntity addAbfsPathEntity(Path path, String strPath, AtlasEntityExtInfo extInfo) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
+        }
+
+        String      metadataNamespace = getMetadataNamespace();
+        String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+        AtlasEntity ret               = context.getEntity(pathQualifiedName);
+
+        if (ret == null) {
+            String      abfsScheme               = path.toUri().getScheme();
+            String      storageAcctName          = getAbfsStorageAccountName(path.toUri());
+            String      schemeAndStorageAcctName = (abfsScheme + SCHEME_SEPARATOR + storageAcctName).toLowerCase();
+            String      storageAcctQualifiedName = schemeAndStorageAcctName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+            AtlasEntity storageAcctEntity        = context.getEntity(storageAcctQualifiedName);
+
+            // create adls-gen2 storage-account entity
+            if (storageAcctEntity == null) {
+                storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT);
+
+                storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, storageAcctQualifiedName);
+                storageAcctEntity.setAttribute(ATTRIBUTE_NAME, storageAcctName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", storageAcctEntity.getTypeName(), storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(storageAcctQualifiedName, storageAcctEntity);
+            }
+
+            extInfo.addReferredEntity(storageAcctEntity);
+
+            AtlasRelatedObjectId storageAcctObjId = AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);
+
+            // create adls-gen2 container entity linking to storage account
+            String      containerName          = path.toUri().getUserInfo();
+            String      schemeAndContainerName = (abfsScheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + storageAcctName).toLowerCase();
+            String      containerQualifiedName = schemeAndContainerName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+            AtlasEntity containerEntity        = context.getEntity(containerQualifiedName);
+
+            if (containerEntity == null) {
+                containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER);
+
+                containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQualifiedName);
+                containerEntity.setAttribute(ATTRIBUTE_NAME, containerName);
+                containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, storageAcctObjId);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", containerEntity.getTypeName(), containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(containerQualifiedName, containerEntity);
+            }
+
+            extInfo.addReferredEntity(containerEntity);
+
+            // create adls-gen2 directory entity linking to container
+            AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+            String               parentPath  = Path.SEPARATOR;
+            String               dirPath     = path.toUri().getPath();
+
+            if (StringUtils.isEmpty(dirPath)) {
+                dirPath = Path.SEPARATOR;
+            }
+
+            for (String subDirName : dirPath.split(Path.SEPARATOR)) {
+                if (StringUtils.isEmpty(subDirName)) {
+                    continue;
+                }
+
+                String subDirPath          = parentPath + subDirName + Path.SEPARATOR;
+                String subDirQualifiedName = schemeAndContainerName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+
+                ret = new AtlasEntity(ADLS_GEN2_DIRECTORY);
+
+                ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId);
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
+                ret.setAttribute(ATTRIBUTE_NAME, subDirName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(subDirQualifiedName, ret);
+
+                parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+                parentPath  = subDirPath;
+            }
+
+            if (ret == null) {
+                ret = storageAcctEntity;
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
+        }
+
+        return ret;
+    }
+
+    private String getAbfsStorageAccountName(URI uri) {
+        String ret  = null;
+        String host = uri.getHost();
+
+        // host: "<account_name>.dfs.core.windows.net"
+        if (StringUtils.isNotEmpty(host) && host.contains(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) {
+            ret = host.substring(0, host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX));
+        }
+
+        return ret;
+    }
+
static final class EntityComparator implements Comparator<Entity> {
@Override
public int compare(Entity entity1, Entity entity2) {