tpalfy commented on a change in pull request #5413:
URL: https://github.com/apache/nifi/pull/5413#discussion_r737429266



##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         }
     }
 
+    private void listByTrackingEntities(ProcessContext context, ProcessSession 
session) {
+        listedEntityTracker.trackEntities(context, session, 
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            S3BucketLister bucketLister = getS3BucketLister(context, 
getClient(), bucket);
+
+            List<ListableEntityWrapper<S3VersionSummary>> listedEntities = 
bucketLister.listVersions().getVersionSummaries()
+                .stream()
+                .filter(s3VersionSummary -> 
s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+                .map(s3VersionSummary -> new 
ListableEntityWrapper<S3VersionSummary>(
+                    s3VersionSummary,
+                    S3VersionSummary::getKey,
+                    summary -> summary.getKey() + "_" + summary.getVersionId(),
+                    summary -> summary.getLastModified().getTime(),
+                    S3VersionSummary::getSize
+                ))
+                .collect(Collectors.toList());
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    private class ListedS3VersionSummaryTracker extends 
ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+        public ListedS3VersionSummaryTracker() {
+            super(getIdentifier(), getLogger(), 
RecordObjectWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+        ) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+            Function<ListableEntityWrapper<S3VersionSummary>, Map<String, 
String>> createAttributes
+        ) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession 
session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+            final S3ObjectWriter writer;
+            final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeObjectWriter(session);
+            } else {
+                writer = new RecordObjectWriter(session, writerFactory, 
getLogger());
+            }
+
+            try {
+                writer.beginListing();
+                final int batchSize = 
context.getProperty(BATCH_SIZE).asInteger();
+
+                int listCount = 0;
+                for (ListableEntityWrapper<S3VersionSummary> updatedEntity : 
updatedEntities) {
+                    S3VersionSummary s3VersionSummary = 
updatedEntity.getRawEntity();
+
+                    GetObjectTaggingResult taggingResult = 
getTaggingResult(context, getClient(), s3VersionSummary);
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, 
getClient(), s3VersionSummary);
+
+                    writer.addToListing(s3VersionSummary, taggingResult, 
objectMetadata);
+
+                    listCount++;
+
+                    if (listCount >= batchSize && writer.isCheckpoint()) {
+                        getLogger().info("Successfully listed {} new files 
from S3; routing to success", new Object[]{listCount});
+                        session.commitAsync();
+                    }
+
+                    final ListedEntity listedEntity = new 
ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+                    alreadyListedEntities.put(updatedEntity.getIdentifier(), 
listedEntity);
+                }
+
+                writer.finishListing();
+            } catch (final Exception e) {
+                getLogger().error("Failed to list contents of bucket due to 
{}", new Object[]{e}, e);
+                writer.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+                return;
+            }
+        }
+    }
+
+    private GetObjectTaggingResult getTaggingResult(ProcessContext context, 
AmazonS3 client, S3VersionSummary versionSummary) {
+        GetObjectTaggingResult taggingResult = null;
+        if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
+            try {
+                taggingResult = client.getObjectTagging(new 
GetObjectTaggingRequest(versionSummary.getBucketName(), 
versionSummary.getKey()));
+            } catch (final Exception e) {
+                getLogger().warn("Failed to obtain Object Tags for S3 Object 
{} in bucket {}. Will list S3 Object without the object tags",
+                    new Object[] {versionSummary.getKey(), 
versionSummary.getBucketName()}, e);
+            }
+        }
+        return taggingResult;
+    }
+
+    private ObjectMetadata getObjectMetadata(ProcessContext context, AmazonS3 
client, S3VersionSummary versionSummary) {
+        ObjectMetadata objectMetadata = null;
+        if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+            try {
+                objectMetadata = client.getObjectMetadata(new 
GetObjectMetadataRequest(versionSummary.getBucketName(), 
versionSummary.getKey()));
+            } catch (final Exception e) {
+                getLogger().warn("Failed to obtain User Metadata for S3 Object 
{} in bucket {}. Will list S3 Object without the user metadata",
+                    new Object[] {versionSummary.getKey(), 
versionSummary.getBucketName()}, e);
+            }
+        }
+        return objectMetadata;
+    }
+
+    private S3BucketLister getS3BucketLister(ProcessContext context, AmazonS3 
client, String bucket) {
+        String delimiter = context.getProperty(DELIMITER).getValue();
+        boolean requesterPays = 
context.getProperty(REQUESTER_PAYS).asBoolean();
+        String prefix = 
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+
+        boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+        int listType = context.getProperty(LIST_TYPE).asInteger();
+        S3BucketLister bucketLister = useVersions
+            ? new S3VersionBucketLister(client)
+            : listType == 2
+            ? new S3ObjectBucketListerVersion2(client)
+            : new S3ObjectBucketLister(client);

Review comment:
       This is purely a refactor — the code was only moved from one place to another — so I'd rather not touch it here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to