[ https://issues.apache.org/jira/browse/NIFI-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596034#comment-15596034 ]

James Wing commented on NIFI-2809:
----------------------------------

[~frankmarit], thanks for the update (nifi-2809.patch).  I like the way you did 
the unit tests; the code coverage is very good.  After working with the tests 
and running the processors, I have some more comments:

*Overall*
* Documentation for each of the processors should describe the use of Default 
Credentials and link to 
https://developers.google.com/identity/protocols/application-default-credentials
 (or other helpful source) for reference.
* All of the processors should have an @InputRequirement annotation specifying 
what flowfile inputs they do or do not need.
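For the Fetch/Put/Delete processors, which operate on an incoming flowfile, that would look something like the following sketch (class names taken from the patch):
{code}
@InputRequirement(Requirement.INPUT_REQUIRED)
public class PutGCSObject extends AbstractGCSProcessor {
    ...
}
{code}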

*AbstractGCSProcessor*
* REL_RETRY has the name "failure", so the processors end up with just two 
relationships, not three.  The tests that reference REL_RETRY are not exclusive 
of REL_FAILURE, so I believe this issue is not exposed by the tests.
{code}
    public static final Relationship REL_FAILURE =
            new Relationship.Builder().name("failure")
                                      .description("FlowFiles are routed to failure relationship")
                                      .build();
    public static final Relationship REL_RETRY =
            new Relationship.Builder().name("failure")
                                      .description("FlowFiles are routed to failure relationship")
                                      .build();
{code}
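The fix should just be giving REL_RETRY its own name and description, along the lines of:
{code}
    public static final Relationship REL_RETRY =
            new Relationship.Builder().name("retry")
                                      .description("FlowFiles are routed to retry relationship")
                                      .build();
{code}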

*PutGCSObject*
* Why use a default ACL of allUsers/READER?  I'm OK leaving permission 
configuration for a later ticket as long as the default is sensible.  I'm not 
sure what all the alternatives are, but why not use the bucket default ACL?  
This would permit user choices, without defaulting to public exposure.
* We should add the ACL choice to the processor documentation, regardless of 
what we choose.
* Please wrap BufferedInputStream in try block at PutGCSObject.java:109 to 
ensure cleanup.
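A try-with-resources block is probably the simplest way to guarantee that; sketching with a hypothetical {{rawIn}} stream variable:
{code}
try (final InputStream in = new BufferedInputStream(rawIn)) {
    // perform the upload from the buffered stream here; try-with-resources
    // guarantees the stream is closed even if the upload throws
}
{code}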

*ListGCS*
I recommend leaving the ListGCS processor for a future ticket.  I'm happy to 
review it, but I believe list processors are more complicated than they might 
seem, and including one may delay this ticket.  I love the idea; I did use it 
to test the other processors, and that testing produced the lengthy comments 
below.
* ListGCS should have annotations similar or identical to ListS3 like 
@TriggerSerially, @TriggerWhenEmpty, and 
@InputRequirement(Requirement.INPUT_FORBIDDEN).
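On ListGCS that would look roughly like:
{code}
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
public class ListGCS extends AbstractGCSProcessor {
    ...
}
{code}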
* ListGCS does not persist any state about what it last read from GCS.  Will 
it re-read and re-filter all of the entries on each scheduled execution?
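ListS3 solves this by persisting the timestamp of the newest object it has emitted, so later runs can skip entries already listed.  A rough sketch of the same idea using the NiFi state API (variable and key names are mine):
{code}
final StateManager stateManager = context.getStateManager();
final Map<String, String> state = new HashMap<>();
state.put("last-modified", String.valueOf(latestListedTimestamp));
stateManager.setState(state, Scope.CLUSTER);
{code}
On the next run the processor can read the stored value back with {{stateManager.getState(Scope.CLUSTER)}} and skip entries at or before it.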
* Between the Fetch and List processors, I recommend setting the {{filename}} 
attribute.  Its use is very standard, but its definition is somewhat flexible. 
For example, ListS3 uses the complete object name/key, while ListHDFS splits 
out {{path}} and {{filename}}.
* Should ListGCS write an attribute for the bucket name ("gcs.bucket"), for 
compatibility with Fetch and Delete?
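Writing both attributes is only a couple of lines per listed entry; a sketch (google-cloud-java accessor names are from a recent client and may differ in the version the patch builds against):
{code}
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", blob.getName());
attributes.put("gcs.bucket", bucket);
flowFile = session.putAllAttributes(flowFile, attributes);
{code}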
* Are the REL_FAILURE and REL_RETRY relationships really practical?  I'm 
skeptical that GCS would return enough information to warrant a flowfile that 
nonetheless represents a failure.  ListGCS does not use incoming flowfiles, so 
I'm not sure what retry would mean.  In the case of an exception, shouldn't 
the processor just log it and try again at its next scheduled interval?
* How large a bucket have you tested ListGCS against?  I believe it will 
perform poorly against large buckets as-is.
* Why run a regex pattern against each list entry by default?  The same goal 
could be accomplished with a downstream RouteOnAttribute processor.  I 
understand the convenience of building it into ListGCS, but it might not be 
intuitive to users that they are pulling the complete set from GCS, then 
filtering locally.  An alternative might be to default to no regex filtering 
and require the user to opt in with a pattern.
* You should consider calling {{session.commit()}} for each page of results 
returned by GCS.  This was added recently to ListS3, and it greatly smooths 
out the performance of the processor against large buckets.  I crashed NiFi 
running ListGCS in much the same way ListS3 addressed:
{code}
2016-10-21 11:01:54,129 ERROR [Timer-Driven Process Thread-9] o.a.nifi.processors.gcp.storage.ListGCS
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3664) ~[na:1.8.0_66]
        at java.lang.String.<init>(String.java:207) ~[na:1.8.0_66]
        at java.lang.String.toLowerCase(String.java:2647) ~[na:1.8.0_66]
        at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:847) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parse(JsonParser.java:472) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:781) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parseArray(JsonParser.java:648) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:740) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parse(JsonParser.java:472) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:781) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parse(JsonParser.java:382) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonParser.parse(JsonParser.java:355) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:87) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:81) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:459) ~[google-http-client-1.22.0.jar:1.22.0]
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) ~[google-api-client-1.22.0.jar:1.22.0]
        at org.apache.nifi.processors.gcp.storage.ListGCS.onTrigger(ListGCS.java:102) ~[nifi-gcp-processors-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_66]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_66]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_66]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_66]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
{code}
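For reference, a rough shape of the per-page commit loop (google-cloud-java method names are from a recent client version and may differ in the version the patch builds against):
{code}
Page<Blob> page = storage.list(bucket, BlobListOption.pageSize(1000));
while (page != null) {
    for (final Blob blob : page.getValues()) {
        FlowFile flowFile = session.create();
        flowFile = session.putAttribute(flowFile, "filename", blob.getName());
        session.transfer(flowFile, REL_SUCCESS);
    }
    // release this page's flowfiles downstream before fetching the next page,
    // so memory use stays bounded by the page size rather than the bucket size
    session.commit();
    page = page.hasNextPage() ? page.getNextPage() : null;
}
{code}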


> Add processors for Google Cloud Storage Fetch/Put/Delete
> --------------------------------------------------------
>
>                 Key: NIFI-2809
>                 URL: https://issues.apache.org/jira/browse/NIFI-2809
>             Project: Apache NiFi
>          Issue Type: New Feature
>    Affects Versions: 1.1.0
>            Reporter: Frank Maritato
>         Attachments: gcp_bundle.patch, gcs_bundle.patch, nifi-2809.patch
>
>
> Creating a ticket to add nifi processors for Google Cloud Storage similar to 
> how the AWS S3 processors are done:
> * FetchGCSObject
> * PutGCSObject
> * DeleteGCSObject
> I have the code mostly written and will attach the patch file when complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
