This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 32b25aa ATLAS-4230: Add support for Google Cloud Storage Path Entity creation
32b25aa is described below
commit 32b25aa6e3f1751c302b2a741b771e5d102316ef
Author: sidmishra <[email protected]>
AuthorDate: Tue Mar 30 16:20:40 2021 -0700
ATLAS-4230: Add support for Google Cloud Storage Path Entity creation
Signed-off-by: Sarath Subramanian <[email protected]>
(cherry picked from commit c03ee72ca557eab49f2e315fc17aefcce03176ef)
---
.../apache/atlas/utils/AtlasPathExtractorUtil.java | 93 +++++++++++++++++++++-
.../atlas/utils/AtlasPathExtractorUtilTest.java | 66 ++++++++++++++-
2 files changed, 155 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
index 81f847e..a9f2e50 100644
--- a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
@@ -82,6 +82,13 @@ public class AtlasPathExtractorUtil {
public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET =
"ozone_volume_buckets";
public static final String RELATIONSHIP_OZONE_PARENT_CHILDREN =
"ozone_parent_children";
+ // Google Cloud Storage: URI scheme, entity type names and the parent/child relationship used by addGCSPathEntity
+ public static final String GCS_SCHEME = "gs" +
SCHEME_SEPARATOR; // isGCSPath() matches paths that start with this prefix
+ public static final String GCS_BUCKET =
"gcp_storage_bucket";
+ public static final String GCS_VIRTUAL_DIR =
"gcp_storage_virtual_directory";
+ public static final String ATTRIBUTE_GCS_PARENT = "parent"; // relationship attribute linking a virtual directory to its bucket or parent directory
+ public static final String RELATIONSHIP_GCS_PARENT_CHILDREN =
"gcp_storage_parent_children";
+
+
public static AtlasEntityWithExtInfo getPathEntity(Path path,
PathExtractorContext context) {
AtlasEntityWithExtInfo entityWithExtInfo = new
AtlasEntityWithExtInfo();
AtlasEntity ret;
@@ -98,9 +105,12 @@ public class AtlasPathExtractorUtil {
ret = addAbfsPathEntity(path, entityWithExtInfo, context);
} else if (isOzonePath(strPath)) {
ret = addOzonePathEntity(path, entityWithExtInfo, context);
+ } else if (isGCSPath(strPath)) {
+ ret = addGCSPathEntity(path, entityWithExtInfo, context);
} else {
ret = addHDFSPathEntity(path, context);
}
+
entityWithExtInfo.setEntity(ret);
return entityWithExtInfo;
@@ -123,6 +133,10 @@ public class AtlasPathExtractorUtil {
return strPath != null && (strPath.startsWith(OZONE_SCHEME) ||
strPath.startsWith(OZONE_3_SCHEME));
}
+ private static boolean isGCSPath(String strPath) { // true for Google Cloud Storage paths, i.e. those starting with GCS_SCHEME
+ return strPath != null && strPath.startsWith(GCS_SCHEME); // null-safe, case-sensitive prefix match
+ }
+
private static AtlasEntity addS3PathEntityV1(Path path, AtlasEntityExtInfo
extInfo, PathExtractorContext context) {
String strPath = path.toString();
@@ -217,7 +231,7 @@ public class AtlasPathExtractorUtil {
ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
- ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
+ ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
subDirQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, subDirName);
@@ -442,6 +456,83 @@ public class AtlasPathExtractorUtil {
return ret;
}
+ private static AtlasEntity addGCSPathEntity(Path path, AtlasEntityExtInfo
extInfo, PathExtractorContext context) { // builds (or reuses from context) the bucket entity plus one virtual-directory entity per path segment; returns the deepest directory, or the bucket when the path has no segments
+ String strPath = path.toString();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> addGCSPathEntity(strPath={})", strPath);
+ }
+
+ String metadataNamespace = context.getMetadataNamespace();
+ String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE
+ metadataNamespace; // NOTE(review): this key has no trailing separator and keeps the bucket's original case, while directories below are stored under lower-cased keys ending in Path.SEPARATOR, so this lookup may never hit - confirm
+ AtlasEntity ret = context.getEntity(pathQualifiedName);
+
+ if (ret == null) {
+ String bucketName = path.toUri().getAuthority(); // bucket name is the URI authority
+ String schemeAndBucketName = (path.toUri().getScheme() +
SCHEME_SEPARATOR + bucketName).toLowerCase(); // lower-cased; directory segments appended below keep their original case
+ String bucketQualifiedName = schemeAndBucketName +
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+ AtlasEntity bucketEntity =
context.getEntity(bucketQualifiedName); // reuse a bucket entity already registered in this context
+
+ if (bucketEntity == null) {
+ bucketEntity = new AtlasEntity(GCS_BUCKET);
+
+ bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
bucketQualifiedName);
+ bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding entity: typeName={}, qualifiedName={}",
bucketEntity.getTypeName(),
bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ context.putEntity(bucketQualifiedName, bucketEntity);
+ }
+
+ extInfo.addReferredEntity(bucketEntity); // only the bucket becomes a referred entity; directories are recorded via context.putEntity()
+
+ AtlasRelatedObjectId parentObjId =
AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity,
RELATIONSHIP_GCS_PARENT_CHILDREN); // the first directory's parent is the bucket
+ String parentPath = Path.SEPARATOR; // path of the current parent; always ends with Path.SEPARATOR
+ String dirPath = path.toUri().getPath();
+
+ if (StringUtils.isEmpty(dirPath)) { // no path component: the loop below yields no segments and the bucket is returned
+ dirPath = Path.SEPARATOR;
+ }
+
+ for (String subDirName : dirPath.split(Path.SEPARATOR)) { // one virtual-directory entity per non-empty segment, outermost first
+ if (StringUtils.isEmpty(subDirName)) { // skips the leading separator and any doubled separators
+ continue;
+ }
+
+ String subDirPath = parentPath + subDirName +
Path.SEPARATOR;
+ String subDirQualifiedName = schemeAndBucketName + subDirPath
+ QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+
+ ret = new AtlasEntity(GCS_VIRTUAL_DIR); // NOTE(review): unlike the bucket, there is no context.getEntity() lookup here, so every call creates new directory entities - confirm intended
+
+ ret.setRelationshipAttribute(ATTRIBUTE_GCS_PARENT,
parentObjId);
+ ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath); // objectPrefix is the parent directory's path, e.g. "/" for a top-level directory
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
subDirQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, subDirName);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding entity: typeName={}, qualifiedName={}",
ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ context.putEntity(subDirQualifiedName, ret);
+
+ parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret,
RELATIONSHIP_GCS_PARENT_CHILDREN); // this directory becomes the parent of the next segment
+ parentPath = subDirPath;
+ }
+
+ if (ret == null) { // no non-empty segments: the path refers to the bucket itself
+ ret = bucketEntity;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== addGCSPathEntity(strPath={})", strPath);
+ }
+
+ return ret;
+ }
+
private static AtlasEntity addHDFSPathEntity(Path path,
PathExtractorContext context) {
String strPath = path.toString();
diff --git
a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
index dbc5000..6bf5d57 100644
---
a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
+++
b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
@@ -78,6 +78,12 @@ public class AtlasPathExtractorUtilTest {
private static final String S3_PATH = S3_SCHEME +
"aws_my_bucket1/1234567890/renders/Irradiance_A.csv";
private static final String S3A_PATH = S3A_SCHEME
+ "aws_my_bucket1/1234567890/renders/Irradiance_A.csv";
+ // Google Cloud Storage: same type names and scheme values as the GCS_* constants in AtlasPathExtractorUtil
+ private static final String GCS_VIRTUAL_DIR =
"gcp_storage_virtual_directory";
+ private static final String GCS_BUCKET =
"gcp_storage_bucket";
+ private static final String GCS_SCHEME = "gs" +
SCHEME_SEPARATOR;
+ private static final String GCS_PATH = GCS_SCHEME +
"gcs_test_bucket1/1234567890/data"; // bucket gcs_test_bucket1 with nested directories 1234567890 and data
+
@DataProvider(name = "ozonePathProvider")
private Object[][] ozonePathProvider(){
return new Object[][]{
@@ -264,6 +270,22 @@ public class AtlasPathExtractorUtilTest {
verifyS3KnownEntities(S3A_SCHEME, S3A_PATH,
extractorContext.getKnownEntities());
}
+ @Test
+ public void testGetPathEntityGCSPath() { // a GCS path should yield one bucket and two virtual directories, returning the leaf directory
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE);
+
+ Path path = new Path(GCS_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertNotNull(entity);
+ assertEquals(entity.getTypeName(), GCS_VIRTUAL_DIR);
+ assertEquals(entityWithExtInfo.getReferredEntities().size(), 1); // only the bucket is a referred entity
+
+ verifyGCSVirtualDir(GCS_SCHEME, GCS_PATH, entity); // the returned entity should be the leaf "data" directory
+ verifyGCSKnownEntities(GCS_SCHEME, GCS_PATH,
extractorContext.getKnownEntities());
+ }
+
private void verifyOzoneEntities(Map<String, AtlasEntity> knownEntities,
OzoneKeyValidator validator) {
for (AtlasEntity knownEntity : knownEntities.values()) {
switch (knownEntity.getTypeName()){
@@ -350,20 +372,35 @@ public class AtlasPathExtractorUtilTest {
if (pathQName.equalsIgnoreCase(entityQName)){
assertEquals(entity.getAttribute(ATTRIBUTE_NAME),
"Irradiance_A.csv");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/renders/Irradiance_A.csv/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/renders/");
} else {
pathQName = s3Scheme + "aws_my_bucket1/1234567890/" +
QNAME_METADATA_NAMESPACE;
if (pathQName.equalsIgnoreCase(entityQName)){
assertEquals(entity.getAttribute(ATTRIBUTE_NAME),
"1234567890");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/");
} else {
assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
s3Scheme + "aws_my_bucket1/1234567890/renders/" + QNAME_METADATA_NAMESPACE);
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "renders");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/renders/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/");
}
}
}
+ private void verifyGCSVirtualDir(String s3Scheme, String path, AtlasEntity
entity) { // checks one directory entity created for GCS_PATH; despite its name, s3Scheme receives the GCS scheme
+ String pathQName = path + "/" + QNAME_METADATA_NAMESPACE; // expected qualifiedName of the leaf directory (directory names end with "/")
+ String entityQName = (String)
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+
+ if (pathQName.equalsIgnoreCase(entityQName)){ // leaf "data" directory: prefix is its parent's path
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "data");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/");
+ } else { // otherwise it must be the top-level "1234567890" directory, whose prefix is "/"
+ pathQName = s3Scheme + "gcs_test_bucket1/1234567890/" +
QNAME_METADATA_NAMESPACE;
+ assertEquals(entityQName, pathQName);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+ }
+ }
+
private void verifyS3V2KnownEntities(String scheme, String path,
Map<String, AtlasEntity> knownEntities) {
assertEquals(knownEntities.size(), 4);
int dirCount = 0;
@@ -411,6 +448,29 @@ public class AtlasPathExtractorUtilTest {
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "aws_my_bucket1");
}
+ private void verifyGCSKnownEntities(String scheme, String path,
Map<String, AtlasEntity> knownEntities) { // expects exactly three known entities, two of them virtual directories
+ assertEquals(knownEntities.size(), 3);
+ int dirCount = 0;
+ for (AtlasEntity knownEntity : knownEntities.values()) {
+ switch (knownEntity.getTypeName()){ // NOTE(review): no default branch, so an entity of any other type is not reported here
+ case GCS_VIRTUAL_DIR:
+ verifyGCSVirtualDir(scheme, path, knownEntity);
+ dirCount++;
+ break;
+
+ case GCS_BUCKET:
+ verifyGCSBucketEntity(scheme, knownEntity);
+ break;
+ }
+ }
+ assertEquals(dirCount, 2);
+ }
+
+ private void verifyGCSBucketEntity(String scheme, AtlasEntity entity) { // bucket qualifiedName is scheme + bucket name + QNAME_METADATA_NAMESPACE
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme +
"gcs_test_bucket1" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "gcs_test_bucket1");
+ }
+
private class OzoneKeyValidator {
private final String scheme;
private final String location;