This is an automated email from the ASF dual-hosted git repository.
sidmishra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new d7547c4 ATLAS-4400: Fixed Hook and Atlas Preprocessor to handle S3 V2
directory objectPrefix Issue with Atlas Server and Hook versions mismatch
d7547c4 is described below
commit d7547c493f25f3e1acc5572ce7159068d6b2d00e
Author: Sidharth Mishra <[email protected]>
AuthorDate: Wed Aug 25 20:36:49 2021 -0700
ATLAS-4400: Fixed Hook and Atlas Preprocessor to handle S3 V2 directory
objectPrefix Issue with Atlas Server and Hook versions mismatch
Signed-off-by: sidmishra <[email protected]>
---
.../apache/atlas/utils/AtlasPathExtractorUtil.java | 2 +-
.../atlas/utils/AtlasPathExtractorUtilTest.java | 6 +-
.../notification/NotificationHookConsumer.java | 35 +++-
.../preprocessor/AWSS3V2Preprocessor.java | 91 +++++++++
.../preprocessor/EntityPreprocessor.java | 17 +-
.../preprocessor/PreprocessorContext.java | 8 +-
.../preprocessor/AWSS3V2PreprocessorTest.java | 214 +++++++++++++++++++++
7 files changed, 364 insertions(+), 9 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
index a9f2e50..01a67b7 100644
--- a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
@@ -231,7 +231,7 @@ public class AtlasPathExtractorUtil {
ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
- ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath);
+ ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
subDirQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, subDirName);
diff --git a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
index 6bf5d57..f35e9ae 100644
--- a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
+++ b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
@@ -372,16 +372,16 @@ public class AtlasPathExtractorUtilTest {
if (pathQName.equalsIgnoreCase(entityQName)){
assertEquals(entity.getAttribute(ATTRIBUTE_NAME),
"Irradiance_A.csv");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/renders/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/renders/Irradiance_A.csv/");
} else {
pathQName = s3Scheme + "aws_my_bucket1/1234567890/" +
QNAME_METADATA_NAMESPACE;
if (pathQName.equalsIgnoreCase(entityQName)){
assertEquals(entity.getAttribute(ATTRIBUTE_NAME),
"1234567890");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/");
} else {
assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
s3Scheme + "aws_my_bucket1/1234567890/renders/" + QNAME_METADATA_NAMESPACE);
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "renders");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/renders/");
}
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 5643af9..49c504f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -154,6 +154,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String
CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME =
"atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
public static final String
CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS =
"atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS =
"atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
+ public static final String
CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX =
"atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER
= "atlas.notification.authorize.using.message.user";
public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS
= "atlas.notification.authorize.authn.cache.ttl.seconds";
@@ -182,6 +183,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
+ private final boolean s3V2DirectoryPruneObjectPrefix;
private final boolean preprocessEnabled;
private final boolean createShellEntityForNonExistingReference;
private final boolean authorizeUsingMessageUser;
@@ -310,12 +312,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTypesRemoveOwnedRefAttrs =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
true);
rdbmsTypesRemoveOwnedRefAttrs =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
true);
- preprocessEnabled = skipHiveColumnLineageHive20633 ||
updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs ||
rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() ||
!hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() ||
!hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
+ s3V2DirectoryPruneObjectPrefix =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX,
true);
+
+ preprocessEnabled = skipHiveColumnLineageHive20633 ||
updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs ||
rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix ||
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() ||
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() ||
!hiveTablePrefixesToIgnore.isEmpty();
entityCorrelationManager = new
EntityCorrelationManager(entityCorrelationStore);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633,
skipHiveColumnLineageHive20633);
LOG.info("{}={}",
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
skipHiveColumnLineageHive20633InputsThreshold);
LOG.info("{}={}",
CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
hiveTypesRemoveOwnedRefAttrs);
LOG.info("{}={}",
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
rdbmsTypesRemoveOwnedRefAttrs);
+ LOG.info("{}={}",
CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX,
s3V2DirectoryPruneObjectPrefix);
LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
}
@@ -982,7 +987,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (preprocessEnabled) {
context = new PreprocessorContext(kafkaMsg, typeRegistry,
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore,
hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
- rdbmsTypesRemoveOwnedRefAttrs,
updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
+ rdbmsTypesRemoveOwnedRefAttrs,
s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName,
entityCorrelationManager);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
@@ -996,6 +1001,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
rdbmsTypeRemoveOwnedRefAttrs(context);
}
+ if (s3V2DirectoryPruneObjectPrefix) {
+ pruneObjectPrefixForS3V2Directory(context);
+ }
+
context.moveRegisteredReferredEntities();
if (context.isHivePreprocessEnabled() &&
CollectionUtils.isNotEmpty(context.getEntities()) &&
context.getEntities().size() > 1) {
@@ -1040,6 +1049,28 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
+ private void pruneObjectPrefixForS3V2Directory(PreprocessorContext
context) {
+ List<AtlasEntity> entities = new ArrayList<>();
+
+ if (CollectionUtils.isNotEmpty(context.getEntities())) {
+ entities.addAll(context.getEntities());
+ }
+
+ if (MapUtils.isNotEmpty(context.getReferredEntities())) {
+ entities.addAll(context.getReferredEntities().values());
+ }
+
+ if (CollectionUtils.isNotEmpty(entities)) {
+ for (AtlasEntity entity : entities) {
+ EntityPreprocessor preprocessor =
EntityPreprocessor.getS3V2Preprocessor(entity.getTypeName());
+
+ if (preprocessor != null) {
+ preprocessor.preprocess(entity, context);
+ }
+ }
+ }
+ }
+
private void preprocessHiveTypes(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java
new file mode 100644
index 0000000..6102572
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AWSS3V2Preprocessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(AWSS3V2Preprocessor.class);
+
+ private static final String AWS_S3_V2_DIR_TYPE =
"aws_s3_v2_directory";
+ private static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
+ private static final String SCHEME_SEPARATOR = "://";
+
+ static class AWSS3V2DirectoryPreprocessor extends EntityPreprocessor {
+ protected AWSS3V2DirectoryPreprocessor() {
+ super(AWS_S3_V2_DIR_TYPE);
+ }
+
+ @Override
+ public void preprocess(AtlasEntity entity, PreprocessorContext
context) {
+ if (context.getS3V2DirectoryPruneObjectPrefix()) {
+ String qualifiedName = (String)
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+ String objectPrefix = (String)
entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX);
+
+ if (isObjectPrefixPruneNeeded(qualifiedName, objectPrefix)) {
+ if (objectPrefix.lastIndexOf(Path.SEPARATOR) == -1) {
+ objectPrefix = Path.SEPARATOR;
+ } else {
+ if (doesEndsWithPathSeparator(objectPrefix)) {
+ objectPrefix =
removeLastPathSeparator(objectPrefix);
+ }
+
+ objectPrefix = objectPrefix.substring(0,
objectPrefix.lastIndexOf(Path.SEPARATOR) + 1);
+ }
+
+ LOG.info("Aws S3 V2 Preprocessor: Pruning {} from {} to
{}", ATTRIBUTE_OBJECT_PREFIX + QNAME_SEP_CLUSTER_NAME + AWS_S3_V2_DIR_TYPE,
+ entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
objectPrefix);
+
+ entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
+ }
+ }
+ }
+
+ private boolean isObjectPrefixPruneNeeded(String qualifiedName, String
objectPrefix) {
+ return (StringUtils.isNotBlank(qualifiedName)
+ && StringUtils.isNotBlank(objectPrefix)
+ &&
qualifiedName.contains(getSchemeAndBucket(qualifiedName) + objectPrefix +
QNAME_SEP_CLUSTER_NAME));
+ }
+
+ private String getSchemeAndBucket(String qualifiedName) {
+ String ret = "";
+
+ if (StringUtils.isNotEmpty(qualifiedName) &&
qualifiedName.contains(SCHEME_SEPARATOR)) {
+ int schemeSeparatorEndPosition =
qualifiedName.indexOf(SCHEME_SEPARATOR) + SCHEME_SEPARATOR.length();
+ int bucketEndPosition = qualifiedName.indexOf(Path.SEPARATOR,
schemeSeparatorEndPosition);
+ ret = qualifiedName.substring(0, bucketEndPosition);
+ }
+
+ return ret;
+ }
+
+ private boolean doesEndsWithPathSeparator(String path) {
+ return path.endsWith(Path.SEPARATOR);
+ }
+
+ private String removeLastPathSeparator(String path) {
+ return StringUtils.chop(path);
+ }
+ }
+
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 7f0cafe..f8eac4c 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -64,8 +64,9 @@ public abstract class EntityPreprocessor {
public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final String QNAME_SD_SUFFIX = "_storage";
- private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP
= new HashMap<>();
- private static final Map<String, EntityPreprocessor>
RDBMS_PREPROCESSOR_MAP = new HashMap<>();
+ private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP
= new HashMap<>();
+ private static final Map<String, EntityPreprocessor>
RDBMS_PREPROCESSOR_MAP = new HashMap<>();
+ private static final Map<String, EntityPreprocessor>
AWS_S3_V2_PREPROCESSOR_MAP = new HashMap<>();
private final String typeName;
@@ -88,6 +89,10 @@ public abstract class EntityPreprocessor {
new
RdbmsPreprocessor.RdbmsTablePreprocessor()
};
+ EntityPreprocessor[] s3V2Preprocessors = new EntityPreprocessor[] {
+ new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor()
+ };
+
for (EntityPreprocessor preprocessor : hivePreprocessors) {
HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(),
preprocessor);
}
@@ -95,6 +100,10 @@ public abstract class EntityPreprocessor {
for (EntityPreprocessor preprocessor : rdbmsPreprocessors) {
RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(),
preprocessor);
}
+
+ for (EntityPreprocessor preprocessor : s3V2Preprocessors) {
+ AWS_S3_V2_PREPROCESSOR_MAP.put(preprocessor.getTypeName(),
preprocessor);
+ }
}
protected EntityPreprocessor(String typeName) {
@@ -116,6 +125,10 @@ public abstract class EntityPreprocessor {
return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null;
}
+ public static EntityPreprocessor getS3V2Preprocessor(String typeName) {
+ return typeName != null ? AWS_S3_V2_PREPROCESSOR_MAP.get(typeName) :
null;
+ }
+
public static String getQualifiedName(AtlasEntity entity) {
Object obj = entity != null ?
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 59f6440..f930d9f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -63,6 +63,7 @@ public class PreprocessorContext {
private final boolean
updateHiveProcessNameWithQualifiedName;
private final boolean
hiveTypesRemoveOwnedRefAttrs;
private final boolean
rdbmsTypesRemoveOwnedRefAttrs;
+ private final boolean
s3V2DirectoryPruneObjectPrefix;
private final boolean isHivePreProcessEnabled;
private final Set<String> ignoredEntities =
new HashSet<>();
private final Set<String> prunedEntities =
new HashSet<>();
@@ -73,7 +74,7 @@ public class PreprocessorContext {
private final EntityCorrelationManager correlationManager;
private List<AtlasEntity> postUpdateEntities =
null;
- public PreprocessorContext(AtlasKafkaMessage<HookNotification>
kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore,
List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache,
List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore,
List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs,
boolean rdbmsTypesRemoveOwnedRefAttrs, boolean
updateHiveProcessNameWithQualifiedName, EntityCorrelationMana [...]
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification>
kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore,
List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache,
List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore,
List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs,
boolean rdbmsTypesRemoveOwnedRefAttrs, boolean s3V2DirectoryPruneObjectPrefix,
boolean updateHiveProcessName [...]
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
@@ -84,6 +85,7 @@ public class PreprocessorContext {
this.hiveTablePrefixesToIgnore =
hiveTablePrefixesToIgnore;
this.hiveTypesRemoveOwnedRefAttrs =
hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs =
rdbmsTypesRemoveOwnedRefAttrs;
+ this.s3V2DirectoryPruneObjectPrefix =
s3V2DirectoryPruneObjectPrefix;
this.updateHiveProcessNameWithQualifiedName =
updateHiveProcessNameWithQualifiedName;
final HookNotification message = kafkaMessage.getMessage();
@@ -124,6 +126,10 @@ public class PreprocessorContext {
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return
rdbmsTypesRemoveOwnedRefAttrs; }
+ public boolean getS3V2DirectoryPruneObjectPrefix() {
+ return s3V2DirectoryPruneObjectPrefix;
+ }
+
public boolean isHivePreprocessEnabled() {
return isHivePreProcessEnabled;
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java
new file mode 100644
index 0000000..9c6c92a
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.utils.PathExtractorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+public class AWSS3V2PreprocessorTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AWSS3V2PreprocessorTest.class);
+ private static final String METADATA_NAMESPACE = "cm";
+ private static final String QNAME_METADATA_NAMESPACE = '@' +
METADATA_NAMESPACE;
+ private static final String AWS_S3_MODEL_VERSION_V2 = "V2";
+
+ private static final String SCHEME_SEPARATOR = "://";
+ private static final String S3_SCHEME = "s3" +
SCHEME_SEPARATOR;
+ private static final String S3A_SCHEME = "s3a" +
SCHEME_SEPARATOR;
+ private static final String ABFS_SCHEME = "abfs" +
SCHEME_SEPARATOR;
+
+ private static final String ATTRIBUTE_NAME = "name";
+ private static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
+ private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+ private static final List<String> EMPTY_STR_LIST = new
ArrayList<>();
+ private static final List<Pattern> EMPTY_PATTERN_LIST = new
ArrayList<>();
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForOtherTypes() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE);
+ final String ABFS_PATH = ABFS_SCHEME +
"[email protected]/tmp/cdp-demo/sample.csv";
+ Path path = new Path(ABFS_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity =
entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), null);
+
+ HookNotification hookNotification = new
HookNotification.EntityCreateRequestV2("test", new
AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new
AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new
PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST,
null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new
AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertNotEquals(entity.getTypeName(), preprocessor.getTypeName());
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForFullPath() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME +
"aws_bucket1/1234567890/test/data1";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity =
entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/test/data1/");
+
+ HookNotification hookNotification = new
HookNotification.EntityCreateRequestV2("test", new
AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new
AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new
PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST,
null,
+
EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new
AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/1234567890/test/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME
+ "aws_bucket1/1234567890/test/data1/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "data1");
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForRootLevelDirectory() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME +
"aws_bucket1/root1";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity =
entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/root1/");
+
+ HookNotification hookNotification = new
HookNotification.EntityCreateRequestV2("test", new
AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new
AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new
PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST,
null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new
AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME
+ "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1");
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorWithSameDirNames() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3_SCHEME +
"temp/temp/temp/temp/";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity =
entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/temp/temp/temp/");
+
+ HookNotification hookNotification = new
HookNotification.EntityCreateRequestV2("test", new
AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new
AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new
PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST,
null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new
AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/temp/temp/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3_SCHEME
+ "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp");
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForHookWithCorrectObjectPrefix() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME +
"aws_bucket1/root1";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity =
entityWithExtInfo.getEntity();
+
+ entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/");
+ assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/root1/");
+
+ HookNotification hookNotification = new
HookNotification.EntityCreateRequestV2("test", new
AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new
AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new
PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST,
null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new
AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME
+ "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1");
+ }
+
+ @Test
+ public void
testS2V2DirectoryPreprocessorForHookWithCorrectObjectPrefixHavingSameDirNames()
{
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME +
"temp/temp/temp/temp/";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity =
entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/temp/temp/temp/");
+ entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/temp/temp/");
+ assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/temp/temp/temp/");
+
+ HookNotification hookNotification = new
HookNotification.EntityCreateRequestV2("test", new
AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new
AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new
PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST,
null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new
AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX),
"/temp/temp/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME
+ "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp");
+ }
+}