[ 
https://issues.apache.org/jira/browse/NIFI-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868245#comment-15868245
 ] 

Mark Payne commented on NIFI-3205:
----------------------------------

[~joewitt] - that is to be expected. The Content Repo keeps some file handles 
open in order to provide better performance, so at least one will remain open. 
If multiple threads are writing to the Content Repo simultaneously, several 
may be left open. Those that are no longer in use should still be cleaned up.
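
To illustrate the idea, here is a minimal sketch of a pool that keeps a bounded 
number of streams open for reuse. It is a toy model, not the actual 
FileSystemRepository code: the class name OpenHandlePool, the maxIdle limit, 
and the use of ByteArrayOutputStream as a stand-in for a file stream are all 
assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of reusing open streams; NOT NiFi's FileSystemRepository code. */
class OpenHandlePool {
    // Streams that are open but not currently in use by any writer.
    private final BlockingQueue<OutputStream> idle;
    // Number of streams currently open (in use or idle).
    private final AtomicInteger open = new AtomicInteger();

    OpenHandlePool(int maxIdle) {
        this.idle = new ArrayBlockingQueue<>(maxIdle);
    }

    /** Reuse an idle stream if there is one; otherwise open a new one. */
    OutputStream acquire() {
        OutputStream out = idle.poll();
        if (out == null) {
            out = new ByteArrayOutputStream(); // hypothetical stand-in for a file stream
            open.incrementAndGet();
        }
        return out;
    }

    /** Keep the stream open for the next writer if there is room; else close it. */
    void release(OutputStream out) {
        if (!idle.offer(out)) {
            try {
                out.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            open.decrementAndGet();
        }
    }

    /** Number of streams that are currently open. */
    int openCount() {
        return open.get();
    }
}
```

In this sketch a single writer leaves one stream open after it finishes, 
concurrent writers can briefly push the count higher, and anything beyond 
maxIdle is closed on release.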

> Uncaught Failures Can Leave New Flow Files on Disk
> --------------------------------------------------
>
>                 Key: NIFI-3205
>                 URL: https://issues.apache.org/jira/browse/NIFI-3205
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Alan Jackoway
>            Assignee: Mark Payne
>            Priority: Critical
>             Fix For: 1.2.0
>
>         Attachments: replicate_issue.xml
>
>
> We have been hitting a situation where our content repository quickly fills 
> the entire disk despite having archiving off and close to nothing queued.
> We believe this problem happens more often when a processor that creates many 
> flow files fails in the middle.
> To reproduce it, I created the test processor below and deployed it on a 
> fresh NiFi instance with a GenerateFlowFile producing 100KB flow files in 
> front of it. The processor makes 5 copies of the incoming flow file, calls 
> session.remove on those copies, then throws a RuntimeException. Even so, the 
> content repository grows by 500KB every time it runs. When you restart NiFi, 
> it cleans up the content repository with messages like this:
> {noformat}
> 2016-12-15 11:17:29,774 INFO [main] o.a.n.c.repository.FileSystemRepository Found unknown file /Users/alanj/nifi-1.1.0/content_repository/1/1481818525279-1 (1126400 bytes) in File System Repository; archiving file
> 2016-12-15 11:17:29,778 INFO [main] o.a.n.c.repository.FileSystemRepository Found unknown file /Users/alanj/nifi-1.1.0/content_repository/2/1481818585493-2 (409600 bytes) in File System Repository; archiving file
> {noformat}
> The test processor is the following:
> {code:java}
> // Copyright 2016 (c) Cloudera
> package com.cloudera.edh.nifi.processors.bundles;
> import com.google.common.collect.Lists;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.util.List;
> import org.apache.nifi.annotation.behavior.InputRequirement;
> import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
> import org.apache.nifi.flowfile.FlowFile;
> import org.apache.nifi.processor.AbstractProcessor;
> import org.apache.nifi.processor.ProcessContext;
> import org.apache.nifi.processor.ProcessSession;
> import org.apache.nifi.processor.exception.ProcessException;
> import org.apache.nifi.processor.io.InputStreamCallback;
> import org.apache.nifi.processor.io.OutputStreamCallback;
> import org.apache.nifi.stream.io.StreamUtils;
> /**
>  * Makes 5 copies of an incoming file, then fails and rolls back.
>  */
> @InputRequirement(value = Requirement.INPUT_REQUIRED)
> public class CopyAndFail extends AbstractProcessor {
>   @Override
>   public void onTrigger(ProcessContext context, ProcessSession session)
>       throws ProcessException {
>     FlowFile inputFile = session.get();
>     if (inputFile == null) {
>       context.yield();
>       return;
>     }
>     final List<FlowFile> newFiles = Lists.newArrayList();
>     
>     // Copy the file 5 times (simulates us opening a zip file and
>     // unpacking its contents)
>     for (int i = 0; i < 5; i++) {
>       session.read(inputFile, new InputStreamCallback() {
>         @Override
>         public void process(InputStream inputStream) throws IOException {
>           FlowFile ff = session.create(inputFile);
>           ff = session.write(ff, new OutputStreamCallback() {
>             @Override
>             public void process(final OutputStream out) throws IOException {
>               StreamUtils.copy(inputStream, out);
>             }
>           });
>           newFiles.add(ff);
>         }
>       });
>     }
>     
>     getLogger().warn("Removing the new files");
>     System.err.println("Removing the new files");
>     session.remove(newFiles);
>     
>     // Simulate an error handling some file in the zip after unpacking
>     // the rest
>     throw new RuntimeException();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
