Sam,

Thanks for adding details about the flow.  It would also be good to know
more about the system itself.  One area that is likely having issues
is the SplitText processor.  Unfortunately, when splitting large input
text files we can end up with a tremendous number of uncommitted flow
files, and all of their information is held in memory at that time.  To
overcome this we recommend a two-step split: the first split uses a
'Line Split Count' of 5000, for example, and the second split uses 1.
This assumes each line in your dataset ends up being an event in
JSON format.  If that sounds right, change your flow as follows:

ListS3 -> FetchS3Object -> RouteOnAttribute -> SplitText (5000 'Line
Split Count') -> SplitText (1 'Line Split Count') -> JoltTransform ->
CustomProcessor -> MergeContent (Bins=1, Entries=100) -> PostHttp
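To see why the two-step split helps, here is some back-of-the-envelope arithmetic.  The ~1 KB-per-line figure is an assumption for illustration, not something from this thread:

```java
// Rough arithmetic behind the two-step split (illustrative numbers:
// a 150 MB file with ~1 KB lines is an assumption, not a measurement).
public class SplitMath {
    static long lines(long fileBytes, long bytesPerLine) {
        return fileBytes / bytesPerLine;
    }

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        long totalLines = lines(150L * 1024 * 1024, 1024);   // 153,600 lines
        // One-step split straight to single lines: one session tracks
        // every uncommitted split flow file at once.
        long oneStep = totalLines;                           // 153,600 flow files
        // Two-step split: the worst session holds the larger of the
        // two stages, not the product.
        long stageOne = ceilDiv(totalLines, 5000);           // 31 flow files
        long stageTwo = 5000;                                // 5,000 flow files
        System.out.println(oneStep + " vs " + Math.max(stageOne, stageTwo));
    }
}
```

In other words, no single session ever holds more than a few thousand uncommitted flow files instead of all 150k+ at once.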

Given that the input dataset is 150MB, I'm guessing the above will help
greatly.  Also, if you have the default 512MB heap, consider
increasing it to 1 or 2 GB if your system allows.  Given the
flow you have and the recommendation above, though, this may not be
necessary.
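If you do raise the heap, the settings live in conf/bootstrap.conf.  A sketch (the java.arg numbering matches the stock file; 2g is just an example):

```
# conf/bootstrap.conf -- JVM memory settings (defaults are 512 MB)
java.arg.2=-Xms2g
java.arg.3=-Xmx2g
```

A restart of NiFi is required for the change to take effect.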

The next point of concern is the CustomProcessor and the size of
each event.  The processor as you show it reads the full content
into memory.  Sometimes this cannot be avoided due to the libraries
being used, etc., but keep that in mind.  If your JSON events are 1MB in
size, the in-memory footprint will be a bit larger.  Then factor in
parallelization, etc., and it can add up quickly.  You're probably fine,
but let us know the min/max/avg event size and we can help you be
sure.

Next, as Joe mentioned, you could be exceeding the maximum attribute
size, and we presently handle that poorly, per the JIRA Joe
noted.  Is it necessary to promote the entirety of the content into a
flow file attribute?  Doing so defeats a key benefit of NiFi's design:
flow file attributes live in memory, whereas content lives on disk
(usually) and can be efficiently streamed in and out.  The code as you
show it also has a double cost: it takes the string, turns it
into bytes (adding to heap usage), then writes those bytes to
disk.  It would be more ideal if the transform could write the content
to disk in a streaming fashion.
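In NiFi terms that means doing the read and write in a single pass rather than buffering the whole content and promoting it to an attribute.  A self-contained sketch of the line-by-line streaming idea, using only plain java.io so it stands alone; transformLine() is a hypothetical stand-in for your JSON transform:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class StreamingTransform {
    // Hypothetical per-line transform; stands in for your tranform(json).
    static String transformLine(String line) {
        return line.toUpperCase();
    }

    // Reads from 'in' and writes to 'out' one line at a time, so heap
    // usage stays bounded regardless of the total content size.  In a
    // NiFi processor the same shape fits inside a single
    // session.write(flowFile, StreamCallback) call.
    static void transform(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        Writer writer = new BufferedWriter(
                new OutputStreamWriter(out, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            writer.write(transformLine(line));
            writer.write('\n');
        }
        writer.flush();
    }
}
```

This avoids both the full String in heap and the extra String-to-bytes copy, and it leaves the result in the flow file's content rather than an attribute.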

A lot of comments here, but just start with the top portion and we can
progress from there.

Thanks
Joe

On Mon, Feb 13, 2017 at 9:31 AM, sam <[email protected]> wrote:
> Thanks Joe. I got out of the hung state now, I actually deleted those folders
> before but probably some data did not delete in first go.
>
> I am using Nifi 1.1. My flow is like:
> ListS3 -> FetchS3Object -> RouteOnAttribute -> SplitText -> JoltTransform ->
> CustomProcessor -> MergeContent (Bins=1, Entries=100) -> PostHttp
> Max data is not more than 200 MB, but it's uploaded every 15 mins.
>
> I can imagine what happened is my server ran out of memory and the
> following code inside onTrigger() of a custom processor started giving
> null pointer exceptions at value.get() in CustomProcessor:
>
>         final AtomicReference<String> value = new AtomicReference<>();
>             session.read(flowFile, new InputStreamCallback() {
>                 @Override
>                 public void process(final InputStream in) throws IOException
> {
>                     String results = null;
>                     try {
>                         String json = IOUtils.toString(in, "UTF-8");
>                         results = tranform(json);
>                     } catch (Exception e) { }
>
>                     value.set(results);
>                 }
>
>             });
>
>             String results = value.get();
>             if (results != null && !results.isEmpty()) {
>                 flowFile = session.putAttribute(flowFile, "json", results);
>             }
>
>             flowFile = session.write(flowFile, new OutputStreamCallback() {
>                 @Override
>                 public void process(OutputStream out) throws IOException {
>                     out.write(value.get().getBytes());
>                 }
>             });
>
> 1. Code already looks not ideal but not sure what else can cause flowfile's
> content to be null?
> 2. What should be done in case of these space issues to immediately get out?
> You do not know what has been processed and what could not go through. How
> to make it reprocess files?
>
> I am new to NiFi, maybe it is something I can configure my dataflow to deal
> with, but good to know what people do normally. Thanks
>
>
>
>
>
>
> --
> View this message in context: 
> http://apache-nifi-developer-list.39713.n7.nabble.com/Nifi-in-a-hung-state-tp14713p14717.html
> Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.
