This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch NIFI-9846 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4a90334728372d35b326948e112fb79457bd5548 Author: Timea Barna <[email protected]> AuthorDate: Wed Mar 30 14:43:06 2022 +0200 NIFI-9846 Implement pagination listing for Azure List processors NIFI-9846 removing paging from ListAzureBlobStorage_v12 and ListAzureDataLakeStorage, adding = to filtering This closes #5916. Signed-off-by: Peter Turcsanyi <[email protected]> --- .../azure/storage/ListAzureBlobStorage.java | 66 +++++++++++++--------- .../azure/storage/ListAzureBlobStorage_v12.java | 49 +++++++++------- .../azure/storage/ListAzureDataLakeStorage.java | 8 ++- 3 files changed, 73 insertions(+), 50 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index eb439b4126..3e08e481f7 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.ResultContinuation; +import com.microsoft.azure.storage.ResultSegment; import com.microsoft.azure.storage.StorageUri; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; @@ -183,6 +185,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue(); final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse(""); final List<BlobInfo> listing = new ArrayList<>(); + final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp; + try { final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null); final CloudBlobContainer container = blobClient.getContainerReference(containerName); @@ -190,34 +194,44 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { final OperationContext operationContext = new OperationContext(); AzureStorageUtils.setProxy(operationContext, context); - for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) { - if (blob instanceof CloudBlob) { - final CloudBlob cloudBlob = (CloudBlob) blob; - final BlobProperties properties = cloudBlob.getProperties(); - final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri(); - - final Builder builder = new BlobInfo.Builder() - .primaryUri(uri.getPrimaryUri().toString()) - .blobName(cloudBlob.getName()) - .containerName(containerName) - .contentType(properties.getContentType()) - .contentLanguage(properties.getContentLanguage()) - .etag(properties.getEtag()) - .lastModifiedTime(properties.getLastModified().getTime()) - .length(properties.getLength()); - - if (uri.getSecondaryUri() != null) { - builder.secondaryUri(uri.getSecondaryUri().toString()); - } - - if (blob instanceof CloudBlockBlob) { - builder.blobType(AzureStorageUtils.BLOCK); - } else { - builder.blobType(AzureStorageUtils.PAGE); + ResultContinuation continuationToken = null; + + do { + final ResultSegment<ListBlobItem> result = container.listBlobsSegmented(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, operationContext); + continuationToken = result.getContinuationToken(); + + for (final ListBlobItem blob : result.getResults()) { + if (blob instanceof CloudBlob) { + final CloudBlob cloudBlob = (CloudBlob) blob; + final BlobProperties properties = cloudBlob.getProperties(); + + if (properties.getLastModified().getTime() >= minimumTimestamp) { + final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri(); + + final Builder builder = new BlobInfo.Builder() + .primaryUri(uri.getPrimaryUri().toString()) + .blobName(cloudBlob.getName()) + .containerName(containerName) + .contentType(properties.getContentType()) + .contentLanguage(properties.getContentLanguage()) + .etag(properties.getEtag()) + .lastModifiedTime(properties.getLastModified().getTime()) + .length(properties.getLength()); + + if (uri.getSecondaryUri() != null) { + builder.secondaryUri(uri.getSecondaryUri().toString()); + } + + if (blob instanceof CloudBlockBlob) { + builder.blobType(AzureStorageUtils.BLOCK); + } else { + builder.blobType(AzureStorageUtils.PAGE); + } + listing.add(builder.build()); + } } - listing.add(builder.build()); } - } + } while (continuationToken != null); } catch (final Throwable t) { throw new IOException(ExceptionUtils.getRootCause(t)); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java index de776a88e3..82bb821774 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java @@ -53,6 +53,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -199,33 +200,39 @@ public class ListAzureBlobStorage_v12 extends AbstractListProcessor<BlobInfo> { } @Override - protected List<BlobInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode listingMode) throws IOException { - String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue(); - String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue(); + protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException { + final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions().getValue(); + final String prefix = context.getProperty(BLOB_NAME_PREFIX).evaluateAttributeExpressions().getValue(); + final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp; try { - List<BlobInfo> listing = new ArrayList<>(); + final List<BlobInfo> listing = new ArrayList<>(); - BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); + final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); - ListBlobsOptions options = new ListBlobsOptions() + final ListBlobsOptions options = new ListBlobsOptions() .setPrefix(prefix); - for (BlobItem blob : containerClient.listBlobs(options, null)) { - BlobItemProperties properties = blob.getProperties(); - - Builder builder = new Builder() - .containerName(containerName) - .blobName(blob.getName()) - .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName())) - .etag(properties.getETag()) - .blobType(properties.getBlobType().toString()) - .contentType(properties.getContentType()) - .contentLanguage(properties.getContentLanguage()) - .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli()) - .length(properties.getContentLength()); - - listing.add(builder.build()); + final Iterator<BlobItem> result = containerClient.listBlobs(options, null).iterator(); + + while (result.hasNext()) { + final BlobItem blob = result.next(); + final BlobItemProperties properties = blob.getProperties(); + + if (properties.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp) { + final Builder builder = new Builder() + .containerName(containerName) + .blobName(blob.getName()) + .primaryUri(String.format("%s/%s", containerClient.getBlobContainerUrl(), blob.getName())) + .etag(properties.getETag()) + .blobType(properties.getBlobType().toString()) + .contentType(properties.getContentType()) + .contentLanguage(properties.getContentLanguage()) + .lastModifiedTime(properties.getLastModified().toInstant().toEpochMilli()) + .length(properties.getContentLength()); + + listing.add(builder.build()); + } } return listing; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java index 268b8c168e..1de2ee9b99 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java @@ -210,12 +210,12 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo @Override protected List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException { - return performListing(context, listingMode, true); + return performListing(context, minTimestamp, listingMode, true); } @Override protected Integer countUnfilteredListing(final ProcessContext context) throws IOException { - return performListing(context, ListingMode.CONFIGURATION_VERIFICATION, false).size(); + return performListing(context, null, ListingMode.CONFIGURATION_VERIFICATION, false).size(); } @Override @@ -238,7 +238,7 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo return attributes; } - private List<ADLSFileInfo> performListing(final ProcessContext context, final ListingMode listingMode, + private List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode, final boolean applyFilters) throws IOException { try { final String fileSystem = evaluateFileSystemProperty(context, null); @@ -256,9 +256,11 @@ public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo options.setRecursive(recurseSubdirectories); final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?"); + final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp; final List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream() .filter(pathItem -> !pathItem.isDirectory()) + .filter(pathItem -> pathItem.getLastModified().toInstant().toEpochMilli() >= minimumTimestamp) .map(pathItem -> new ADLSFileInfo.Builder() .fileSystem(fileSystem) .filePath(pathItem.getName())
