[
https://issues.apache.org/jira/browse/NIFI-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596034#comment-15596034
]
James Wing commented on NIFI-2809:
----------------------------------
[~frankmarit], thanks for the update (nifi-2809.patch). I like the way you did
the unit tests; the code coverage is very good. After working with the tests
and running the processors, I have some more comments:
*Overall*
* Documentation for each of the processors should describe the use of Default
Credentials and link to
https://developers.google.com/identity/protocols/application-default-credentials
(or other helpful source) for reference.
* All of the processors should have an @InputRequirement annotation specifying
what flowfile inputs they do or do not need.
*AbstractGCSProcessor*
* REL_RETRY has the name "failure", so the processors end up with just two
distinct relationships, not three. The tests that reference REL_RETRY do not
distinguish it from REL_FAILURE, so I believe the tests do not expose this
issue.
{code}
public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("FlowFiles are routed to failure relationship")
        .build();
public static final Relationship REL_RETRY = new Relationship.Builder()
        .name("failure")
        .description("FlowFiles are routed to failure relationship")
        .build();
{code}
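The duplicated {{name("failure")}} looks like a copy/paste slip. The fix would
be along these lines (a fragment against the existing class; the description
text is only a suggestion):
```java
public static final Relationship REL_RETRY = new Relationship.Builder()
        .name("retry")
        .description("FlowFiles are routed to this relationship if the operation should be retried")
        .build();
```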
*PutGCSObject*
* Why use a default ACL of allUsers/READER? I'm OK leaving permission
configuration for a later ticket as long as the default is sensible. I'm not
sure what all the alternatives are, but why not use the bucket default ACL?
This would permit user choices, without defaulting to public exposure.
* We should add the ACL choice to the processor documentation, regardless of
what we choose.
* Please wrap the BufferedInputStream at PutGCSObject.java:109 in a
try-with-resources block to ensure the stream is closed.
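Concretely, try-with-resources guarantees the stream is closed even when the
call inside the block throws. A minimal sketch; the {{upload}} helper below is
a hypothetical stand-in for the actual GCS write at that line:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class TryWithResourcesSketch {
    // Hypothetical stand-in for the GCS write at PutGCSObject.java:109.
    static String upload(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "flowfile content".getBytes(StandardCharsets.UTF_8);
        final String result;
        // The stream is closed automatically, whether upload() returns or throws.
        try (InputStream in = new BufferedInputStream(new ByteArrayInputStream(payload))) {
            result = upload(in);
        }
        System.out.println(result);
    }
}
```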
*ListGCS*
I recommend leaving the ListGCS processor for a future ticket. I'm happy to
review it, but I believe list processors are more complicated than they might
seem, and this one may delay the ticket. I love the idea; I did use it to test
the other processors, which resulted in the lengthy comments below.
* ListGCS should have annotations similar or identical to ListS3 like
@TriggerSerially, @TriggerWhenEmpty, and
@InputRequirement(Requirement.INPUT_FORBIDDEN).
* ListGCS does not persist any state information about what it last read from
GCS. For each scheduled execution, it will re-read and re-filter all of the
entries?
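If persisted state is added later, the core of it is just filtering the listing
by the timestamp saved after the previous run. A rough sketch of that filter,
with a hypothetical {{Entry}} type standing in for the GCS blob metadata:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ListingStateSketch {
    // Hypothetical listing entry: object name plus last-modified millis.
    static final class Entry {
        final String name;
        final long updated;
        Entry(String name, long updated) {
            this.name = name;
            this.updated = updated;
        }
    }

    // Keep only entries strictly newer than the timestamp persisted after the
    // previous execution, so each run emits only objects not yet listed.
    static List<Entry> newerThan(List<Entry> listing, long lastListed) {
        List<Entry> fresh = new ArrayList<>();
        for (Entry e : listing) {
            if (e.updated > lastListed) {
                fresh.add(e);
            }
        }
        fresh.sort(Comparator.comparingLong(e -> e.updated));
        return fresh;
    }
}
```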
* Between the Fetch and List processors, I recommend setting the {{filename}}
attribute. Its use is very standard, but its definition is somewhat flexible.
For example, ListS3 uses the complete object name/key, while ListHDFS splits
out {{path}} and {{filename}}.
* Should ListGCS write an attribute for the bucket name ("gcs.bucket"), for
compatibility with Fetch and Delete?
* Are the REL_FAILURE and REL_RETRY relationships really practical? I'm
skeptical that GCS would return enough information to warrant a flowfile that
is somehow not successful. ListGCS does not use incoming flowfiles, so I'm not
sure what retry would mean. In the case of an exception, shouldn't the
processor just log it and try again at its next scheduled interval?
* How large a bucket have you tested ListGCS against? I believe it will
perform poorly against large buckets as-is.
* Why run a regex pattern against each list entry by default? The same goal
could be accomplished with a downstream RouteOnAttribute processor. I
understand the convenience of building it into ListGCS, but it might not be
intuitive to users that they are pulling the complete set from GCS, then
filtering locally. An alternative might be to default to no regex filtering
and require the user to opt in with a pattern.
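The opt-in variant is a small change: apply the pattern only when the user
configured one, and otherwise pass the listing through untouched. A sketch,
with the property plumbing omitted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class PrefixFilterSketch {
    // With no pattern configured, return the listing unchanged; the user must
    // opt in to client-side regex filtering by supplying a pattern.
    static List<String> filterKeys(List<String> keys, String patternOrNull) {
        if (patternOrNull == null || patternOrNull.isEmpty()) {
            return keys;
        }
        Pattern p = Pattern.compile(patternOrNull);
        List<String> kept = new ArrayList<>();
        for (String key : keys) {
            if (p.matcher(key).matches()) {
                kept.add(key);
            }
        }
        return kept;
    }
}
```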
* You should consider calling {{session.commit()}} for each page of results
returned by GCS. This was added recently to ListS3, and it greatly smooths out
the performance of the processor when run against large buckets. I crashed NiFi
running ListGCS in a very similar way to what ListS3 addressed:
{code}
2016-10-21 11:01:54,129 ERROR [Timer-Driven Process Thread-9] o.a.nifi.processors.gcp.storage.ListGCS
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:3664) ~[na:1.8.0_66]
    at java.lang.String.<init>(String.java:207) ~[na:1.8.0_66]
    at java.lang.String.toLowerCase(String.java:2647) ~[na:1.8.0_66]
    at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:847) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parse(JsonParser.java:472) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:781) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parseArray(JsonParser.java:648) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:740) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parse(JsonParser.java:472) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parseValue(JsonParser.java:781) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parse(JsonParser.java:382) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonParser.parse(JsonParser.java:355) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:87) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:81) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:459) ~[google-http-client-1.22.0.jar:1.22.0]
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469) ~[google-api-client-1.22.0.jar:1.22.0]
    at org.apache.nifi.processors.gcp.storage.ListGCS.onTrigger(ListGCS.java:102) ~[nifi-gcp-processors-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_66]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_66]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_66]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
{code}
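The per-page pattern behind that suggestion can be sketched independently of
the NiFi API: fetch a page, emit it, commit, and only then fetch the next
page, so only one page of results is ever held in memory. The names below are
hypothetical stand-ins for the GCS paging call and {{session.commit()}}:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PagedCommitSketch {
    // Hypothetical page source standing in for one paged GCS list() response.
    static List<Integer> fetchPage(int offset, int pageSize, int total) {
        List<Integer> page = new ArrayList<>();
        for (int i = offset; i < Math.min(offset + pageSize, total); i++) {
            page.add(i);
        }
        return page;
    }

    // Process the listing one page at a time; the commit callback (analogous
    // to session.commit()) releases each page downstream instead of letting
    // the whole bucket listing accumulate in memory.
    static int listAll(int total, int pageSize, Consumer<List<Integer>> commit) {
        int emitted = 0;
        int offset = 0;
        while (true) {
            List<Integer> page = fetchPage(offset, pageSize, total);
            if (page.isEmpty()) {
                break;
            }
            commit.accept(page);
            emitted += page.size();
            offset += page.size();
        }
        return emitted;
    }
}
```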
> Add processors for Google Cloud Storage Fetch/Put/Delete
> --------------------------------------------------------
>
> Key: NIFI-2809
> URL: https://issues.apache.org/jira/browse/NIFI-2809
> Project: Apache NiFi
> Issue Type: New Feature
> Affects Versions: 1.1.0
> Reporter: Frank Maritato
> Attachments: gcp_bundle.patch, gcs_bundle.patch, nifi-2809.patch
>
>
> Creating a ticket to add nifi processors for Google Cloud Storage similar to
> how the AWS S3 processors are done:
> * FetchGCSObject
> * PutGCSObject
> * DeleteGCSObject
> I have the code mostly written and will attach the patch file when complete.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)