Lehel44 commented on a change in pull request #5413:
URL: https://github.com/apache/nifi/pull/5413#discussion_r733766851
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
@WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
})
public class ListGCSBucket extends AbstractGCSProcessor {
+ public static final AllowableValue BY_TIMESTAMPS = new
AllowableValue("timestamps", "Tracking Timestamps",
+ "This strategy tracks the latest timestamp of listed entity to
determine new/updated entities." +
+ " Since it only tracks few timestamps, it can manage listing state
efficiently." +
+ " However, any newly added, or updated entity having timestamp
older than the tracked latest timestamp can not be picked by this strategy." +
+ " Also may miss files when multiple subdirectories are being
written at the same time while listing is running.");
Review comment:
```suggestion
" Also, files may be missed/skipped when multiple subdirectories
are being written at the same time while listing is running.");
```
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
@WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
})
public class ListGCSBucket extends AbstractGCSProcessor {
+ public static final AllowableValue BY_TIMESTAMPS = new
AllowableValue("timestamps", "Tracking Timestamps",
+ "This strategy tracks the latest timestamp of listed entity to
determine new/updated entities." +
+ " Since it only tracks few timestamps, it can manage listing state
efficiently." +
+ " However, any newly added, or updated entity having timestamp
older than the tracked latest timestamp can not be picked by this strategy." +
Review comment:
```suggestion
" This strategy will not pick any newly added or modified entity
with a timestamp older than the recorded latest timestamp." +
```
##########
File path:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
}
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ S3BucketLister bucketLister = getS3BucketLister(context,
getClient(), bucket);
+
+ List<ListableEntityWrapper<S3VersionSummary>> listedEntities =
bucketLister.listVersions().getVersionSummaries()
+ .stream()
+ .filter(s3VersionSummary ->
s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+ .map(s3VersionSummary -> new
ListableEntityWrapper<S3VersionSummary>(
+ s3VersionSummary,
+ S3VersionSummary::getKey,
+ summary -> summary.getKey() + "_" + summary.getVersionId(),
+ summary -> summary.getLastModified().getTime(),
+ S3VersionSummary::getSize
+ ))
+ .collect(Collectors.toList());
+
+ return listedEntities;
+ }, null);
+
+ justElectedPrimaryNode = false;
+ }
+
+ private class ListedS3VersionSummaryTracker extends
ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+ public ListedS3VersionSummaryTracker() {
+ super(getIdentifier(), getLogger(),
RecordObjectWriter.RECORD_SCHEMA);
+ }
+
+ @Override
+ protected void createRecordsForEntities(
+ ProcessContext context,
+ ProcessSession session,
+ List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+ ) throws IOException, SchemaNotFoundException {
+ publishListing(context, session, updatedEntities);
+ }
+
+ @Override
+ protected void createFlowFilesForEntities(
+ ProcessContext context,
+ ProcessSession session,
+ List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+ Function<ListableEntityWrapper<S3VersionSummary>, Map<String,
String>> createAttributes
+ ) {
+ publishListing(context, session, updatedEntities);
+ }
+
+ private void publishListing(ProcessContext context, ProcessSession
session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+ final S3ObjectWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger());
+ }
+
+ try {
+ writer.beginListing();
+ final int batchSize =
context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ for (ListableEntityWrapper<S3VersionSummary> updatedEntity :
updatedEntities) {
+ S3VersionSummary s3VersionSummary =
updatedEntity.getRawEntity();
+
+ GetObjectTaggingResult taggingResult =
getTaggingResult(context, getClient(), s3VersionSummary);
+ ObjectMetadata objectMetadata = getObjectMetadata(context,
getClient(), s3VersionSummary);
+
+ writer.addToListing(s3VersionSummary, taggingResult,
objectMetadata);
+
+ listCount++;
+
+ if (listCount >= batchSize && writer.isCheckpoint()) {
+ getLogger().info("Successfully listed {} new files
from S3; routing to success", new Object[]{listCount});
+ session.commitAsync();
+ }
+
+ final ListedEntity listedEntity = new
ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+ alreadyListedEntities.put(updatedEntity.getIdentifier(),
listedEntity);
+ }
+
+ writer.finishListing();
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of bucket due to
{}", new Object[]{e}, e);
+ writer.finishListingExceptionally(e);
+ session.rollback();
+ context.yield();
+ return;
+ }
+ }
+ }
+
+ private GetObjectTaggingResult getTaggingResult(ProcessContext context,
AmazonS3 client, S3VersionSummary versionSummary) {
+ GetObjectTaggingResult taggingResult = null;
+ if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
+ try {
+ taggingResult = client.getObjectTagging(new
GetObjectTaggingRequest(versionSummary.getBucketName(),
versionSummary.getKey()));
+ } catch (final Exception e) {
+ getLogger().warn("Failed to obtain Object Tags for S3 Object
{} in bucket {}. Will list S3 Object without the object tags",
+ new Object[] {versionSummary.getKey(),
versionSummary.getBucketName()}, e);
+ }
+ }
+ return taggingResult;
+ }
+
+ private ObjectMetadata getObjectMetadata(ProcessContext context, AmazonS3
client, S3VersionSummary versionSummary) {
+ ObjectMetadata objectMetadata = null;
+ if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+ try {
+ objectMetadata = client.getObjectMetadata(new
GetObjectMetadataRequest(versionSummary.getBucketName(),
versionSummary.getKey()));
+ } catch (final Exception e) {
+ getLogger().warn("Failed to obtain User Metadata for S3 Object
{} in bucket {}. Will list S3 Object without the user metadata",
+ new Object[] {versionSummary.getKey(),
versionSummary.getBucketName()}, e);
+ }
+ }
+ return objectMetadata;
+ }
+
+ private S3BucketLister getS3BucketLister(ProcessContext context, AmazonS3
client, String bucket) {
+ String delimiter = context.getProperty(DELIMITER).getValue();
+ boolean requesterPays =
context.getProperty(REQUESTER_PAYS).asBoolean();
+ String prefix =
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+
+ boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+ int listType = context.getProperty(LIST_TYPE).asInteger();
+ S3BucketLister bucketLister = useVersions
+ ? new S3VersionBucketLister(client)
+ : listType == 2
+ ? new S3ObjectBucketListerVersion2(client)
+ : new S3ObjectBucketLister(client);
Review comment:
Would you please make this more readable? You could replace the nested
ternary operator with an if statement plus a single ternary operator.
##########
File path:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
}
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ S3BucketLister bucketLister = getS3BucketLister(context,
getClient(), bucket);
+
+ List<ListableEntityWrapper<S3VersionSummary>> listedEntities =
bucketLister.listVersions().getVersionSummaries()
+ .stream()
+ .filter(s3VersionSummary ->
s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+ .map(s3VersionSummary -> new
ListableEntityWrapper<S3VersionSummary>(
+ s3VersionSummary,
+ S3VersionSummary::getKey,
+ summary -> summary.getKey() + "_" + summary.getVersionId(),
+ summary -> summary.getLastModified().getTime(),
+ S3VersionSummary::getSize
+ ))
+ .collect(Collectors.toList());
+
+ return listedEntities;
+ }, null);
+
+ justElectedPrimaryNode = false;
+ }
+
+ private class ListedS3VersionSummaryTracker extends
ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+ public ListedS3VersionSummaryTracker() {
+ super(getIdentifier(), getLogger(),
RecordObjectWriter.RECORD_SCHEMA);
+ }
+
+ @Override
+ protected void createRecordsForEntities(
+ ProcessContext context,
+ ProcessSession session,
+ List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+ ) throws IOException, SchemaNotFoundException {
+ publishListing(context, session, updatedEntities);
+ }
+
+ @Override
+ protected void createFlowFilesForEntities(
+ ProcessContext context,
+ ProcessSession session,
+ List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+ Function<ListableEntityWrapper<S3VersionSummary>, Map<String,
String>> createAttributes
+ ) {
+ publishListing(context, session, updatedEntities);
+ }
+
+ private void publishListing(ProcessContext context, ProcessSession
session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+ final S3ObjectWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger());
+ }
+
+ try {
+ writer.beginListing();
+ final int batchSize =
context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ for (ListableEntityWrapper<S3VersionSummary> updatedEntity :
updatedEntities) {
+ S3VersionSummary s3VersionSummary =
updatedEntity.getRawEntity();
+
+ GetObjectTaggingResult taggingResult =
getTaggingResult(context, getClient(), s3VersionSummary);
+ ObjectMetadata objectMetadata = getObjectMetadata(context,
getClient(), s3VersionSummary);
+
+ writer.addToListing(s3VersionSummary, taggingResult,
objectMetadata);
+
+ listCount++;
+
+ if (listCount >= batchSize && writer.isCheckpoint()) {
+ getLogger().info("Successfully listed {} new files
from S3; routing to success", new Object[]{listCount});
+ session.commitAsync();
+ }
+
+ final ListedEntity listedEntity = new
ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+ alreadyListedEntities.put(updatedEntity.getIdentifier(),
listedEntity);
+ }
+
+ writer.finishListing();
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of bucket due to
{}", new Object[]{e}, e);
+ writer.finishListingExceptionally(e);
+ session.rollback();
+ context.yield();
+ return;
Review comment:
Is there any reason to return here?
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
##########
@@ -63,7 +63,7 @@
public class ListedEntityTracker<T extends ListableEntity> {
private final ObjectMapper objectMapper = new ObjectMapper();
- private volatile Map<String, ListedEntity> alreadyListedEntities;
+ protected volatile Map<String, ListedEntity> alreadyListedEntities;
Review comment:
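The implementation is always a ConcurrentHashMap, which is thread-safe.
Does the field still need to be volatile? (`volatile` only matters if the field
reference itself is reassigned after construction, not for the map's contents.)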
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
##########
@@ -376,7 +376,7 @@ private void createRecordsForEntities(final ProcessContext
context, final Proces
session.transfer(flowFile, REL_SUCCESS);
}
- private void createFlowFilesForEntities(final ProcessSession session,
final List<T> updatedEntities, final Function<T, Map<String, String>>
createAttributes) {
+ protected void createFlowFilesForEntities(ProcessContext context, final
ProcessSession session, final List<T> updatedEntities, final Function<T,
Map<String, String>> createAttributes) {
Review comment:
The `context` parameter is unused in this method; you may remove it.
##########
File path:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
}
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ S3BucketLister bucketLister = getS3BucketLister(context,
getClient(), bucket);
+
+ List<ListableEntityWrapper<S3VersionSummary>> listedEntities =
bucketLister.listVersions().getVersionSummaries()
+ .stream()
+ .filter(s3VersionSummary ->
s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+ .map(s3VersionSummary -> new
ListableEntityWrapper<S3VersionSummary>(
+ s3VersionSummary,
+ S3VersionSummary::getKey,
+ summary -> summary.getKey() + "_" + summary.getVersionId(),
+ summary -> summary.getLastModified().getTime(),
+ S3VersionSummary::getSize
+ ))
+ .collect(Collectors.toList());
+
+ return listedEntities;
+ }, null);
+
+ justElectedPrimaryNode = false;
+ }
+
+ private class ListedS3VersionSummaryTracker extends
ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+ public ListedS3VersionSummaryTracker() {
+ super(getIdentifier(), getLogger(),
RecordObjectWriter.RECORD_SCHEMA);
+ }
+
+ @Override
+ protected void createRecordsForEntities(
+ ProcessContext context,
+ ProcessSession session,
+ List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+ ) throws IOException, SchemaNotFoundException {
Review comment:
I think you can remove the `throws IOException, SchemaNotFoundException`
clause from the signature, since `publishListing` does not declare any checked
exceptions.
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
@WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
})
public class ListGCSBucket extends AbstractGCSProcessor {
+ public static final AllowableValue BY_TIMESTAMPS = new
AllowableValue("timestamps", "Tracking Timestamps",
+ "This strategy tracks the latest timestamp of listed entity to
determine new/updated entities." +
+ " Since it only tracks few timestamps, it can manage listing state
efficiently." +
+ " However, any newly added, or updated entity having timestamp
older than the tracked latest timestamp can not be picked by this strategy." +
+ " Also may miss files when multiple subdirectories are being
written at the same time while listing is running.");
+
+ public static final AllowableValue BY_ENTITIES = new
AllowableValue("entities", "Tracking Entities",
+ "This strategy tracks information of all the listed entities within
the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+ " This strategy can pick entities having old timestamp that can be
missed with 'Tracing Timestamps'." +
+ " Works even when multiple subdirectories are being written at the
same time while listing is running." +
+ " However additional DistributedMapCache controller service is
required and more JVM heap memory is used." +
Review comment:
```suggestion
" However, an additional DistributedMapCache controller service
is required, as well as more JVM heap capacity." +
```
##########
File path:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
}
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ S3BucketLister bucketLister = getS3BucketLister(context,
getClient(), bucket);
+
+ List<ListableEntityWrapper<S3VersionSummary>> listedEntities =
bucketLister.listVersions().getVersionSummaries()
+ .stream()
+ .filter(s3VersionSummary ->
s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+ .map(s3VersionSummary -> new
ListableEntityWrapper<S3VersionSummary>(
+ s3VersionSummary,
+ S3VersionSummary::getKey,
+ summary -> summary.getKey() + "_" + summary.getVersionId(),
+ summary -> summary.getLastModified().getTime(),
+ S3VersionSummary::getSize
+ ))
+ .collect(Collectors.toList());
+
+ return listedEntities;
+ }, null);
+
+ justElectedPrimaryNode = false;
+ }
+
+ private class ListedS3VersionSummaryTracker extends
ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+ public ListedS3VersionSummaryTracker() {
+ super(getIdentifier(), getLogger(),
RecordObjectWriter.RECORD_SCHEMA);
+ }
+
+ @Override
+ protected void createRecordsForEntities(
+ ProcessContext context,
+ ProcessSession session,
+ List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+ ) throws IOException, SchemaNotFoundException {
+ publishListing(context, session, updatedEntities);
+ }
+
+ @Override
+ protected void createFlowFilesForEntities(
+ ProcessContext context,
+ ProcessSession session,
+ List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+ Function<ListableEntityWrapper<S3VersionSummary>, Map<String,
String>> createAttributes
+ ) {
+ publishListing(context, session, updatedEntities);
+ }
+
+ private void publishListing(ProcessContext context, ProcessSession
session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+ final S3ObjectWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeObjectWriter(session);
+ } else {
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger());
+ }
+
+ try {
+ writer.beginListing();
+ final int batchSize =
context.getProperty(BATCH_SIZE).asInteger();
+
+ int listCount = 0;
+ for (ListableEntityWrapper<S3VersionSummary> updatedEntity :
updatedEntities) {
+ S3VersionSummary s3VersionSummary =
updatedEntity.getRawEntity();
+
+ GetObjectTaggingResult taggingResult =
getTaggingResult(context, getClient(), s3VersionSummary);
+ ObjectMetadata objectMetadata = getObjectMetadata(context,
getClient(), s3VersionSummary);
+
+ writer.addToListing(s3VersionSummary, taggingResult,
objectMetadata);
+
+ listCount++;
+
+ if (listCount >= batchSize && writer.isCheckpoint()) {
+ getLogger().info("Successfully listed {} new files
from S3; routing to success", new Object[]{listCount});
Review comment:
There's a new varargs logging method, so I think you can remove the
explicit `new Object[]{...}` array creation.
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
@WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
})
public class ListGCSBucket extends AbstractGCSProcessor {
+ public static final AllowableValue BY_TIMESTAMPS = new
AllowableValue("timestamps", "Tracking Timestamps",
+ "This strategy tracks the latest timestamp of listed entity to
determine new/updated entities." +
+ " Since it only tracks few timestamps, it can manage listing state
efficiently." +
+ " However, any newly added, or updated entity having timestamp
older than the tracked latest timestamp can not be picked by this strategy." +
+ " Also may miss files when multiple subdirectories are being
written at the same time while listing is running.");
+
+ public static final AllowableValue BY_ENTITIES = new
AllowableValue("entities", "Tracking Entities",
+ "This strategy tracks information of all the listed entities within
the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+ " This strategy can pick entities having old timestamp that can be
missed with 'Tracing Timestamps'." +
+ " Works even when multiple subdirectories are being written at the
same time while listing is running." +
+ " However additional DistributedMapCache controller service is
required and more JVM heap memory is used." +
+ " See the description of 'Entity Tracking Time Window' property
for further details on how it works.");
Review comment:
```suggestion
" For more information on how the 'Entity Tracking Time Window'
property works, see the description.");
```
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -358,6 +444,131 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
getLogger().info("Successfully listed GCS bucket {} in {} millis", new
Object[]{bucket, listMillis});
}
+ private List<Storage.BlobListOption> getBlobListOptions(ProcessContext
context) {
+ final String prefix =
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+ final boolean useGenerations =
context.getProperty(USE_GENERATIONS).asBoolean();
+
+ final List<Storage.BlobListOption> listOptions = new ArrayList<>();
+
+ if (prefix != null) {
+ listOptions.add(Storage.BlobListOption.prefix(prefix));
+ }
+ if (useGenerations) {
+ listOptions.add(Storage.BlobListOption.versions(true));
+ }
+
+ return listOptions;
+ }
+
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ List<ListableBlob> listedEntities = new ArrayList<>();
+
+ Storage storage = getCloudService();
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ final List<Storage.BlobListOption> listOptions =
getBlobListOptions(context);
+
+ Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new
Storage.BlobListOption[0]));
+ int pageNr=0;
+ do {
+ for (final Blob blob : blobPage.getValues()) {
+ if (blob.getUpdateTime() >= minTimestampToList) {
+ listedEntities.add(new ListableBlob(
+ blob,
+ pageNr
+ ));
+ }
+ }
+ blobPage = blobPage.getNextPage();
+ pageNr++;
+ } while (blobPage != null);
+
+ return listedEntities;
+ }, null);
+
+ justElectedPrimaryNode = false;
+ }
+
+ protected class ListedBlobTracker extends
ListedEntityTracker<ListableBlob> {
+ public ListedBlobTracker() {
+ super(getIdentifier(), getLogger(),
RecordBlobWriter.RECORD_SCHEMA);
+ }
+
+ @Override
+ protected void createRecordsForEntities(ProcessContext context,
ProcessSession session, List<ListableBlob> updatedEntities) throws IOException,
SchemaNotFoundException {
+ publishListing(context, session, updatedEntities);
+ }
+
+ @Override
+ protected void createFlowFilesForEntities(ProcessContext context,
ProcessSession session, List<ListableBlob> updatedEntities,
Function<ListableBlob, Map<String, String>> createAttributes) {
+ publishListing(context, session, updatedEntities);
+ }
+
+ private void publishListing(ProcessContext context, ProcessSession
session, List<ListableBlob> updatedEntities) {
+ final BlobWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeBlobWriter(session);
+ } else {
+ writer = new RecordBlobWriter(session, writerFactory,
getLogger());
+ }
+
+ long maxTimestamp = 0L;
+ final Set<String> keysMatchingTimestamp = new HashSet<>();
+
+ try {
+ writer.beginListing();
+
+ int listCount = 0;
+ int pageNr = -1;
+ for (ListableBlob listableBlob : updatedEntities) {
+ Blob blob = listableBlob.getRawEntity();
+ int currentPageNr = listableBlob.getPageNr();
+
+ writer.addToListing(blob);
+
+ listCount++;
+
+ if (pageNr != -1 && pageNr != currentPageNr &&
writer.isCheckpoint()) {
Review comment:
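Can `pageNr != -1` ever be true here? `pageNr` starts at -1 and is only
reassigned inside this same `if` block, so the condition appears to always be
false and the intermediate commit is never reached.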
##########
File path:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
}
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ S3BucketLister bucketLister = getS3BucketLister(context,
getClient(), bucket);
+
+ List<ListableEntityWrapper<S3VersionSummary>> listedEntities =
bucketLister.listVersions().getVersionSummaries()
+ .stream()
+ .filter(s3VersionSummary ->
s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+ .map(s3VersionSummary -> new
ListableEntityWrapper<S3VersionSummary>(
+ s3VersionSummary,
+ S3VersionSummary::getKey,
+ summary -> summary.getKey() + "_" + summary.getVersionId(),
+ summary -> summary.getLastModified().getTime(),
+ S3VersionSummary::getSize
+ ))
+ .collect(Collectors.toList());
+
+ return listedEntities;
Review comment:
```suggestion
return bucketLister.listVersions().getVersionSummaries()
.stream()
.filter(s3VersionSummary ->
s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
.map(s3VersionSummary -> new ListableEntityWrapper<>(
s3VersionSummary,
S3VersionSummary::getKey,
summary -> summary.getKey() + "_" +
summary.getVersionId(),
summary -> summary.getLastModified().getTime(),
S3VersionSummary::getSize
))
.collect(Collectors.toList());
}, null);
```
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -358,6 +444,131 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
getLogger().info("Successfully listed GCS bucket {} in {} millis", new
Object[]{bucket, listMillis});
}
+ private List<Storage.BlobListOption> getBlobListOptions(ProcessContext
context) {
+ final String prefix =
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+ final boolean useGenerations =
context.getProperty(USE_GENERATIONS).asBoolean();
+
+ final List<Storage.BlobListOption> listOptions = new ArrayList<>();
+
+ if (prefix != null) {
+ listOptions.add(Storage.BlobListOption.prefix(prefix));
+ }
+ if (useGenerations) {
+ listOptions.add(Storage.BlobListOption.versions(true));
+ }
+
+ return listOptions;
+ }
+
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ List<ListableBlob> listedEntities = new ArrayList<>();
+
+ Storage storage = getCloudService();
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ final List<Storage.BlobListOption> listOptions =
getBlobListOptions(context);
+
+ Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new
Storage.BlobListOption[0]));
+ int pageNr=0;
+ do {
+ for (final Blob blob : blobPage.getValues()) {
+ if (blob.getUpdateTime() >= minTimestampToList) {
+ listedEntities.add(new ListableBlob(
+ blob,
+ pageNr
+ ));
+ }
+ }
+ blobPage = blobPage.getNextPage();
+ pageNr++;
+ } while (blobPage != null);
+
+ return listedEntities;
+ }, null);
+
+ justElectedPrimaryNode = false;
+ }
+
+ protected class ListedBlobTracker extends
ListedEntityTracker<ListableBlob> {
+ public ListedBlobTracker() {
+ super(getIdentifier(), getLogger(),
RecordBlobWriter.RECORD_SCHEMA);
+ }
+
+ @Override
+ protected void createRecordsForEntities(ProcessContext context,
ProcessSession session, List<ListableBlob> updatedEntities) throws IOException,
SchemaNotFoundException {
+ publishListing(context, session, updatedEntities);
+ }
+
+ @Override
+ protected void createFlowFilesForEntities(ProcessContext context,
ProcessSession session, List<ListableBlob> updatedEntities,
Function<ListableBlob, Map<String, String>> createAttributes) {
+ publishListing(context, session, updatedEntities);
+ }
+
+ private void publishListing(ProcessContext context, ProcessSession
session, List<ListableBlob> updatedEntities) {
+ final BlobWriter writer;
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ if (writerFactory == null) {
+ writer = new AttributeBlobWriter(session);
+ } else {
+ writer = new RecordBlobWriter(session, writerFactory,
getLogger());
+ }
+
+ long maxTimestamp = 0L;
+ final Set<String> keysMatchingTimestamp = new HashSet<>();
+
+ try {
+ writer.beginListing();
+
+ int listCount = 0;
+ int pageNr = -1;
+ for (ListableBlob listableBlob : updatedEntities) {
+ Blob blob = listableBlob.getRawEntity();
+ int currentPageNr = listableBlob.getPageNr();
+
+ writer.addToListing(blob);
+
+ listCount++;
+
+ if (pageNr != -1 && pageNr != currentPageNr &&
writer.isCheckpoint()) {
+ commit(session, listCount, maxTimestamp,
keysMatchingTimestamp);
+ listCount = 0;
+ pageNr = currentPageNr;
+ }
+ }
+
+ writer.finishListing();
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of bucket due to
{}", new Object[] {e}, e);
+ writer.finishListingExceptionally(e);
+ session.rollback();
+ context.yield();
+ return;
Review comment:
I think the method returns here anyway, so this explicit `return` is redundant.
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -358,6 +444,131 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
getLogger().info("Successfully listed GCS bucket {} in {} millis", new
Object[]{bucket, listMillis});
}
+ private List<Storage.BlobListOption> getBlobListOptions(ProcessContext
context) {
+ final String prefix =
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+ final boolean useGenerations =
context.getProperty(USE_GENERATIONS).asBoolean();
+
+ final List<Storage.BlobListOption> listOptions = new ArrayList<>();
+
+ if (prefix != null) {
+ listOptions.add(Storage.BlobListOption.prefix(prefix));
+ }
+ if (useGenerations) {
+ listOptions.add(Storage.BlobListOption.versions(true));
+ }
+
+ return listOptions;
+ }
+
+ private void listByTrackingEntities(ProcessContext context, ProcessSession
session) {
+ listedEntityTracker.trackEntities(context, session,
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+ List<ListableBlob> listedEntities = new ArrayList<>();
+
+ Storage storage = getCloudService();
+ String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ final List<Storage.BlobListOption> listOptions =
getBlobListOptions(context);
+
+ Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new
Storage.BlobListOption[0]));
+ int pageNr=0;
Review comment:
```suggestion
int pageNumber = 0;
```
##########
File path:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -266,7 +349,19 @@ long getStateTimestamp() {
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+ final String listingStrategy =
context.getProperty(LISTING_STRATEGY).getValue();
+
+ if (BY_TIMESTAMPS.equals(listingStrategy)) {
+ listByTrackingTimestamps(context, session);
+ } else if (BY_ENTITIES.equals(listingStrategy)) {
+ listByTrackingEntities(context, session);
+ } else {
+ throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
Review comment:
Is this `else` branch necessary? The strategy can only be one of the
AllowableValues.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]