Mark,

Thanks for your help. I have used the snippet of code you sent and it works,
although I am fairly sure I haven't implemented it correctly: I have had to put
all of my code in the onTrigger method instead of in the callback.

I also need to change the filename attribute of the parsed flowfile, so I have
inserted the following line:

session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
    context.getProperty(PARSED_FILENAME).getValue());

But it gives me the following error:

2015-08-15 21:28:55,628 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.ParseMyData ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0] is not the most recent version of this FlowFile within this session (StandardProcessSession[id=21562]); rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0] is not the most recent version of this FlowFile within this session (StandardProcessSession[id=21562])


I have attached my processor class. I would be grateful if you could give it a
quick look and tell me what I have done wrong.

Many thanks,
Dave
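
The error above is consistent with the FlowFile returned by
session.putAttribute being discarded: FlowFile objects are immutable, and each
session method that changes one returns a new version, so the old reference
goes stale. The sketch below illustrates that idea only. Snapshot and
ToySession are hypothetical stand-ins written for this example, not the NiFi
classes, and the exception type is an assumption chosen for brevity.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy stand-ins (NOT the NiFi classes) that mimic a "most recent version" check.
final class StaleReferenceSketch {

    /** Immutable snapshot: every change produces a new object. */
    static final class Snapshot {
        final long id;
        final int version;
        final Map<String, String> attributes;

        Snapshot(long id, int version, Map<String, String> attributes) {
            this.id = id;
            this.version = version;
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }
    }

    /** Toy session that remembers the newest snapshot for each id. */
    static final class ToySession {
        private final Map<Long, Snapshot> latest = new HashMap<>();
        private long nextId = 1;

        Snapshot create() {
            Snapshot s = new Snapshot(nextId++, 0, new HashMap<String, String>());
            latest.put(s.id, s);
            return s;
        }

        Snapshot putAttribute(Snapshot s, String key, String value) {
            requireLatest(s);
            Map<String, String> attrs = new HashMap<>(s.attributes);
            attrs.put(key, value);
            Snapshot updated = new Snapshot(s.id, s.version + 1, attrs);
            latest.put(s.id, updated);
            return updated;
        }

        void transfer(Snapshot s) {
            requireLatest(s);
        }

        private void requireLatest(Snapshot s) {
            if (latest.get(s.id) != s) {
                throw new IllegalStateException("not the most recent version");
            }
        }
    }

    /** Returns true if discarding the result of putAttribute trips the check. */
    static boolean staleReferenceFails() {
        ToySession session = new ToySession();
        Snapshot parsed = session.create();
        session.putAttribute(parsed, "filename", "myFile.dict"); // result discarded
        try {
            session.transfer(parsed); // 'parsed' still points at the old version
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Returns the filename seen after reassigning the result. */
    static String reassignedFilename() {
        ToySession session = new ToySession();
        Snapshot parsed = session.create();
        parsed = session.putAttribute(parsed, "filename", "myFile.dict");
        session.transfer(parsed);
        return parsed.attributes.get("filename");
    }

    public static void main(String[] args) {
        System.out.println(staleReferenceFails());
        System.out.println(reassignedFilename());
    }
}
```

In the attached processor the equivalent change would be to keep the returned
FlowFile before calling session.write:

parsed = session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
    context.getProperty(PARSED_FILENAME).getValue());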


On Saturday, 15 August 2015, 13:16, Mark Payne <[email protected]> wrote:

David,

In this case, since you want to keep the original intact, you will need to 
create a 'child' flowfile to write to.
You do this with ProcessSession.create(FlowFile)

So you will have code that looks something like this:

final FlowFile original = session.get();
if (original == null) {
  return;
}

// create a new 'child' FlowFile. The framework will automatically handle
// the provenance information so that 'parsed' is forked from 'original'.
FlowFile parsed = session.create(original);

// Get an OutputStream for the 'parsed' FlowFile
parsed = session.write(parsed, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream parsedOut) throws IOException {

        // Get an InputStream for the original
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(final InputStream originalIn) throws IOException {
                // read from original FlowFile via originalIn
                // write to new FlowFile via parsedOut
            }
        });

    }
});
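
As a rough illustration of what could go inside the inner callback, the sketch
below reads lines from one stream and writes a record of each match to
another, using only java.io. The class and method names (MatchCopySketch,
copyMatches, demo) are made up for this example, and the newline after each
record is an assumption about the desired output format.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

final class MatchCopySketch {

    /**
     * Reads 'in' line by line and writes one "Found match N" record to 'out'
     * for every line containing 'pattern'. Returns the number of matches.
     */
    static int copyMatches(InputStream in, OutputStream out, String pattern) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        int count = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.contains(pattern)) {
                count++;
                writer.write("Found match " + count);
                writer.write('\n'); // assumption: one record per line
            }
        }
        // Flush buffered output, but leave closing to whoever owns the streams.
        writer.flush();
        return count;
    }

    /** Runs copyMatches over in-memory streams and returns what was written. */
    static String demo() {
        byte[] input = "HOME a\nother\nb HOME\n".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            copyMatches(new ByteArrayInputStream(input), out, "HOME");
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

Inside the nested callbacks this would be called as
copyMatches(originalIn, parsedOut, "HOME"), so the reading side comes from the
original FlowFile and the writing side goes to the child.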

Does this give you what you need? If anything is still unclear, let us know!

Thanks
-Mark

----------------------------------------
> Date: Sat, 15 Aug 2015 10:04:54 +0100
> From: [email protected]
> Subject: Writing to a flowfile
> To: [email protected]
>
>
> Hi
>
> I'm writing a processor which parses a file, I want the parsed file to go to 
> relationship parsed, and the original file to go to relationship original, if 
> the parse was ok.
> If the parse fails I want the original file to go to relationship failure.
>
> I have an inner class which contains a callback which does the parsing. The 
> callback is called from the onTrigger method.
> My problem is that I want to read from my original flowFile and write to a 
> new flowFile, but it always seems to write to the original flowfile.
> How do I direct my bufferedwriter to my new flowfile?
>
> Many thanks
> Dave
>
                         

  
package org.apache.nifi.processors.standard;


import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.ObjectHolder;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.UUID;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"parse", "text"})
@CapabilityDescription("Parses a text file line by line and writes a record of each matching line to a new 'parsed' FlowFile, leaving the original content untouched")

public class ParseMyData extends AbstractProcessor {

   static boolean error = false;

    public static final PropertyDescriptor PARSED_FILENAME = new PropertyDescriptor.Builder()
            .name("Parsed Filename")
            .description("The filename of the Parsed File")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("myFile.dict")
            .build();
   

    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The original input file will be routed to this destination when it has been successfully parsed")
            .build();
    public static final Relationship REL_PARSED = new Relationship.Builder()
            .name("parsed")
            .description("The parsed file will be routed to this destination when an input file is successfully parsed")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If a file cannot be parsed for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
            .build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(PARSED_FILENAME);
       
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_PARSED);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }


    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
     
        final ProcessorLog logger = getLogger();
       
        final FlowFile original = session.get();
        if (original == null) {
          return;
        }

        // create a new 'child' FlowFile. The framework will automatically handle
        // the provenance information so that 'parsed' is forked from 'original'.
        FlowFile parsed = session.create(original);
       //session.putAttribute(parsed, CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());
       
        // Get an OutputStream for the 'parsed' FlowFile
        parsed = session.write(parsed, new OutputStreamCallback() {
            public void process(final OutputStream parsedOut) {

                // Get an InputStream for the original
                session.read(original, new InputStreamCallback() {
                    public void process(InputStream originalIn) {
                        // read from original FlowFile via originalIn
                        // write to new FlowFile via parsedOut
                    	
                    	
                        try (BufferedReader br = new BufferedReader(new InputStreamReader(originalIn, "UTF-8"), 6404);
                                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(parsedOut, "UTF-8"))) {
                            int count = 0;
                            String matchPattern = "HOME";
                            String readLine = "";
                            while ((readLine = br.readLine()) != null) {
                                if (readLine.contains(matchPattern)) {
                                    count++;
                                    bw.write("Found match " + count);
                                    logger.info("Writing match found " + count);
                                }
                            }
                            // try-with-resources flushes and closes bw, so no explicit close() is needed
                        } catch (Exception e) {
                            // log the cause instead of swallowing it, then flag the failure for routing
                            logger.error("Failed to parse " + original, e);
                            error = true;
                        }
                    	
                    }
                });

            }
        });
        
        if (!error) {
            final String flowFileStr = parsed.toString();
            session.transfer(original, REL_ORIGINAL);
            session.transfer(parsed, REL_PARSED);
            logger.info("Transferred " + flowFileStr + " to REL_PARSED");
        } else {
            logger.error("parsing to failure");
            session.transfer(original, REL_FAILURE);
            session.remove(parsed);
        }
        
       
    }

    
    static class ParseDataCallback implements StreamCallback {

        private static final String matchPattern = "HOME";
        final Map<String, String> flowFileAttrs = new HashMap<String, String>();
        private ProcessorLog log;
        private ProcessContext context;

        public ParseDataCallback(ProcessContext context, ProcessorLog log) {
            this.log = log;
            this.context = context;
        }

        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
            String readLine = "";
            int count = 0;
            try (BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"), 6404);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"))) {

                while ((readLine = br.readLine()) != null) {
                    if (readLine.contains(matchPattern)) {
                        count++;
                        bw.write("Found match " + count);
                        log.info("Writing match found " + count);
                    } else {
                        bw.write("Match  not found");
                        log.info("Writing out match not found");
                    }
                }
                bw.close();
            }
            flowFileAttrs.put(CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());
            UUID uuid = UUID.randomUUID();
            flowFileAttrs.put(CoreAttributes.UUID.key(), uuid.toString());
        }

    }

}
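
One further thing that may be worth checking in the class above: error is a
static field that is set to true on failure and, as far as the attached code
shows, never reset, so a single failed parse could route every later file to
failure. The sketch below contrasts that with a flag created per invocation.
It uses only the JDK; FailureFlagSketch and its method names are hypothetical,
and the Runnable stands in for the anonymous callback.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class FailureFlagSketch {

    // Mimics the static field: shared by every call and never reset.
    static boolean stickyError = false;

    /** Routing decision driven by the shared static flag. */
    static String routeWithStaticFlag(boolean parseFails) {
        if (parseFails) {
            stickyError = true;
        }
        return stickyError ? "failure" : "parsed";
    }

    /** Routing decision driven by a flag that lives only for this call. */
    static String routeWithLocalFlag(final boolean parseFails) {
        final AtomicBoolean failed = new AtomicBoolean(false);

        // Stand-in for the anonymous callback: it can set the flag because
        // the AtomicBoolean reference is final even though its value changes.
        Runnable callback = new Runnable() {
            @Override
            public void run() {
                if (parseFails) {
                    failed.set(true);
                }
            }
        };
        callback.run();

        return failed.get() ? "failure" : "parsed";
    }

    public static void main(String[] args) {
        System.out.println(routeWithStaticFlag(true));
        System.out.println(routeWithStaticFlag(false));
        System.out.println(routeWithLocalFlag(true));
        System.out.println(routeWithLocalFlag(false));
    }
}
```

A per-call flag also avoids two concurrent onTrigger invocations overwriting
each other's result, which a shared static field cannot guarantee.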
