This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 84b561a  NIFI-8131: Support aws_s3_v2_directory in Atlas reporting task
84b561a is described below

commit 84b561ad33fa0d37a7e175c1dd03d2412dbffaea
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Sun Jan 10 16:41:19 2021 +0100

    NIFI-8131: Support aws_s3_v2_directory in Atlas reporting task
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4751.
---
 .../nifi/atlas/provenance/AnalysisContext.java     |   1 +
 .../atlas/provenance/StandardAnalysisContext.java  |   8 +-
 .../atlas/provenance/analyzer/AwsS3Directory.java  | 107 ++++++++++++++-------
 .../provenance/analyzer/AzureADLSDirectory.java    |  29 ++++--
 .../nifi/atlas/reporting/ReportLineageToAtlas.java |  23 ++++-
 ...ectory.java => AbstractTestAwsS3Directory.java} |  70 ++------------
 .../provenance/analyzer/TestAwsS3DirectoryV1.java  |  92 ++++++++++++++++++
 .../provenance/analyzer/TestAwsS3DirectoryV2.java  |  96 ++++++++++++++++++
 .../analyzer/TestAzureADLSDirectory.java           |   2 +-
 9 files changed, 322 insertions(+), 106 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
index 4562139..f592710 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
@@ -31,4 +31,5 @@ public interface AnalysisContext {
     ComputeLineageResult queryLineage(long eventId);
     ComputeLineageResult findParents(long eventId);
     ProvenanceEventRecord getProvenanceEvent(long eventId);
+    String getAwsS3ModelVersion();
 }
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
index 3ca9ff1..f889650 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
@@ -36,12 +36,14 @@ public class StandardAnalysisContext implements 
AnalysisContext {
     private final NiFiFlow nifiFlow;
     private final NamespaceResolver namespaceResolver;
     private final ProvenanceRepository provenanceRepository;
+    private final String awsS3ModelVersion;
 
     public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver 
namespaceResolver,
-                                   ProvenanceRepository provenanceRepository) {
+                                   ProvenanceRepository provenanceRepository, 
String awsS3ModelVersion) {
         this.nifiFlow = nifiFlow;
         this.namespaceResolver = namespaceResolver;
         this.provenanceRepository = provenanceRepository;
+        this.awsS3ModelVersion = awsS3ModelVersion;
     }
 
     @Override
@@ -101,4 +103,8 @@ public class StandardAnalysisContext implements 
AnalysisContext {
         }
     }
 
+    @Override
+    public String getAwsS3ModelVersion() {
+        return awsS3ModelVersion;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
index af683fd..4d58504 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
@@ -16,54 +16,80 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.utils.PathExtractorContext;
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 
-import java.net.URI;
+import java.util.Map;
 
-import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 
 /**
  * Analyze a transit URI as an AWS S3 directory (skipping the object name).
- * <li>qualifiedName=s3a://bucket/path@namespace (example: 
s3a://mybucket/mydir@ns1)
- * <li>name=/path (example: /mydir)
+ * The analyzer outputs a v1 or v2 AWS S3 directory entity depending on the 
'AWS S3 Model Version' property configured on the reporting task.
+ * <p>
+ * Atlas entity hierarchy v1: aws_s3_pseudo_dir -> aws_s3_bucket
+ * <p>aws_s3_pseudo_dir
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket/path@namespace (example: 
s3a://mybucket/mydir1/mydir2@ns1)
+ *   <li>name=/path (example: /mydir1/mydir2)
+ * </ul>
+ * <p>aws_s3_bucket
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket@namespace (example: s3a://mybucket@ns1)
+ *   <li>name=bucket (example: mybucket)
+ * </ul>
+ * <p>
+ * Atlas entity hierarchy v2: aws_s3_v2_directory -> aws_s3_v2_directory -> 
... -> aws_s3_v2_bucket
+ * <p>aws_s3_v2_directory
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket/path/@namespace (example: 
s3a://mybucket/mydir1/mydir2/@ns1)
+ *   <li>name=directory (example: mydir2)
+ * </ul>
+ * <p>aws_s3_v2_bucket
+ * <ul>
+ *   <li>qualifiedName=s3a://bucket@namespace (example: s3a://mybucket@ns1)
+ *   <li>name=bucket (example: mybucket)
+ * </ul>
  */
 public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
 
-    private static final String TYPE_DIRECTORY = "aws_s3_pseudo_dir";
-    private static final String TYPE_BUCKET = "aws_s3_bucket";
+    public static final String TYPE_DIRECTORY_V1 = 
AtlasPathExtractorUtil.AWS_S3_PSEUDO_DIR;
+    public static final String TYPE_BUCKET_V1 = 
AtlasPathExtractorUtil.AWS_S3_BUCKET;
+    public static final String ATTR_BUCKET_V1 = 
AtlasPathExtractorUtil.ATTRIBUTE_BUCKET;
+    public static final String ATTR_OBJECT_PREFIX_V1 = 
AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX;
 
-    public static final String ATTR_OBJECT_PREFIX = "objectPrefix";
-    public static final String ATTR_BUCKET = "bucket";
+    public static final String TYPE_DIRECTORY_V2 = 
AtlasPathExtractorUtil.AWS_S3_V2_PSEUDO_DIR;
+    public static final String TYPE_BUCKET_V2 = 
AtlasPathExtractorUtil.AWS_S3_V2_BUCKET;
+    public static final String ATTR_CONTAINER_V2 = 
AtlasPathExtractorUtil.ATTRIBUTE_CONTAINER;
+    public static final String ATTR_OBJECT_PREFIX_V2 = 
AtlasPathExtractorUtil.ATTRIBUTE_OBJECT_PREFIX;
 
     @Override
     public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord 
event) {
-        final String transitUri = event.getTransitUri();
+        String transitUri = event.getTransitUri();
         if (transitUri == null) {
             return null;
         }
 
-        final String directoryUri;
-        if (StringUtils.countMatches(transitUri, '/') > 3) {
-            // directory exists => drop last '/' and the file name
-            directoryUri = transitUri.substring(0, 
transitUri.lastIndexOf('/'));
-        } else {
-            // no directory => keep last '/', drop only the file name
-            directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') 
+ 1);
-        }
-        final URI uri = parseUri(directoryUri);
+        String directoryUri = transitUri.substring(0, 
transitUri.lastIndexOf('/') + 1);
+
+        Path path = new Path(directoryUri);
+
+        String namespace = 
context.getNamespaceResolver().fromHostNames(path.toUri().getHost());
 
-        final String namespace = 
context.getNamespaceResolver().fromHostNames(uri.getHost());
+        PathExtractorContext pathExtractorContext = new 
PathExtractorContext(namespace, context.getAwsS3ModelVersion());
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = 
AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext);
 
-        final Referenceable ref = createDirectoryRef(uri, namespace);
+        Referenceable ref = 
convertToReferenceable(entityWithExtInfo.getEntity(), 
pathExtractorContext.getKnownEntities());
 
-        return singleDataSetRef(event.getComponentId(), event.getEventType(), 
ref);
+        return ref != null ? singleDataSetRef(event.getComponentId(), 
event.getEventType(), ref) : null;
     }
 
     @Override
@@ -71,23 +97,34 @@ public class AwsS3Directory extends 
AbstractNiFiProvenanceEventAnalyzer {
         return "^s3a://.+/.+$";
     }
 
-    private Referenceable createDirectoryRef(URI uri, String namespace) {
-        final Referenceable ref = new Referenceable(TYPE_DIRECTORY);
+    private Referenceable convertToReferenceable(AtlasEntity entity, 
Map<String, AtlasEntity> knownEntities) {
+        if (entity == null) {
+            return null;
+        }
 
-        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, 
uri.toString().toLowerCase()));
-        ref.set(ATTR_NAME, uri.getPath().toLowerCase());
-        ref.set(ATTR_OBJECT_PREFIX, uri.getPath().toLowerCase());
-        ref.set(ATTR_BUCKET, createBucketRef(uri, namespace));
+        Referenceable ref = createReferenceable(entity);
+
+        if (TYPE_DIRECTORY_V1.equals(entity.getTypeName())) {
+            AtlasObjectId bucketObjectId = (AtlasObjectId) 
entity.getRelationshipAttribute(ATTR_BUCKET_V1);
+            if (bucketObjectId != null) {
+                AtlasEntity bucketEntity = 
knownEntities.get(bucketObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME));
+                ref.set(ATTR_BUCKET_V1, convertToReferenceable(bucketEntity, 
knownEntities));
+            }
+        } else if (TYPE_DIRECTORY_V2.equals(entity.getTypeName())) {
+            AtlasObjectId containerObjectId = (AtlasObjectId) 
entity.getRelationshipAttribute(ATTR_CONTAINER_V2);
+            if (containerObjectId != null) {
+                AtlasEntity containerEntity = 
knownEntities.get(containerObjectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME));
+                ref.set(ATTR_CONTAINER_V2, 
convertToReferenceable(containerEntity, knownEntities));
+            }
+        }
 
         return ref;
     }
 
-    private Referenceable createBucketRef(URI uri, String namespace) {
-        final Referenceable ref = new Referenceable(TYPE_BUCKET);
-
-        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, 
String.format("%s://%s", uri.getScheme(), uri.getAuthority())));
-        ref.set(ATTR_NAME, uri.getAuthority());
-
+    private Referenceable createReferenceable(AtlasEntity entity) {
+        Referenceable ref = new Referenceable(entity.getTypeName());
+        ref.setValues(entity.getAttributes());
         return ref;
     }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java
index 2ced136..b4bfadb 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AzureADLSDirectory.java
@@ -34,17 +34,32 @@ import static 
org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 
 /**
  * Analyze a transit URI as an Azure ADLS Gen2 directory (skipping the file 
name).
- * <li>qualifiedName=abfs://filesystem@account/path@namespace (example: 
abfs://myfilesystem@myaccount/mydir1/mydir2@ns1)
- * <li>name=directory (example: mydir2)
+ * <p>
+ * Atlas entity hierarchy: adls_gen2_directory -> adls_gen2_directory -> ... 
-> adls_gen2_container -> adls_gen2_account
+ * <p>adls_gen2_directory
+ * <ul>
+ *   <li>qualifiedName=abfs://filesystem@account/path/@namespace (example: 
abfs://myfilesystem@myaccount/mydir1/mydir2/@ns1)
+ *   <li>name=directory (example: mydir2)
+ * </ul>
+ * <p>adls_gen2_container
+ * <ul>
+ *   <li>qualifiedName=abfs://filesystem@account@namespace (example: 
abfs://myfilesystem@myaccount@ns1)
+ *   <li>name=filesystem (example: myfilesystem)
+ * </ul>
+ * <p>adls_gen2_account
+ * <ul>
+ *   <li>qualifiedName=abfs://account@namespace (example: abfs://myaccount@ns1)
+ *   <li>name=account (example: myaccount)
+ * </ul>
  */
 public class AzureADLSDirectory extends AbstractNiFiProvenanceEventAnalyzer {
 
-    public static final String TYPE_DIRECTORY = "adls_gen2_directory";
-    public static final String TYPE_CONTAINER = "adls_gen2_container";
-    public static final String TYPE_ACCOUNT = "adls_gen2_account";
+    public static final String TYPE_DIRECTORY = 
AtlasPathExtractorUtil.ADLS_GEN2_DIRECTORY;
+    public static final String TYPE_CONTAINER = 
AtlasPathExtractorUtil.ADLS_GEN2_CONTAINER;
+    public static final String TYPE_ACCOUNT = 
AtlasPathExtractorUtil.ADLS_GEN2_ACCOUNT;
 
-    public static final String ATTR_PARENT = "parent";
-    public static final String ATTR_ACCOUNT = "account";
+    public static final String ATTR_PARENT = 
AtlasPathExtractorUtil.ATTRIBUTE_PARENT;
+    public static final String ATTR_ACCOUNT = 
AtlasPathExtractorUtil.ATTRIBUTE_ACCOUNT;
 
     @Override
     public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord 
event) {
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index f6ccdd7..9a5982b 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.security.SecurityProperties;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.alias.CredentialProvider;
@@ -325,6 +326,22 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             .defaultValue(LINEAGE_STRATEGY_SIMPLE_PATH.getValue())
             .build();
 
+    static final AllowableValue AWS_S3_MODEL_VERSION_V1 = new 
AllowableValue("v1", "v1",
+            "Creates AWS S3 directory entities version 1 
(aws_s3_pseudo_dir).");
+    static final AllowableValue AWS_S3_MODEL_VERSION_V2 = new 
AllowableValue(AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2, "v2",
+            "Creates AWS S3 directory entities version 2 
(aws_s3_v2_directory).");
+
+    static final PropertyDescriptor AWS_S3_MODEL_VERSION = new 
PropertyDescriptor.Builder()
+            .name("aws-s3-model-version")
+            .displayName("AWS S3 Model Version")
+            .description("Specifies what type of AWS S3 directory entities 
will be created in Atlas for s3a:// transit URIs (eg. PutHDFS with S3 
integration)." +
+                    " NOTE: It is strongly recommended to keep using the same 
AWS S3 entity model version once this reporting task started to keep Atlas data 
clean." +
+                    " Switching versions will not delete existing Atlas 
entities created by the old version, nor migrate them to the new version.")
+            .required(true)
+            .allowableValues(AWS_S3_MODEL_VERSION_V1, AWS_S3_MODEL_VERSION_V2)
+            .defaultValue(AWS_S3_MODEL_VERSION_V2.getValue())
+            .build();
+
     private static final String ATLAS_PROPERTIES_FILENAME = 
"atlas-application.properties";
     private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = 
"atlas.client.connectTimeoutMSecs";
     private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = 
"atlas.client.readTimeoutMSecs";
@@ -387,6 +404,9 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
         properties.add(ATLAS_CONNECT_TIMEOUT);
         properties.add(ATLAS_READ_TIMEOUT);
 
+        // Provenance event analyzer specific properties
+        properties.add(AWS_S3_MODEL_VERSION);
+
         return properties;
     }
 
@@ -848,9 +868,10 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
 
     private void consumeNiFiProvenanceEvents(ReportingContext context, 
NiFiFlow nifiFlow) {
         final EventAccess eventAccess = context.getEventAccess();
+        final String awsS3ModelVersion = 
context.getProperty(AWS_S3_MODEL_VERSION).getValue();
         final AnalysisContext analysisContext = new 
StandardAnalysisContext(nifiFlow, namespaceResolvers,
                 // FIXME: This class cast shouldn't be necessary to query 
lineage. Possible refactor target in next major update.
-                (ProvenanceRepository)eventAccess.getProvenanceRepository());
+                (ProvenanceRepository)eventAccess.getProvenanceRepository(), 
awsS3ModelVersion);
         consumer.consumeEvents(context, (componentMapHolder, events) -> {
             for (ProvenanceEventRecord event : events) {
                 try {
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/AbstractTestAwsS3Directory.java
similarity index 58%
rename from 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java
rename to 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/AbstractTestAwsS3Directory.java
index d934328..5e0db83 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/AbstractTestAwsS3Directory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
@@ -24,56 +23,25 @@ import 
org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
 import org.apache.nifi.atlas.resolver.NamespaceResolver;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
-import org.junit.Test;
 import org.mockito.Mockito;
 
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
-import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
-public class TestAwsS3Directory {
+public abstract class AbstractTestAwsS3Directory {
 
-    private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = 
ProvenanceEventType.SEND;
-    private static final String ATLAS_NAMESPACE = "namespace1";
-    private static final String AWS_BUCKET = "bucket1";
-    private static final String AWS_FILENAME = "file1";
+    protected static final ProvenanceEventType PROVENANCE_EVENT_TYPE = 
ProvenanceEventType.SEND;
+    protected static final String ATLAS_NAMESPACE = "namespace1";
+    protected static final String AWS_BUCKET = "bucket1";
+    protected static final String AWS_FILENAME = "file1";
 
-    @Test
-    public void testSimpleDirectory() {
-        String processorName = "PutHDFS";
-        String directory = "/dir1";
+    protected abstract String getAwsS3ModelVersion();
 
-        executeTest(processorName, directory);
-    }
-
-    @Test
-    public void testCompoundDirectory() {
-        String processorName = "PutHDFS";
-        String directory = "/dir1/dir2/dir3/dir4/dir5";
-
-        executeTest(processorName, directory);
-    }
-
-    @Test
-    public void testRootDirectory() {
-        String processorName = "PutHDFS";
-        String directory = "/";
-
-        executeTest(processorName, directory);
-    }
-
-    @Test
-    public void testWithPutORC() {
-        String processorName = "PutORC";
-        String directory = "/dir1";
+    protected abstract void assertAnalysisResult(DataSetRefs refs, String 
directory);
 
-        executeTest(processorName, directory);
-    }
-
-    public void executeTest(String processorName, String directory) {
+    protected void executeTest(String processorName, String directory) {
         String transitUri = createTransitUri(directory);
 
         ProvenanceEventRecord provenanceEvent = 
mockProvenanceEvent(processorName, transitUri);
@@ -110,6 +78,7 @@ public class TestAwsS3Directory {
 
         AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
         
when(analysisContext.getNamespaceResolver()).thenReturn(namespaceResolver);
+        
when(analysisContext.getAwsS3ModelVersion()).thenReturn(getAwsS3ModelVersion());
 
         return analysisContext;
     }
@@ -118,25 +87,4 @@ public class TestAwsS3Directory {
         assertNotNull(analyzer);
         assertEquals(AwsS3Directory.class, analyzer.getClass());
     }
-
-    private void assertAnalysisResult(DataSetRefs refs, String directory) {
-        String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", 
AWS_BUCKET, directory, ATLAS_NAMESPACE);
-        String expectedBucketQualifiedName = String.format("s3a://%s@%s", 
AWS_BUCKET, ATLAS_NAMESPACE);
-
-        assertEquals(0, refs.getInputs().size());
-        assertEquals(1, refs.getOutputs().size());
-
-        Referenceable directoryRef = refs.getOutputs().iterator().next();
-
-        assertEquals("aws_s3_pseudo_dir", directoryRef.getTypeName());
-        assertEquals(expectedDirectoryQualifiedName, 
directoryRef.get(ATTR_QUALIFIED_NAME));
-        assertEquals(directory, directoryRef.get(ATTR_NAME));
-        assertEquals(directory, directoryRef.get("objectPrefix"));
-
-        Referenceable bucketRef = (Referenceable) directoryRef.get("bucket");
-        assertNotNull(bucketRef);
-        assertEquals("aws_s3_bucket", bucketRef.getTypeName());
-        assertEquals(expectedBucketQualifiedName, 
bucketRef.get(ATTR_QUALIFIED_NAME));
-        assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME));
-    }
 }
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV1.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV1.java
new file mode 100644
index 0000000..69f5d88
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV1.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.junit.Test;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static 
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_BUCKET_V1;
+import static 
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_OBJECT_PREFIX_V1;
+import static 
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_BUCKET_V1;
+import static 
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_DIRECTORY_V1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestAwsS3DirectoryV1 extends AbstractTestAwsS3Directory {
+
+    @Override
+    protected String getAwsS3ModelVersion() {
+        return "v1";
+    }
+
+    @Test
+    public void testSimpleDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testCompoundDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1/dir2/dir3/dir4/dir5";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testRootDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testWithPutORC() {
+        String processorName = "PutORC";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Override
+    protected void assertAnalysisResult(DataSetRefs refs, String dirPath) {
+        String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", 
AWS_BUCKET, dirPath, ATLAS_NAMESPACE);
+        String expectedBucketQualifiedName = String.format("s3a://%s@%s", 
AWS_BUCKET, ATLAS_NAMESPACE);
+
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+
+        Referenceable directoryRef = refs.getOutputs().iterator().next();
+
+        assertEquals(TYPE_DIRECTORY_V1, directoryRef.getTypeName());
+        assertEquals(expectedDirectoryQualifiedName, 
directoryRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(dirPath, directoryRef.get(ATTR_NAME));
+        assertEquals(dirPath, directoryRef.get(ATTR_OBJECT_PREFIX_V1));
+
+        Referenceable bucketRef = (Referenceable) 
directoryRef.get(ATTR_BUCKET_V1);
+        assertNotNull(bucketRef);
+        assertEquals(TYPE_BUCKET_V1, bucketRef.getTypeName());
+        assertEquals(expectedBucketQualifiedName, 
bucketRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java
new file mode 100644
index 0000000..0694b6a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.junit.Test;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static 
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_CONTAINER_V2;
+import static 
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.ATTR_OBJECT_PREFIX_V2;
+import static 
org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory.TYPE_BUCKET_V2;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestAwsS3DirectoryV2 extends AbstractTestAwsS3Directory {
+
+    @Override
+    protected String getAwsS3ModelVersion() {
+        return AtlasPathExtractorUtil.AWS_S3_ATLAS_MODEL_VERSION_V2;
+    }
+
+    @Test
+    public void testSimpleDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testCompoundDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/dir1/dir2/dir3/dir4/dir5";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testRootDirectory() {
+        String processorName = "PutHDFS";
+        String dirPath = "/";
+
+        executeTest(processorName, dirPath);
+    }
+
+    @Test
+    public void testWithPutORC() {
+        String processorName = "PutORC";
+        String dirPath = "/dir1";
+
+        executeTest(processorName, dirPath);
+    }
+
+    protected void assertAnalysisResult(DataSetRefs refs, String dirPath) {
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+
+        Referenceable ref = refs.getOutputs().iterator().next();
+
+        String actualPath = dirPath;
+        while (StringUtils.isNotEmpty(actualPath) && !"/".equals(actualPath)) {
+            String directory = StringUtils.substringAfterLast(actualPath, "/");
+
+            assertEquals(AwsS3Directory.TYPE_DIRECTORY_V2, ref.getTypeName());
+            assertEquals(String.format("s3a://%s%s/@%s", AWS_BUCKET, 
actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME));
+            assertEquals(directory, ref.get(ATTR_NAME));
+            assertEquals(actualPath + "/", ref.get(ATTR_OBJECT_PREFIX_V2));
+            assertNotNull(ref.get(ATTR_CONTAINER_V2));
+
+            ref = (Referenceable) ref.get(ATTR_CONTAINER_V2);
+            actualPath = StringUtils.substringBeforeLast(actualPath, "/");
+        }
+
+        assertEquals(TYPE_BUCKET_V2, ref.getTypeName());
+        assertEquals(String.format("s3a://%s@%s", AWS_BUCKET, 
ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals(AWS_BUCKET, ref.get(ATTR_NAME));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java
index 2f7cdd5..583ad4e 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAzureADLSDirectory.java
@@ -80,7 +80,7 @@ public class TestAzureADLSDirectory {
         executeTest(processorName, path);
     }
 
-    public void executeTest(String processorName, String path) {
+    private void executeTest(String processorName, String path) {
         String transitUri = 
String.format("abfs://%s@%s.dfs.core.windows.net%s/%s", ADLS_FILESYSTEM, 
ADLS_ACCOUNT, path, ADLS_FILENAME);
 
         ProvenanceEventRecord provenanceEvent = 
mockProvenanceEvent(processorName, transitUri);

Reply via email to