exceptionfactory commented on code in PR #8088:
URL: https://github.com/apache/nifi/pull/8088#discussion_r1435309260


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             listByTrackingTimestamps(context, session);
         } else if (BY_ENTITIES.equals(listingStrategy)) {
             listByTrackingEntities(context, session);
+        } else if (NO_TRACKING.equals(listingStrategy)) {
+            listNoTracking(context, session);
         } else {
             throw new ProcessException("Unknown listing strategy: " + 
listingStrategy);
         }
     }
 
+    private void listNoTracking(ProcessContext context, ProcessSession 
session) {
+        final AmazonS3 client = getClient(context);
+
+        S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+        final long startNanos = System.nanoTime();
+        final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+        final long listingTimestamp = System.currentTimeMillis();
+
+        final String bucket = 
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        int listCount = 0;
+        int totalListCount = 0;
+
+        getLogger().trace("Start listing, listingTimestamp={}", new 
Object[]{listingTimestamp});
+
+        final S3ObjectWriter writer;
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        if (writerFactory == null) {
+            writer = new AttributeObjectWriter(session);
+        } else {
+            writer = new RecordObjectWriter(session, writerFactory, 
getLogger(), context.getProperty(S3_REGION).getValue());
+        }
+
+        try {
+            writer.beginListing();
+
+            do {
+                VersionListing versionListing = bucketLister.listVersions();
+                for (S3VersionSummary versionSummary : 
versionListing.getVersionSummaries()) {
+                    long lastModified = 
versionSummary.getLastModified().getTime();
+                    if ((maxAgeMilliseconds != null && (lastModified < 
(listingTimestamp - maxAgeMilliseconds)))
+                        || lastModified > (listingTimestamp - 
minAgeMilliseconds)) {
+                        continue;
+                    }
+
+                    getLogger().trace("Listed key={}, lastModified={}", new 
Object[]{versionSummary.getKey(), lastModified});

Review Comment:
   ```suggestion
                       getLogger().trace("Listed key={}, lastModified={}", 
versionSummary.getKey(), lastModified);
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             listByTrackingTimestamps(context, session);
         } else if (BY_ENTITIES.equals(listingStrategy)) {
             listByTrackingEntities(context, session);
+        } else if (NO_TRACKING.equals(listingStrategy)) {
+            listNoTracking(context, session);
         } else {
             throw new ProcessException("Unknown listing strategy: " + 
listingStrategy);
         }
     }
 
+    private void listNoTracking(ProcessContext context, ProcessSession 
session) {
+        final AmazonS3 client = getClient(context);
+
+        S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+        final long startNanos = System.nanoTime();
+        final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+        final long listingTimestamp = System.currentTimeMillis();
+
+        final String bucket = 
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        int listCount = 0;
+        int totalListCount = 0;
+
+        getLogger().trace("Start listing, listingTimestamp={}", new 
Object[]{listingTimestamp});

Review Comment:
   The `Object[]` wrapper is not necessary for log arguments. Although not all components have been updated to reflect the recommended usage, new code should follow the pattern.
   ```suggestion
           getLogger().trace("Start listing, listingTimestamp={}", 
listingTimestamp);
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             listByTrackingTimestamps(context, session);
         } else if (BY_ENTITIES.equals(listingStrategy)) {
             listByTrackingEntities(context, session);
+        } else if (NO_TRACKING.equals(listingStrategy)) {
+            listNoTracking(context, session);
         } else {
             throw new ProcessException("Unknown listing strategy: " + 
listingStrategy);
         }
     }
 
+    private void listNoTracking(ProcessContext context, ProcessSession 
session) {
+        final AmazonS3 client = getClient(context);
+
+        S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+        final long startNanos = System.nanoTime();
+        final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+        final long listingTimestamp = System.currentTimeMillis();
+
+        final String bucket = 
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        int listCount = 0;
+        int totalListCount = 0;
+
+        getLogger().trace("Start listing, listingTimestamp={}", new 
Object[]{listingTimestamp});
+
+        final S3ObjectWriter writer;
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        if (writerFactory == null) {
+            writer = new AttributeObjectWriter(session);
+        } else {
+            writer = new RecordObjectWriter(session, writerFactory, 
getLogger(), context.getProperty(S3_REGION).getValue());
+        }
+
+        try {
+            writer.beginListing();
+
+            do {
+                VersionListing versionListing = bucketLister.listVersions();
+                for (S3VersionSummary versionSummary : 
versionListing.getVersionSummaries()) {
+                    long lastModified = 
versionSummary.getLastModified().getTime();
+                    if ((maxAgeMilliseconds != null && (lastModified < 
(listingTimestamp - maxAgeMilliseconds)))
+                        || lastModified > (listingTimestamp - 
minAgeMilliseconds)) {
+                        continue;
+                    }
+
+                    getLogger().trace("Listed key={}, lastModified={}", new 
Object[]{versionSummary.getKey(), lastModified});
+
+                    GetObjectTaggingResult taggingResult = 
getTaggingResult(context, client, versionSummary);
+
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, 
client, versionSummary);
+
+                    // Write the entity to the listing
+                    writer.addToListing(versionSummary, taggingResult, 
objectMetadata, context.getProperty(S3_REGION).getValue());
+
+                    listCount++;
+                }
+                bucketLister.setNextMarker();
+
+                totalListCount += listCount;
+
+                if (listCount >= batchSize && writer.isCheckpoint()) {
+                    getLogger().info("Successfully listed {} new files from 
S3; routing to success", listCount);
+                    session.commitAsync();
+                }
+
+                listCount = 0;
+            } while (bucketLister.isTruncated());
+
+            writer.finishListing();
+        } catch (final Exception e) {
+            getLogger().error("Failed to list contents of bucket due to {}", 
e, e);
+            writer.finishListingExceptionally(e);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
+        final long listMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed S3 bucket {} in {} millis", new 
Object[]{bucket, listMillis});

Review Comment:
   ```suggestion
           getLogger().info("Successfully listed S3 bucket {} in {} millis", 
bucket, listMillis);
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             listByTrackingTimestamps(context, session);
         } else if (BY_ENTITIES.equals(listingStrategy)) {
             listByTrackingEntities(context, session);
+        } else if (NO_TRACKING.equals(listingStrategy)) {
+            listNoTracking(context, session);
         } else {
             throw new ProcessException("Unknown listing strategy: " + 
listingStrategy);
         }
     }
 
+    private void listNoTracking(ProcessContext context, ProcessSession 
session) {
+        final AmazonS3 client = getClient(context);
+
+        S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+        final long startNanos = System.nanoTime();
+        final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+        final long listingTimestamp = System.currentTimeMillis();
+
+        final String bucket = 
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        int listCount = 0;
+        int totalListCount = 0;
+
+        getLogger().trace("Start listing, listingTimestamp={}", new 
Object[]{listingTimestamp});
+
+        final S3ObjectWriter writer;
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        if (writerFactory == null) {
+            writer = new AttributeObjectWriter(session);
+        } else {
+            writer = new RecordObjectWriter(session, writerFactory, 
getLogger(), context.getProperty(S3_REGION).getValue());
+        }
+
+        try {
+            writer.beginListing();
+
+            do {
+                VersionListing versionListing = bucketLister.listVersions();
+                for (S3VersionSummary versionSummary : 
versionListing.getVersionSummaries()) {
+                    long lastModified = 
versionSummary.getLastModified().getTime();
+                    if ((maxAgeMilliseconds != null && (lastModified < 
(listingTimestamp - maxAgeMilliseconds)))
+                        || lastModified > (listingTimestamp - 
minAgeMilliseconds)) {
+                        continue;
+                    }
+
+                    getLogger().trace("Listed key={}, lastModified={}", new 
Object[]{versionSummary.getKey(), lastModified});
+
+                    GetObjectTaggingResult taggingResult = 
getTaggingResult(context, client, versionSummary);
+
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, 
client, versionSummary);
+
+                    // Write the entity to the listing
+                    writer.addToListing(versionSummary, taggingResult, 
objectMetadata, context.getProperty(S3_REGION).getValue());
+
+                    listCount++;
+                }
+                bucketLister.setNextMarker();
+
+                totalListCount += listCount;
+
+                if (listCount >= batchSize && writer.isCheckpoint()) {
+                    getLogger().info("Successfully listed {} new files from 
S3; routing to success", listCount);
+                    session.commitAsync();
+                }
+
+                listCount = 0;
+            } while (bucketLister.isTruncated());
+
+            writer.finishListing();
+        } catch (final Exception e) {
+            getLogger().error("Failed to list contents of bucket due to {}", 
e, e);
+            writer.finishListingExceptionally(e);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
+        final long listMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed S3 bucket {} in {} millis", new 
Object[]{bucket, listMillis});
+
+        if (totalListCount == 0) {
+            getLogger().debug("No new objects in S3 bucket {} to list. 
Yielding.", new Object[]{bucket});

Review Comment:
   ```suggestion
               getLogger().debug("No new objects in S3 bucket {} to list. 
Yielding", bucket);
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             listByTrackingTimestamps(context, session);
         } else if (BY_ENTITIES.equals(listingStrategy)) {
             listByTrackingEntities(context, session);
+        } else if (NO_TRACKING.equals(listingStrategy)) {
+            listNoTracking(context, session);
         } else {
             throw new ProcessException("Unknown listing strategy: " + 
listingStrategy);
         }
     }
 
+    private void listNoTracking(ProcessContext context, ProcessSession 
session) {
+        final AmazonS3 client = getClient(context);
+
+        S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+        final long startNanos = System.nanoTime();
+        final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+        final long listingTimestamp = System.currentTimeMillis();
+
+        final String bucket = 
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        int listCount = 0;
+        int totalListCount = 0;
+
+        getLogger().trace("Start listing, listingTimestamp={}", new 
Object[]{listingTimestamp});
+
+        final S3ObjectWriter writer;
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        if (writerFactory == null) {
+            writer = new AttributeObjectWriter(session);
+        } else {
+            writer = new RecordObjectWriter(session, writerFactory, 
getLogger(), context.getProperty(S3_REGION).getValue());
+        }
+
+        try {
+            writer.beginListing();
+
+            do {
+                VersionListing versionListing = bucketLister.listVersions();
+                for (S3VersionSummary versionSummary : 
versionListing.getVersionSummaries()) {
+                    long lastModified = 
versionSummary.getLastModified().getTime();
+                    if ((maxAgeMilliseconds != null && (lastModified < 
(listingTimestamp - maxAgeMilliseconds)))
+                        || lastModified > (listingTimestamp - 
minAgeMilliseconds)) {
+                        continue;
+                    }
+
+                    getLogger().trace("Listed key={}, lastModified={}", new 
Object[]{versionSummary.getKey(), lastModified});
+
+                    GetObjectTaggingResult taggingResult = 
getTaggingResult(context, client, versionSummary);
+
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, 
client, versionSummary);
+
+                    // Write the entity to the listing
+                    writer.addToListing(versionSummary, taggingResult, 
objectMetadata, context.getProperty(S3_REGION).getValue());
+
+                    listCount++;
+                }
+                bucketLister.setNextMarker();
+
+                totalListCount += listCount;
+
+                if (listCount >= batchSize && writer.isCheckpoint()) {
+                    getLogger().info("Successfully listed {} new files from 
S3; routing to success", listCount);
+                    session.commitAsync();
+                }
+
+                listCount = 0;
+            } while (bucketLister.isTruncated());
+
+            writer.finishListing();
+        } catch (final Exception e) {
+            getLogger().error("Failed to list contents of bucket due to {}", 
e, e);

Review Comment:
   The `due to {}` approach is duplicative and still needs to be removed from older implementations. This new code is an opportunity to streamline the log and avoid duplicating the exception message, since the message is already included in the stack trace.
   ```suggestion
               getLogger().error("Failed to list contents of bucket", e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to