This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new d485860  ATLAS-3708: Update Hive hook to create ADLS-Gen2 entities for ABFS path references
d485860 is described below

commit d485860937e0fb40dfd69566754a637fb1c0c132
Author: Sarath Subramanian <[email protected]>
AuthorDate: Mon Apr 6 14:00:34 2020 -0700

    ATLAS-3708: Update Hive hook to create ADLS-Gen2 entities for ABFS path references
---
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 173 ++++++++++++++++++---
 1 file changed, 154 insertions(+), 19 deletions(-)

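With this change the Hive hook recognizes abfs:// and abfss:// location URIs and registers them as an ADLS Gen2 hierarchy (adls_gen2_account -> adls_gen2_container -> chain of adls_gen2_directory entities) instead of falling back to generic HDFS path entities. As an illustration with hypothetical names (and assuming QNAME_SEP_METADATA_NAMESPACE is the usual '@' separator and the metadata namespace is 'cm'), a table location of

    abfss://finance@contosolake.dfs.core.windows.net/warehouse/sales

would produce entities with roughly these qualified names:

    adls_gen2_account     abfss://contosolake@cm
    adls_gen2_container   abfss://finance@contosolake@cm
    adls_gen2_directory   abfss://finance@contosolake/warehouse/@cm
    adls_gen2_directory   abfss://finance@contosolake/warehouse/sales/@cm
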
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index f45f240..6986ba1 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -81,10 +81,16 @@ public abstract class BaseHiveEvent {
     public static final String AWS_S3_OBJECT                       = "aws_s3_object";
     public static final String AWS_S3_V2_BUCKET                    = "aws_s3_v2_bucket";
     public static final String AWS_S3_V2_PSEUDO_DIR                = "aws_s3_v2_directory";
+    public static final String ADLS_GEN2_ACCOUNT                   = "adls_gen2_account";
+    public static final String ADLS_GEN2_CONTAINER                 = "adls_gen2_container";
+    public static final String ADLS_GEN2_DIRECTORY                 = "adls_gen2_directory";
+    public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX       = ".dfs.core.windows.net";
 
     public static final String SCHEME_SEPARATOR                    = "://";
     public static final String S3_SCHEME                           = "s3" + SCHEME_SEPARATOR;
     public static final String S3A_SCHEME                          = "s3a" + SCHEME_SEPARATOR;
+    public static final String ABFS_SCHEME                         = "abfs" + SCHEME_SEPARATOR;
+    public static final String ABFSS_SCHEME                        = "abfss" + SCHEME_SEPARATOR;
 
     public static final String ATTRIBUTE_QUALIFIED_NAME            = "qualifiedName";
     public static final String ATTRIBUTE_NAME                      = "name";
@@ -146,6 +152,8 @@ public abstract class BaseHiveEvent {
     public static final String ATTRIBUTE_EXEC_TIME                 = "execTime";
     public static final String ATTRIBUTE_DDL_QUERIES               = "ddlQueries";
     public static final String ATTRIBUTE_SERVICE_TYPE              = "serviceType";
+    public static final String ATTRIBUTE_ACCOUNT                   = "account";
+    public static final String ATTRIBUTE_PARENT                    = "parent";
 
     public static final String HBASE_STORAGE_HANDLER_CLASS         = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
     public static final String HBASE_DEFAULT_NAMESPACE             = "default";
@@ -155,19 +163,21 @@ public abstract class BaseHiveEvent {
     public static final String HDFS_PATH_PREFIX                    = "hdfs://";
     public static final String EMPTY_ATTRIBUTE_VALUE = "";
 
-    public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs";
-    public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs";
-    public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE = "hive_process_column_lineage";
-    public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
-    public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS = "hive_table_partitionkeys";
-    public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns";
-    public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc";
-    public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
+    public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS        = "dataset_process_inputs";
+    public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS       = "process_dataset_outputs";
+    public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE   = "hive_process_column_lineage";
+    public static final String RELATIONSHIP_HIVE_TABLE_DB                 = "hive_table_db";
+    public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS          = "hive_table_partitionkeys";
+    public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS            = "hive_table_columns";
+    public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC       = "hive_table_storagedesc";
+    public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS  = "aws_s3_bucket_aws_s3_pseudo_dirs";
     public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained";
-    public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions";
-    public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
-    public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
-    public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace";
+    public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE      = "hive_process_process_executions";
+    public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES           = "hive_db_ddl_queries";
+    public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES        = "hive_table_ddl_queries";
+    public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE         = "hbase_table_namespace";
+    public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS  = "adls_gen2_account_containers";
+    public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN     = "adls_gen2_parent_children";
 
 
     public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
@@ -574,7 +584,7 @@ public abstract class BaseHiveEvent {
     }
 
     protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
-        AtlasEntity ret;
+            AtlasEntity ret;
         String      strPath           = path.toString();
         String      metadataNamespace = getMetadataNamespace();
 
@@ -588,6 +598,8 @@ public abstract class BaseHiveEvent {
             } else {
                 ret = addS3PathEntityV1(path, strPath, extInfo);
             }
+        } else if (isAbfsPath(strPath)) {
+            ret = addAbfsPathEntity(path, strPath, extInfo);
         } else {
             String nameServiceID     = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
             String attrPath          = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
@@ -621,25 +633,29 @@ public abstract class BaseHiveEvent {
     }
 
     protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
-        AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS);
-        String queryStr = getQueryString();
+        AtlasEntity ret           = new AtlasEntity(HIVE_TYPE_PROCESS);
+        String      queryStr      = getQueryString();
+        String      qualifiedName = getQualifiedName(inputs, outputs);
 
         if (queryStr != null) {
             queryStr = queryStr.toLowerCase().trim();
         }
 
         ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
-        String qualifiedName = getQualifiedName(inputs, outputs);
+
         if (context.isMetastoreHook()) {
             HiveOperation operation = context.getHiveOperation();
+
             if (operation == HiveOperation.CREATETABLE || operation == HiveOperation.CREATETABLE_AS_SELECT) {
-                AtlasEntity table = outputs.get(0);
-                long createTime = Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
-                qualifiedName =  (String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+                AtlasEntity table      = outputs.get(0);
+                long        createTime = Long.valueOf((Long)table.getAttribute(ATTRIBUTE_CREATE_TIME));
+                qualifiedName          =  (String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+
                 ret.setAttribute(ATTRIBUTE_NAME, "dummyProcess:" + UUID.randomUUID());
                 ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, operation.getOperationName());
             }
         }
+
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
         ret.setAttribute(ATTRIBUTE_NAME, qualifiedName);
         ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
@@ -650,6 +666,7 @@ public abstract class BaseHiveEvent {
         // mandatory attributes for hive process entity type.
         ret.setAttribute(ATTRIBUTE_START_TIME, System.currentTimeMillis());
         ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
+
         if (context.isHiveProcessPopulateDeprecatedAttributes()) {
             ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
             ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
@@ -659,6 +676,7 @@ public abstract class BaseHiveEvent {
             ret.setAttribute(ATTRIBUTE_QUERY_TEXT, EMPTY_ATTRIBUTE_VALUE);
             ret.setAttribute(ATTRIBUTE_QUERY_ID, EMPTY_ATTRIBUTE_VALUE);
         }
+
         ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
         ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
         ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getMetadataNamespace());
@@ -1131,6 +1149,10 @@ public abstract class BaseHiveEvent {
         return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME));
     }
 
+    private boolean isAbfsPath(String strPath) {
+        return strPath != null && (strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME));
+    }
+
     private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) {
         String      metadataNamespace   = getMetadataNamespace();
         String      bucketName          = path.toUri().getAuthority();
@@ -1239,6 +1261,119 @@ public abstract class BaseHiveEvent {
         return ret;
     }
 
+    private AtlasEntity addAbfsPathEntity(Path path, String strPath, AtlasEntityExtInfo extInfo) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
+        }
+
+        String      metadataNamespace = getMetadataNamespace();
+        String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+        AtlasEntity ret               = context.getEntity(pathQualifiedName);
+
+        if (ret == null) {
+            String      abfsScheme               = path.toUri().getScheme();
+            String      storageAcctName          = getAbfsStorageAccountName(path.toUri());
+            String      schemeAndStorageAcctName = (abfsScheme + SCHEME_SEPARATOR + storageAcctName).toLowerCase();
+            String      storageAcctQualifiedName = schemeAndStorageAcctName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+            AtlasEntity storageAcctEntity        = context.getEntity(storageAcctQualifiedName);
+
+            // create adls-gen2 storage-account entity
+            if (storageAcctEntity == null) {
+                storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT);
+
+                storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, storageAcctQualifiedName);
+                storageAcctEntity.setAttribute(ATTRIBUTE_NAME, storageAcctName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", storageAcctEntity.getTypeName(), storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(storageAcctQualifiedName, storageAcctEntity);
+            }
+
+            extInfo.addReferredEntity(storageAcctEntity);
+
+            AtlasRelatedObjectId storageAcctObjId = AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);
+
+            // create adls-gen2 container entity linking to storage account
+            String      containerName          = path.toUri().getUserInfo();
+            String      schemeAndContainerName = (abfsScheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + storageAcctName).toLowerCase();
+            String      containerQualifiedName = schemeAndContainerName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+            AtlasEntity containerEntity        = context.getEntity(containerQualifiedName);
+
+            if (containerEntity == null) {
+                containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER);
+
+                containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQualifiedName);
+                containerEntity.setAttribute(ATTRIBUTE_NAME, containerName);
+                containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, storageAcctObjId);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", containerEntity.getTypeName(), containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(containerQualifiedName, containerEntity);
+            }
+
+            extInfo.addReferredEntity(containerEntity);
+
+            // create adls-gen2 directory entity linking to container
+            AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+            String               parentPath  = Path.SEPARATOR;
+            String               dirPath     = path.toUri().getPath();
+
+            if (StringUtils.isEmpty(dirPath)) {
+                dirPath = Path.SEPARATOR;
+            }
+
+            for (String subDirName : dirPath.split(Path.SEPARATOR)) {
+                if (StringUtils.isEmpty(subDirName)) {
+                    continue;
+                }
+
+                String subDirPath          = parentPath + subDirName + Path.SEPARATOR;
+                String subDirQualifiedName = schemeAndContainerName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+
+                ret = new AtlasEntity(ADLS_GEN2_DIRECTORY);
+
+                ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId);
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
+                ret.setAttribute(ATTRIBUTE_NAME, subDirName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(subDirQualifiedName, ret);
+
+                parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+                parentPath  = subDirPath;
+            }
+
+            if (ret == null) {
+                ret = storageAcctEntity;
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
+        }
+
+        return ret;
+    }
+
+    private String getAbfsStorageAccountName(URI uri) {
+        String ret  = null;
+        String host = uri.getHost();
+
+        // host: "<account_name>.dfs.core.windows.net"
+        if (StringUtils.isNotEmpty(host) && host.contains(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) {
+            ret = host.substring(0, host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX));
+        }
+
+        return ret;
+    }
+
     static final class EntityComparator implements Comparator<Entity> {
         @Override
         public int compare(Entity entity1, Entity entity2) {

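For reference, below is a minimal standalone sketch (not part of the commit; class and variable names are hypothetical) of how an ABFS/ABFSS URI decomposes into the storage account, container, and directory chain that addAbfsPathEntity() walks. The hook itself additionally appends QNAME_SEP_METADATA_NAMESPACE plus the metadata namespace to each qualified name.

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class AbfsPathBreakdown {
    // Same host suffix the patch uses to recover the storage account name.
    private static final String ACCOUNT_HOST_SUFFIX = ".dfs.core.windows.net";

    public static void main(String[] args) {
        URI uri = URI.create("abfss://finance@contosolake.dfs.core.windows.net/warehouse/sales");

        String container = uri.getUserInfo();   // "finance"
        String host      = uri.getHost();       // "contosolake.dfs.core.windows.net"
        String account   = (host != null && host.contains(ACCOUNT_HOST_SUFFIX))
                               ? host.substring(0, host.indexOf(ACCOUNT_HOST_SUFFIX))   // "contosolake"
                               : null;

        // Walk the path the way addAbfsPathEntity() does: one directory entity per
        // path component, each child linked to its parent via adls_gen2_parent_children.
        List<String> dirNames   = new ArrayList<>();
        String       parentPath = "/";

        for (String name : uri.getPath().split("/")) {
            if (name.isEmpty()) {
                continue;
            }

            String subDirPath = parentPath + name + "/";

            dirNames.add((uri.getScheme() + "://" + container + "@" + account + subDirPath).toLowerCase());

            parentPath = subDirPath;
        }

        System.out.println("account    : " + account);     // contosolake
        System.out.println("container  : " + container);   // finance
        System.out.println("directories: " + dirNames);    // [abfss://finance@contosolake/warehouse/, abfss://finance@contosolake/warehouse/sales/]
    }
}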