exceptionfactory commented on code in PR #8088:
URL: https://github.com/apache/nifi/pull/8088#discussion_r1435309260
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (NO_TRACKING.equals(listingStrategy)) {
+ listNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ private void listNoTracking(ProcessContext context, ProcessSession
session) {
+ final AmazonS3 client = getClient(context);
+
+ S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+ final long startNanos = System.nanoTime();
+ final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+ final long listingTimestamp = System.currentTimeMillis();
+
+ final String bucket =
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ int totalListCount = 0;
+
+ getLogger().trace("Start listing, listingTimestamp={}", new
Object[]{listingTimestamp});
+
+ final S3ObjectWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
+ }
+
+ try {
+ writer.beginListing();
+
+ do {
+ VersionListing versionListing = bucketLister.listVersions();
+ for (S3VersionSummary versionSummary :
versionListing.getVersionSummaries()) {
+ long lastModified =
versionSummary.getLastModified().getTime();
+ if ((maxAgeMilliseconds != null && (lastModified <
(listingTimestamp - maxAgeMilliseconds)))
+ || lastModified > (listingTimestamp -
minAgeMilliseconds)) {
+ continue;
+ }
+
+ getLogger().trace("Listed key={}, lastModified={}", new
Object[]{versionSummary.getKey(), lastModified});
Review Comment:
```suggestion
getLogger().trace("Listed key={}, lastModified={}",
versionSummary.getKey(), lastModified);
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (NO_TRACKING.equals(listingStrategy)) {
+ listNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ private void listNoTracking(ProcessContext context, ProcessSession
session) {
+ final AmazonS3 client = getClient(context);
+
+ S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+ final long startNanos = System.nanoTime();
+ final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+ final long listingTimestamp = System.currentTimeMillis();
+
+ final String bucket =
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ int totalListCount = 0;
+
+ getLogger().trace("Start listing, listingTimestamp={}", new
Object[]{listingTimestamp});
Review Comment:
The `Object[]` wrapper is not necessary for log arguments. Although not all
components have been updated to reflect the recommended usage, new code should
follow this pattern.
```suggestion
getLogger().trace("Start listing, listingTimestamp={}",
listingTimestamp);
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (NO_TRACKING.equals(listingStrategy)) {
+ listNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ private void listNoTracking(ProcessContext context, ProcessSession
session) {
+ final AmazonS3 client = getClient(context);
+
+ S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+ final long startNanos = System.nanoTime();
+ final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+ final long listingTimestamp = System.currentTimeMillis();
+
+ final String bucket =
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ int totalListCount = 0;
+
+ getLogger().trace("Start listing, listingTimestamp={}", new
Object[]{listingTimestamp});
+
+ final S3ObjectWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
+ }
+
+ try {
+ writer.beginListing();
+
+ do {
+ VersionListing versionListing = bucketLister.listVersions();
+ for (S3VersionSummary versionSummary :
versionListing.getVersionSummaries()) {
+ long lastModified =
versionSummary.getLastModified().getTime();
+ if ((maxAgeMilliseconds != null && (lastModified <
(listingTimestamp - maxAgeMilliseconds)))
+ || lastModified > (listingTimestamp -
minAgeMilliseconds)) {
+ continue;
+ }
+
+ getLogger().trace("Listed key={}, lastModified={}", new
Object[]{versionSummary.getKey(), lastModified});
+
+ GetObjectTaggingResult taggingResult =
getTaggingResult(context, client, versionSummary);
+
+ ObjectMetadata objectMetadata = getObjectMetadata(context,
client, versionSummary);
+
+ // Write the entity to the listing
+ writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
+
+ listCount++;
+ }
+ bucketLister.setNextMarker();
+
+ totalListCount += listCount;
+
+ if (listCount >= batchSize && writer.isCheckpoint()) {
+ getLogger().info("Successfully listed {} new files from
S3; routing to success", listCount);
+ session.commitAsync();
+ }
+
+ listCount = 0;
+ } while (bucketLister.isTruncated());
+
+ writer.finishListing();
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of bucket due to {}",
e, e);
+ writer.finishListingExceptionally(e);
+ session.rollback();
+ context.yield();
+ return;
+ }
+
+ final long listMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ getLogger().info("Successfully listed S3 bucket {} in {} millis", new
Object[]{bucket, listMillis});
Review Comment:
```suggestion
getLogger().info("Successfully listed S3 bucket {} in {} millis",
bucket, listMillis);
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (NO_TRACKING.equals(listingStrategy)) {
+ listNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ private void listNoTracking(ProcessContext context, ProcessSession
session) {
+ final AmazonS3 client = getClient(context);
+
+ S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+ final long startNanos = System.nanoTime();
+ final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+ final long listingTimestamp = System.currentTimeMillis();
+
+ final String bucket =
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ int totalListCount = 0;
+
+ getLogger().trace("Start listing, listingTimestamp={}", new
Object[]{listingTimestamp});
+
+ final S3ObjectWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
+ }
+
+ try {
+ writer.beginListing();
+
+ do {
+ VersionListing versionListing = bucketLister.listVersions();
+ for (S3VersionSummary versionSummary :
versionListing.getVersionSummaries()) {
+ long lastModified =
versionSummary.getLastModified().getTime();
+ if ((maxAgeMilliseconds != null && (lastModified <
(listingTimestamp - maxAgeMilliseconds)))
+ || lastModified > (listingTimestamp -
minAgeMilliseconds)) {
+ continue;
+ }
+
+ getLogger().trace("Listed key={}, lastModified={}", new
Object[]{versionSummary.getKey(), lastModified});
+
+ GetObjectTaggingResult taggingResult =
getTaggingResult(context, client, versionSummary);
+
+ ObjectMetadata objectMetadata = getObjectMetadata(context,
client, versionSummary);
+
+ // Write the entity to the listing
+ writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
+
+ listCount++;
+ }
+ bucketLister.setNextMarker();
+
+ totalListCount += listCount;
+
+ if (listCount >= batchSize && writer.isCheckpoint()) {
+ getLogger().info("Successfully listed {} new files from
S3; routing to success", listCount);
+ session.commitAsync();
+ }
+
+ listCount = 0;
+ } while (bucketLister.isTruncated());
+
+ writer.finishListing();
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of bucket due to {}",
e, e);
+ writer.finishListingExceptionally(e);
+ session.rollback();
+ context.yield();
+ return;
+ }
+
+ final long listMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ getLogger().info("Successfully listed S3 bucket {} in {} millis", new
Object[]{bucket, listMillis});
+
+ if (totalListCount == 0) {
+ getLogger().debug("No new objects in S3 bucket {} to list.
Yielding.", new Object[]{bucket});
Review Comment:
```suggestion
getLogger().debug("No new objects in S3 bucket {} to list.
Yielding", bucket);
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java:
##########
@@ -451,11 +456,92 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (NO_TRACKING.equals(listingStrategy)) {
+ listNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ private void listNoTracking(ProcessContext context, ProcessSession
session) {
+ final AmazonS3 client = getClient(context);
+
+ S3BucketLister bucketLister = getS3BucketLister(context, client);
+
+ final long startNanos = System.nanoTime();
+ final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
+ final long listingTimestamp = System.currentTimeMillis();
+
+ final String bucket =
context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ int totalListCount = 0;
+
+ getLogger().trace("Start listing, listingTimestamp={}", new
Object[]{listingTimestamp});
+
+ final S3ObjectWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
+ }
+
+ try {
+ writer.beginListing();
+
+ do {
+ VersionListing versionListing = bucketLister.listVersions();
+ for (S3VersionSummary versionSummary :
versionListing.getVersionSummaries()) {
+ long lastModified =
versionSummary.getLastModified().getTime();
+ if ((maxAgeMilliseconds != null && (lastModified <
(listingTimestamp - maxAgeMilliseconds)))
+ || lastModified > (listingTimestamp -
minAgeMilliseconds)) {
+ continue;
+ }
+
+ getLogger().trace("Listed key={}, lastModified={}", new
Object[]{versionSummary.getKey(), lastModified});
+
+ GetObjectTaggingResult taggingResult =
getTaggingResult(context, client, versionSummary);
+
+ ObjectMetadata objectMetadata = getObjectMetadata(context,
client, versionSummary);
+
+ // Write the entity to the listing
+ writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
+
+ listCount++;
+ }
+ bucketLister.setNextMarker();
+
+ totalListCount += listCount;
+
+ if (listCount >= batchSize && writer.isCheckpoint()) {
+ getLogger().info("Successfully listed {} new files from
S3; routing to success", listCount);
+ session.commitAsync();
+ }
+
+ listCount = 0;
+ } while (bucketLister.isTruncated());
+
+ writer.finishListing();
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of bucket due to {}",
e, e);
Review Comment:
The `due to {}` approach is duplicative because the exception message is already
included in the stack trace. It also needs to be removed from older
implementations, but this is an opportunity to streamline the new log statement
and avoid duplicating the message.
```suggestion
getLogger().error("Failed to list contents of bucket", e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]