This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 30a275d ATLAS-3659: updated Hive hook to create aws_s3_v2 entities
30a275d is described below
commit 30a275d4704ec82a3a860fd239274972ecec41af
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Sun Mar 8 18:09:19 2020 -0700
ATLAS-3659: updated Hive hook to create aws_s3_v2 entities
---
.../atlas/hive/hook/AtlasHiveHookContext.java | 4 +
.../java/org/apache/atlas/hive/hook/HiveHook.java | 6 +
.../atlas/hive/hook/events/BaseHiveEvent.java | 142 +++++++++++++++++----
3 files changed, 125 insertions(+), 27 deletions(-)
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index 78290c0..d0b9393 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -168,6 +168,10 @@ public class AtlasHiveHookContext {
return hook.isConvertHdfsPathToLowerCase();
}
+    /**
+     * Tells whether S3 paths should be recorded with the aws_s3_v2 entity model.
+     * Delegates to {@code hook.isAwsS3AtlasModelVersionV2()}.
+     */
+    public boolean isAwsS3AtlasModelVersionV2() { return hook.isAwsS3AtlasModelVersionV2(); }
+
public boolean getSkipHiveColumnLineageHive20633() {
return hook.getSkipHiveColumnLineageHive20633();
}
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index dd425cc..3aa5c3b 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -60,6 +60,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String HOOK_NAME_CACHE_DATABASE_COUNT =
CONF_PREFIX + "name.cache.database.count";
public static final String HOOK_NAME_CACHE_TABLE_COUNT =
CONF_PREFIX + "name.cache.table.count";
public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC =
CONF_PREFIX + "name.cache.rebuild.interval.seconds";
+ public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION =
CONF_PREFIX + "aws_s3.atlas.model.version";
+ public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
public static final String
HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES = CONF_PREFIX +
"hive_process.populate.deprecated.attributes";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633
= CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
public static final String
HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX +
"skip.hive_column_lineage.hive-20633.inputs.threshold";
@@ -75,6 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final int nameCacheDatabaseMaxCount;
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
+ private static final boolean isAwsS3AtlasModelVersionV2;
private static final boolean
skipHiveColumnLineageHive20633;
private static final int
skipHiveColumnLineageHive20633InputsThreshold;
@@ -98,6 +101,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
nameCacheDatabaseMaxCount =
atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
nameCacheTableMaxCount =
atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
nameCacheRebuildIntervalSeconds =
atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60
minutes default
+ isAwsS3AtlasModelVersionV2 =
StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION,
HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
skipHiveColumnLineageHive20633 =
atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold =
atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
15); // skip if avg # of inputs is > 15
hiveProcessPopulateDeprecatedAttributes =
atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES,
false);
@@ -253,6 +257,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return convertHdfsPathToLowerCase;
}
+    /**
+     * Returns true when the HOOK_AWS_S3_ATLAS_MODEL_VERSION property equals "v2"
+     * (case-insensitive). The property defaults to "v2", so this is true unless configured otherwise.
+     */
+    public boolean isAwsS3AtlasModelVersionV2() {
+        return isAwsS3AtlasModelVersionV2;
+    }
+
public boolean getSkipHiveColumnLineageHive20633() {
return skipHiveColumnLineageHive20633;
}
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 35d85f3..f45f240 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -79,6 +79,8 @@ public abstract class BaseHiveEvent {
public static final String AWS_S3_BUCKET =
"aws_s3_bucket";
public static final String AWS_S3_PSEUDO_DIR =
"aws_s3_pseudo_dir";
public static final String AWS_S3_OBJECT =
"aws_s3_object";
+ public static final String AWS_S3_V2_BUCKET =
"aws_s3_v2_bucket";
+ public static final String AWS_S3_V2_PSEUDO_DIR =
"aws_s3_v2_directory";
public static final String SCHEME_SEPARATOR = "://";
public static final String S3_SCHEME = "s3" +
SCHEME_SEPARATOR;
@@ -139,6 +141,7 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_NAMESPACE =
"namespace";
public static final String ATTRIBUTE_OBJECT_PREFIX =
"objectPrefix";
public static final String ATTRIBUTE_BUCKET = "bucket";
+ public static final String ATTRIBUTE_CONTAINER =
"container";
public static final String ATTRIBUTE_HOSTNAME =
"hostName";
public static final String ATTRIBUTE_EXEC_TIME =
"execTime";
public static final String ATTRIBUTE_DDL_QUERIES =
"ddlQueries";
@@ -160,6 +163,7 @@ public abstract class BaseHiveEvent {
public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS =
"hive_table_columns";
public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC =
"hive_table_storagedesc";
public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS =
"aws_s3_bucket_aws_s3_pseudo_dirs";
+ public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED =
"aws_s3_v2_container_contained";
public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE =
"hive_process_process_executions";
public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES =
"hive_db_ddl_queries";
public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES =
"hive_table_ddl_queries";
@@ -579,33 +583,10 @@ public abstract class BaseHiveEvent {
}
if (isS3Path(strPath)) {
- String bucketName = path.toUri().getAuthority();
- String bucketQualifiedName = (path.toUri().getScheme() +
SCHEME_SEPARATOR + path.toUri().getAuthority() +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
- String pathQualifiedName = (strPath +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
- AtlasEntity bucketEntity =
context.getEntity(bucketQualifiedName);
-
- ret = context.getEntity(pathQualifiedName);
-
- if (ret == null) {
- if (bucketEntity == null) {
- bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
-
- bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
bucketQualifiedName);
- bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
-
- context.putEntity(bucketQualifiedName, bucketEntity);
- }
-
- extInfo.addReferredEntity(bucketEntity);
-
- ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
-
- ret.setRelationshipAttribute(ATTRIBUTE_BUCKET,
AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity,
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
- ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
- ret.setAttribute(ATTRIBUTE_NAME,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-
- context.putEntity(pathQualifiedName, ret);
+ if (context.isAwsS3AtlasModelVersionV2()) {
+ ret = addS3PathEntityV2(path, strPath, extInfo);
+ } else {
+ ret = addS3PathEntityV1(path, strPath, extInfo);
}
} else {
String nameServiceID =
HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
@@ -1150,6 +1131,113 @@ public abstract class BaseHiveEvent {
return strPath != null && (strPath.startsWith(S3_SCHEME) ||
strPath.startsWith(S3A_SCHEME));
}
+    /**
+     * Returns the aws_s3_pseudo_dir entity for an S3 path using the original (v1) AWS S3 model.
+     * A directory already cached in the hook context under its qualified name is returned as-is.
+     * Otherwise the aws_s3_bucket entity is looked up in, or created and stored into, the context.
+     * The bucket is added to extInfo as a referred entity, and a new pseudo-directory linked
+     * to the bucket is created, cached and returned.
+     *
+     * Bucket and directory qualified names are lower-cased, except for the metadata namespace
+     * suffix; the directory's objectPrefix and name are lower-cased as well.
+     *
+     * @param path    the S3 path
+     * @param strPath string form of path; used to build the directory's qualified name
+     * @param extInfo receives the bucket entity as a referred entity when a directory is created
+     * @return the pseudo-directory entity for path
+     */
+    private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) {
+        final String namespace   = getMetadataNamespace();
+        final String bucket      = path.toUri().getAuthority();
+        final String bucketQName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
+        final String dirQName    = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
+        AtlasEntity  bucketEntity = context.getEntity(bucketQName);
+        final AtlasEntity cached  = context.getEntity(dirQName);
+
+        if (cached != null) {
+            return cached;
+        }
+
+        if (bucketEntity == null) {
+            bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
+            bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
+            bucketEntity.setAttribute(ATTRIBUTE_NAME, bucket);
+
+            context.putEntity(bucketQName, bucketEntity);
+        }
+
+        extInfo.addReferredEntity(bucketEntity);
+
+        // the directory's objectPrefix and name are both the lower-cased path without scheme/authority
+        final String      objectPrefix = Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase();
+        final AtlasEntity dirEntity    = new AtlasEntity(AWS_S3_PSEUDO_DIR);
+
+        dirEntity.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
+        dirEntity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
+        dirEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dirQName);
+        dirEntity.setAttribute(ATTRIBUTE_NAME, objectPrefix);
+
+        context.putEntity(dirQName, dirEntity);
+
+        return dirEntity;
+    }
+
+    /**
+     * Builds the aws_s3_v2 entities for an S3 path:
+     * - an aws_s3_v2_bucket for the URI authority;
+     * - one aws_s3_v2_directory per non-empty path segment.
+     * Each directory is linked to its parent (the bucket or the previous directory) through
+     * the "container" relationship attribute.
+     *
+     * Bucket and directory entities are cached in the hook context by qualified name and
+     * reused. Paths that share a prefix (e.g. s3a://b/warehouse/t1 and s3a://b/warehouse/t2)
+     * therefore share one entity per common directory, not one entity per call.
+     *
+     * The bucket and every intermediate directory are added to extInfo as referred entities,
+     * because the returned entity references them through its container chain. The returned
+     * entity is not added to extInfo here unless it is the bucket.
+     * NOTE(review): presumably the caller adds it; verify at call sites.
+     *
+     * Scheme and bucket are lower-cased in qualified names; directory segments keep their case.
+     *
+     * @param path    the S3 path
+     * @param strPath string form of path; used as the cache key for the whole path
+     * @param extInfo receives the bucket and intermediate directory entities
+     * @return the entity of the deepest directory in path, or the bucket entity when the path
+     *         has no directory segments
+     */
+    private AtlasEntity addS3PathEntityV2(Path path, String strPath, AtlasEntityExtInfo extInfo) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        String      metadataNamespace = getMetadataNamespace();
+        String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+        AtlasEntity ret               = context.getEntity(pathQualifiedName);
+
+        if (ret == null) {
+            String      bucketName          = path.toUri().getAuthority();
+            String      schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
+            String      bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+            AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
+
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET);
+
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(bucketQualifiedName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+            String               parentPath  = Path.SEPARATOR;
+            String               dirPath     = path.toUri().getPath();
+
+            if (StringUtils.isEmpty(dirPath)) {
+                dirPath = Path.SEPARATOR;
+            }
+
+            for (String subDirName : dirPath.split(Path.SEPARATOR)) {
+                if (StringUtils.isEmpty(subDirName)) {
+                    continue;
+                }
+
+                // The previous directory is now an intermediate one. The directory built below
+                // refers to it through its container relationship, so include it in extInfo;
+                // otherwise only a reference to it would be sent.
+                if (ret != null) {
+                    extInfo.addReferredEntity(ret);
+                }
+
+                String      subDirPath          = parentPath + subDirName + Path.SEPARATOR;
+                String      subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+                AtlasEntity subDirEntity        = context.getEntity(subDirQualifiedName);
+
+                // Reuse a directory already built for this event. Its qualified name encodes the
+                // full prefix, so its container is the same parent computed here.
+                if (subDirEntity == null) {
+                    subDirEntity = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
+
+                    subDirEntity.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
+                    subDirEntity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
+                    subDirEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
+                    subDirEntity.setAttribute(ATTRIBUTE_NAME, subDirName);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("adding entity: typeName={}, qualifiedName={}", subDirEntity.getTypeName(), subDirEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                    }
+
+                    context.putEntity(subDirQualifiedName, subDirEntity);
+                }
+
+                ret         = subDirEntity;
+                parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(subDirEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+                parentPath  = subDirPath;
+            }
+
+            // a path without directory segments (e.g. "s3a://bucket/") maps to the bucket itself
+            if (ret == null) {
+                ret = bucketEntity;
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        return ret;
+    }
static final class EntityComparator implements Comparator<Entity> {
@Override