Mark
Thanks for your help. I have used the snippet of code you sent and it works,
although I am fairly sure I haven't implemented it correctly: I have had to put
all of my code in the onTrigger method instead of in the callback. I also
need to change the filename attribute of the parsed FlowFile, so I have inserted
the following line:
session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
context.getProperty(PARSED_FILENAME).getValue());
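For context, that call sits straight after the session.create call and before the session.write call, so the code is roughly:

FlowFile parsed = session.create(original);
session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
        context.getProperty(PARSED_FILENAME).getValue());
parsed = session.write(parsed, ...);

(I notice I am not capturing the FlowFile that putAttribute returns, in case that is relevant.)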
But it gives me the following error:

2015-08-15 21:28:55,628 ERROR [Timer-Driven
Process Thread-5] o.a.nifi.processors.standard.ParseMyData
ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b]
ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process due to
org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
is not the most recent version of this FlowFile within this session
(StandardProcessSession[id=21562]); rolling back session:
org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
is not the most recent version of this FlowFile within this session
(StandardProcessSession[id=21562])
I have attached my processor class; I would be grateful if you could give it a
quick look and tell me what I have done wrong.
Many thanks,
Dave
On Saturday, 15 August 2015, 13:16, Mark Payne <[email protected]>
wrote:
David,
In this case, since you want to keep the original intact, you will need to
create a 'child' FlowFile to write to.
You do this with ProcessSession.create(FlowFile).
So you will have code that looks something like this:
final FlowFile original = session.get();
if (original == null) {
    return;
}

// Create a new 'child' FlowFile. The framework will automatically handle
// the provenance information so that 'parsed' is forked from 'original'.
FlowFile parsed = session.create(original);

// Get an OutputStream for the 'parsed' FlowFile
parsed = session.write(parsed, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream parsedOut) throws IOException {
        // Get an InputStream for the original
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(final InputStream originalIn) throws IOException {
                // read from original FlowFile via originalIn
                // write to new FlowFile via parsedOut
            }
        });
    }
});
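Once the write has completed, you can route both FlowFiles; something like:

session.transfer(original, REL_ORIGINAL);
session.transfer(parsed, REL_PARSED);

(where REL_ORIGINAL and REL_PARSED are whatever Relationship objects your processor defines).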
Does this give you what you need? If anything is still unclear, let us know!
Thanks
-Mark
----------------------------------------
> Date: Sat, 15 Aug 2015 10:04:54 +0100
> From: [email protected]
> Subject: Writing to a flowfile
> To: [email protected]
>
>
> Hi
>
> I'm writing a processor which parses a file. I want the parsed file to go to
> relationship 'parsed' and the original file to go to relationship 'original' if
> the parse was OK.
> If the parse fails, I want the original file to go to relationship 'failure'.
>
> I have an inner class which contains a callback that does the parsing. The
> callback is called from the onTrigger method.
> My problem is that I want to read from my original FlowFile and write to a
> new FlowFile, but it always seems to write to the original FlowFile.
> How do I direct my BufferedWriter to my new FlowFile?
>
> Many thanks
> Dave
>
> Sent from Yahoo! Mail on Android
>
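[Attachment: ParseMyData.java]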
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.UUID;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"parse", "text"})
@CapabilityDescription("Parses a text file, writing the parse results to a new 'parsed' FlowFile while keeping the original FlowFile intact")
public class ParseMyData extends AbstractProcessor {

    public static final PropertyDescriptor PARSED_FILENAME = new PropertyDescriptor.Builder()
            .name("Parsed Filename")
            .description("The filename of the Parsed File")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("myFile.dict")
            .build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The original input file will be routed to this destination when it has been successfully parsed")
            .build();
    public static final Relationship REL_PARSED = new Relationship.Builder()
            .name("parsed")
            .description("The parsed file will be routed to this destination when the input file is successfully parsed")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If a file cannot be parsed for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
            .build();
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(PARSED_FILENAME);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_PARSED);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final ProcessorLog logger = getLogger();
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        // Track parse failures from inside the callbacks. A local AtomicBoolean is
        // used rather than a static field so that concurrent triggers do not
        // interfere with one another.
        final AtomicBoolean error = new AtomicBoolean(false);

        // create a new 'child' FlowFile. The framework will automatically handle
        // the provenance information so that 'parsed' is forked from 'original'.
        FlowFile parsed = session.create(original);
        //session.putAttribute(parsed, CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());

        // Get an OutputStream for the 'parsed' FlowFile
        parsed = session.write(parsed, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream parsedOut) throws IOException {
                // Get an InputStream for the original
                session.read(original, new InputStreamCallback() {
                    @Override
                    public void process(final InputStream originalIn) throws IOException {
                        // read from the original FlowFile via originalIn and
                        // write to the new FlowFile via parsedOut
                        try (BufferedReader br = new BufferedReader(new InputStreamReader(originalIn, "UTF-8"), 6404);
                             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(parsedOut, "UTF-8"))) {
                            int count = 0;
                            final String matchPattern = "HOME";
                            String readLine;
                            while ((readLine = br.readLine()) != null) {
                                if (readLine.contains(matchPattern)) {
                                    count++;
                                    bw.write("Found match " + count);
                                    logger.info("Writing match found " + count);
                                }
                            }
                        } catch (Exception e) {
                            logger.error("Failed to parse " + original, e);
                            error.set(true);
                        }
                    }
                });
            }
        });
        if (!error.get()) {
            final String flowFileStr = parsed.toString();
            session.transfer(original, REL_ORIGINAL);
            session.transfer(parsed, REL_PARSED);
            logger.info("Transferred " + flowFileStr + " to REL_PARSED");
        } else {
            logger.error("parse failed; routing original to failure");
            session.transfer(original, REL_FAILURE);
            session.remove(parsed);
        }
    }
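    // NOTE: the StreamCallback below is not currently referenced from onTrigger;
    // it is an earlier attempt at doing the parsing in a callback, kept for
    // reference. The attribute map it builds is never applied to a FlowFile.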
    static class ParseDataCallback implements StreamCallback {
        private static final String matchPattern = "HOME";
        final Map<String, String> flowFileAttrs = new HashMap<String, String>();
        private final ProcessorLog log;
        private final ProcessContext context;

        public ParseDataCallback(ProcessContext context, ProcessorLog log) {
            this.log = log;
            this.context = context;
        }

        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
            String readLine;
            int count = 0;
            try (BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"), 6404);
                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"))) {
                while ((readLine = br.readLine()) != null) {
                    if (readLine.contains(matchPattern)) {
                        count++;
                        bw.write("Found match " + count);
                        log.info("Writing match found " + count);
                    } else {
                        bw.write("Match not found");
                        log.info("Writing out match not found");
                    }
                }
            }
            flowFileAttrs.put(CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());
            UUID uuid = UUID.randomUUID();
            flowFileAttrs.put(CoreAttributes.UUID.key(), uuid.toString());
        }
    }
}