This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit f01f49800384aff9d64cdc743c962afb92651fd9 Author: Madhan Neethiraj <[email protected]> AuthorDate: Sun Mar 8 18:09:19 2020 -0700 ATLAS-3659: updated Hive hook to create aws_s3_v2 entities (cherry picked from commit 30a275d4704ec82a3a860fd239274972ecec41af) --- .../atlas/hive/hook/AtlasHiveHookContext.java | 4 + .../java/org/apache/atlas/hive/hook/HiveHook.java | 6 + .../atlas/hive/hook/events/BaseHiveEvent.java | 142 +++++++++++++++++---- 3 files changed, 125 insertions(+), 27 deletions(-) diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index 78290c0..d0b9393 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -168,6 +168,10 @@ public class AtlasHiveHookContext { return hook.isConvertHdfsPathToLowerCase(); } + public boolean isAwsS3AtlasModelVersionV2() { + return hook.isAwsS3AtlasModelVersionV2(); + } + public boolean getSkipHiveColumnLineageHive20633() { return hook.getSkipHiveColumnLineageHive20633(); } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index dd425cc..3aa5c3b 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -60,6 +60,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count"; public static final String HOOK_NAME_CACHE_TABLE_COUNT = CONF_PREFIX + "name.cache.table.count"; public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds"; + public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version"; + public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2"; public static final String HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES = CONF_PREFIX + "hive_process.populate.deprecated.attributes"; public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633"; public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold"; @@ -75,6 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final int nameCacheDatabaseMaxCount; private static final int nameCacheTableMaxCount; private static final int nameCacheRebuildIntervalSeconds; + private static final boolean isAwsS3AtlasModelVersionV2; private static final boolean skipHiveColumnLineageHive20633; private static final int skipHiveColumnLineageHive20633InputsThreshold; @@ -98,6 +101,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000); nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default + isAwsS3AtlasModelVersionV2 = StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2); skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 hiveProcessPopulateDeprecatedAttributes = atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES, false); @@ -253,6 +257,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return convertHdfsPathToLowerCase; } + public boolean isAwsS3AtlasModelVersionV2() { return isAwsS3AtlasModelVersionV2; } + public boolean getSkipHiveColumnLineageHive20633() { return skipHiveColumnLineageHive20633; } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index 86bf7a0..b766742 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -82,6 +82,8 @@ public abstract class BaseHiveEvent { public static final String AWS_S3_BUCKET = "aws_s3_bucket"; public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir"; public static final String AWS_S3_OBJECT = "aws_s3_object"; + public static final String AWS_S3_V2_BUCKET = "aws_s3_v2_bucket"; + public static final String AWS_S3_V2_PSEUDO_DIR = "aws_s3_v2_directory"; public static final String SCHEME_SEPARATOR = "://"; public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR; @@ -142,6 +144,7 @@ public abstract class BaseHiveEvent { public static final String ATTRIBUTE_NAMESPACE = "namespace"; public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix"; public static final String ATTRIBUTE_BUCKET = "bucket"; + public static final String ATTRIBUTE_CONTAINER = "container"; public static final String ATTRIBUTE_HOSTNAME = "hostName"; public static final String ATTRIBUTE_EXEC_TIME = "execTime"; public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries"; @@ -163,6 +166,7 @@ public abstract class BaseHiveEvent { public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns"; public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc"; public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs"; + public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained"; public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions"; public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries"; public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries"; @@ -582,33 +586,10 @@ public abstract class BaseHiveEvent { } if (isS3Path(strPath)) { - String bucketName = path.toUri().getAuthority(); - String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; - String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; - AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); - - ret = context.getEntity(pathQualifiedName); - - if (ret == null) { - if (bucketEntity == null) { - bucketEntity = new AtlasEntity(AWS_S3_BUCKET); - - bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); - bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); - - context.putEntity(bucketQualifiedName, bucketEntity); - } - - extInfo.addReferredEntity(bucketEntity); - - ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); - - ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); - ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); - ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - - context.putEntity(pathQualifiedName, ret); + if (context.isAwsS3AtlasModelVersionV2()) { + ret = addS3PathEntityV2(path, strPath, extInfo); + } else { + ret = addS3PathEntityV1(path, strPath, extInfo); } } else { String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath); @@ -1153,6 +1134,113 @@ public abstract class BaseHiveEvent { return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME)); } + private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) { + String metadataNamespace = getMetadataNamespace(); + String bucketName = path.toUri().getAuthority(); + String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); + AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + if (bucketEntity == null) { + bucketEntity = new AtlasEntity(AWS_S3_BUCKET); + + bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); + bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); + + context.putEntity(bucketQualifiedName, bucketEntity); + } + + extInfo.addReferredEntity(bucketEntity); + + ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); + + ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); + + context.putEntity(pathQualifiedName, ret); + } + + return ret; + } + + private AtlasEntity addS3PathEntityV2(Path path, String strPath, AtlasEntityExtInfo extInfo) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> addS3PathEntityV2(strPath={})", strPath); + } + + String metadataNamespace = getMetadataNamespace(); + String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + String bucketName = path.toUri().getAuthority(); + String schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase(); + String bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); + + if (bucketEntity == null) { + bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET); + + bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); + bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(bucketQualifiedName, bucketEntity); + } + + extInfo.addReferredEntity(bucketEntity); + + AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED); + String parentPath = Path.SEPARATOR; + String dirPath = path.toUri().getPath(); + + if (StringUtils.isEmpty(dirPath)) { + dirPath = Path.SEPARATOR; + } + + for (String subDirName : dirPath.split(Path.SEPARATOR)) { + if (StringUtils.isEmpty(subDirName)) { + continue; + } + + String subDirPath = parentPath + subDirName + Path.SEPARATOR; + String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + + ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR); + + ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, subDirName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(subDirQualifiedName, ret); + + parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED); + parentPath = subDirPath; + } + + if (ret == null) { + ret = bucketEntity; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== addS3PathEntityV2(strPath={})", strPath); + } + + return ret; + } static final class EntityComparator implements Comparator<Entity> { @Override
