turcsanyip commented on code in PR #5916:
URL: https://github.com/apache/nifi/pull/5916#discussion_r841463319


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java:
##########
@@ -183,41 +185,53 @@ protected String getDefaultTimePrecision() {
         final String containerName = 
context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
         final String prefix = 
Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
         final List<BlobInfo> listing = new ArrayList<>();
+        final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
+
         try {
             final CloudBlobClient blobClient = 
AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
             final CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
 
             final OperationContext operationContext = new OperationContext();
             AzureStorageUtils.setProxy(operationContext, context);
 
-            for (final ListBlobItem blob : container.listBlobs(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
-                if (blob instanceof CloudBlob) {
-                    final CloudBlob cloudBlob = (CloudBlob) blob;
-                    final BlobProperties properties = 
cloudBlob.getProperties();
-                    final StorageUri uri = 
cloudBlob.getSnapshotQualifiedStorageUri();
-
-                    final Builder builder = new BlobInfo.Builder()
-                                              
.primaryUri(uri.getPrimaryUri().toString())
-                                              .blobName(cloudBlob.getName())
-                                              .containerName(containerName)
-                                              
.contentType(properties.getContentType())
-                                              
.contentLanguage(properties.getContentLanguage())
-                                              .etag(properties.getEtag())
-                                              
.lastModifiedTime(properties.getLastModified().getTime())
-                                              .length(properties.getLength());
-
-                    if (uri.getSecondaryUri() != null) {
-                        builder.secondaryUri(uri.getSecondaryUri().toString());
-                    }
-
-                    if (blob instanceof CloudBlockBlob) {
-                        builder.blobType(AzureStorageUtils.BLOCK);
-                    } else {
-                        builder.blobType(AzureStorageUtils.PAGE);
+            ResultContinuation continuationToken = null;
+
+            do {
+                final ResultSegment<ListBlobItem> result = 
container.listBlobsSegmented(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, 
operationContext);
+                continuationToken = result.getContinuationToken();
+
+                for (final ListBlobItem blob : result.getResults()) {
+                    if (blob instanceof CloudBlob) {
+                        final CloudBlob cloudBlob = (CloudBlob) blob;
+                        final BlobProperties properties = 
cloudBlob.getProperties();
+
+                        if (properties.getLastModified().getTime() > 
minimumTimestamp) {

Review Comment:
   `>=` needs to be used instead, because new files can be uploaded to Azure 
with the same timestamp (Azure has second precision) after the listing. 
`AbstractListProcessor` stores the files already listed with the previous max 
timestamp, so it can filter those files out later in the process.
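
   To illustrate the point above, here is a minimal, self-contained sketch. It 
is a simplified model, not the actual `AbstractListProcessor` logic: the 
`Entry` class, the `newEntries` method and the `alreadyListed` set are 
hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class InclusiveTimestampFilter {

    /** Hypothetical stand-in for a listed blob: a name plus a last-modified time in millis. */
    static final class Entry {
        final String name;
        final long lastModified;

        Entry(final String name, final long lastModified) {
            this.name = name;
            this.lastModified = lastModified;
        }
    }

    /**
     * Filters entries by timestamp, then drops the names that a previous listing
     * already reported (assumed here to be tracked in alreadyListed).
     *
     * @param inclusive true compares with >=, false compares with >
     */
    static List<String> newEntries(final List<Entry> all, final long minimumTimestamp,
                                   final Set<String> alreadyListed, final boolean inclusive) {
        final List<String> result = new ArrayList<>();
        for (final Entry entry : all) {
            final boolean recentEnough = inclusive
                    ? entry.lastModified >= minimumTimestamp
                    : entry.lastModified > minimumTimestamp;
            // Entries sharing the previous max timestamp pass the inclusive check,
            // so the already-listed ones must be removed explicitly.
            if (recentEnough && !alreadyListed.contains(entry.name)) {
                result.add(entry.name);
            }
        }
        return result;
    }
}
```

   With `a.txt` already listed at timestamp 1000 and `b.txt` uploaded later 
with the same timestamp, the strict comparison returns nothing for a minimum 
timestamp of 1000, so `b.txt` is never listed. The inclusive comparison 
returns only `b.txt`.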



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java:
##########
@@ -199,34 +201,44 @@ protected boolean isListingResetNecessary(final 
PropertyDescriptor property) {
     }
 
     @Override
-    protected List<BlobInfo> performListing(ProcessContext context, Long 
minTimestamp, ListingMode listingMode) throws IOException {
-        String containerName = 
context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
-        String prefix = 
context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+    protected List<BlobInfo> performListing(final ProcessContext context, 
final Long minTimestamp, final ListingMode listingMode) throws IOException {
+        final String containerName = 
context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue();
+        final String prefix = 
context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue();
+        final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
 
         try {
-            List<BlobInfo> listing = new ArrayList<>();
+            final List<BlobInfo> listing = new ArrayList<>();
 
-            BlobContainerClient containerClient = 
storageClient.getBlobContainerClient(containerName);
+            final BlobContainerClient containerClient = 
storageClient.getBlobContainerClient(containerName);
 
-            ListBlobsOptions options = new ListBlobsOptions()
+            final ListBlobsOptions options = new ListBlobsOptions()
                     .setPrefix(prefix);
 
-            for (BlobItem blob : containerClient.listBlobs(options, null)) {
-                BlobItemProperties properties = blob.getProperties();
-
-                Builder builder = new Builder()
-                        .containerName(containerName)
-                        .blobName(blob.getName())
-                        .primaryUri(String.format("%s/%s", 
containerClient.getBlobContainerUrl(), blob.getName()))
-                        .etag(properties.getETag())
-                        .blobType(properties.getBlobType().toString())
-                        .contentType(properties.getContentType())
-                        .contentLanguage(properties.getContentLanguage())
-                        
.lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli())
-                        .length(properties.getContentLength());
-
-                listing.add(builder.build());
-            }
+            final Iterator<PagedResponse<BlobItem>> result = 
containerClient.listBlobs(options, null).iterableByPage().iterator();
+            String continuationToken;
+
+            do {
+                final PagedResponse<BlobItem> pagedResult = result.next();
+                continuationToken = pagedResult.getContinuationToken();

Review Comment:
   It seems to me that the items can be processed with a simple iterator 
(instead of the paged iterator), without tracking the continuation token.
   Based on this documentation:
   https://docs.microsoft.com/en-us/azure/developer/java/sdk/pagination
   
   > Make it possible to easily iterate over each element in the collection 
individually, ignoring any need for manual pagination or tracking of 
continuation tokens.
   
   > Regardless of whether you iterate by page or by each item, there's no 
difference in performance or the number of calls made to the service.
   
   This applies to the v12 Blob and the ADLS processors, since both use the new 
SDK.
   So I think no change is needed in those processors.
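
   As a rough illustration of why item-level iteration is enough, here is a 
self-contained sketch. It does not use the Azure SDK: `PagedSource` is a 
hypothetical stand-in that serves items page by page and counts the page 
fetches, so the behavior of the real paged iterable is only assumed to be 
analogous, as the quoted documentation describes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class FlatPagedIteration {

    /** Hypothetical paged source: hands out items one by one, fetching pages lazily. */
    static final class PagedSource implements Iterable<String> {
        private final List<List<String>> pages;
        int pageFetches = 0; // number of simulated service calls

        PagedSource(final List<List<String>> pages) {
            this.pages = pages;
        }

        @Override
        public Iterator<String> iterator() {
            return new Iterator<String>() {
                private int nextPage = 0;
                private Iterator<String> current = Collections.emptyIterator();

                @Override
                public boolean hasNext() {
                    // Move on to the next page only when the current one is exhausted.
                    while (!current.hasNext() && nextPage < pages.size()) {
                        current = pages.get(nextPage++).iterator();
                        pageFetches++;
                    }
                    return current.hasNext();
                }

                @Override
                public String next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return current.next();
                }
            };
        }
    }

    /** Collects every item with a plain for-each loop; no continuation token is tracked. */
    static List<String> listAll(final PagedSource source) {
        final List<String> names = new ArrayList<>();
        for (final String name : source) {
            names.add(name);
        }
        return names;
    }
}
```

   The caller only sees individual items, while the stand-in still fetches 
each page exactly once. This is the same shape as the original 
`for (BlobItem blob : containerClient.listBlobs(options, null))` loop in this 
processor.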



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

