Lehel44 commented on a change in pull request #5413:
URL: https://github.com/apache/nifi/pull/5413#discussion_r736557038



##########
File path: 
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -358,6 +444,131 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         getLogger().info("Successfully listed GCS bucket {} in {} millis", new 
Object[]{bucket, listMillis});
     }
 
+    private List<Storage.BlobListOption> getBlobListOptions(ProcessContext 
context) {
+        final String prefix = 
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+        final boolean useGenerations = 
context.getProperty(USE_GENERATIONS).asBoolean();
+
+        final List<Storage.BlobListOption> listOptions = new ArrayList<>();
+
+        if (prefix != null) {
+            listOptions.add(Storage.BlobListOption.prefix(prefix));
+        }
+        if (useGenerations) {
+            listOptions.add(Storage.BlobListOption.versions(true));
+        }
+
+        return listOptions;
+    }
+
+    private void listByTrackingEntities(ProcessContext context, ProcessSession 
session) {
+        listedEntityTracker.trackEntities(context, session, 
justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            List<ListableBlob> listedEntities = new ArrayList<>();
+
+            Storage storage = getCloudService();
+            String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            final List<Storage.BlobListOption> listOptions = 
getBlobListOptions(context);
+
+            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new 
Storage.BlobListOption[0]));
+            int pageNr=0;
+            do {
+                for (final Blob blob : blobPage.getValues()) {
+                    if (blob.getUpdateTime() >= minTimestampToList) {
+                        listedEntities.add(new ListableBlob(
+                            blob,
+                            pageNr
+                        ));
+                    }
+                }
+                blobPage = blobPage.getNextPage();
+                pageNr++;
+            } while (blobPage != null);
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    protected class ListedBlobTracker extends 
ListedEntityTracker<ListableBlob> {
+        public ListedBlobTracker() {
+            super(getIdentifier(), getLogger(), 
RecordBlobWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(ProcessContext context, 
ProcessSession session, List<ListableBlob> updatedEntities) throws IOException, 
SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(ProcessContext context, 
ProcessSession session, List<ListableBlob> updatedEntities, 
Function<ListableBlob, Map<String, String>> createAttributes) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession 
session, List<ListableBlob> updatedEntities) {
+            final BlobWriter writer;
+            final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeBlobWriter(session);
+            } else {
+                writer = new RecordBlobWriter(session, writerFactory, 
getLogger());
+            }
+
+            long maxTimestamp = 0L;
+            final Set<String> keysMatchingTimestamp = new HashSet<>();
+
+            try {
+                writer.beginListing();
+
+                int listCount = 0;
+                int pageNr = -1;
+                for (ListableBlob listableBlob : updatedEntities) {
+                    Blob blob = listableBlob.getRawEntity();
+                    int currentPageNr = listableBlob.getPageNr();
+
+                    writer.addToListing(blob);
+
+                    listCount++;
+
+                    if (pageNr != -1 && pageNr != currentPageNr && 
writer.isCheckpoint()) {

Review comment:
       I meant: if `pageNr` is initialized to -1, why check that it's not -1 
in the if condition?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to