Hello,

The error message indicates that you are transferring a FlowFile the session does not recognize: the code transfers a reference to the original FlowFile, taken before the attributes were updated. You need to assign the result of putAllAttributes (or putAttribute) back to the variable and then transfer that:
flowFile = session.putAllAttributes(flowFile, attributes);

Thanks,

Bryan

On Wed, Feb 10, 2016 at 11:41 AM, M Singh <[email protected]> wrote:

> Hi:
>
> I am processing some flow files and want to add success and failure
> attributes to the processed flow file and then transfer it. But this is
> producing an exception:
>
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default,
> section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is
> not known in this session (StandardProcessSession[id=70])
>     at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361)
>     ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
>
> Here is the code segment in the onTrigger method:
>
> Note - If I comment out the lines (as shown below) where I tried to add
> attributes to the flow file and it works. If I uncomment the lines (either
> adding single attributes or multiple, the exception is produced)
>
> try {
>     List<Record> records = new ArrayList<>();
>
>     // Prepare batch of records
>     for (int i = 0; i < flowFiles.size(); i++) {
>         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
>         session.exportTo(flowFiles.get(i), baos);
>         records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
>     }
>
>     // Send the batch
>     PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
>     putRecordBatchRequest.setDeliveryStreamName(streamName);
>     putRecordBatchRequest.setRecords(records);
>     PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);
>
>     // Separate out the successful and failed flow files
>     List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
>     List<FlowFile> failedFlowFiles = new ArrayList<>();
>     List<FlowFile> successfulFlowFiles = new ArrayList<>();
>     for (int i = 0; i < responseEntries.size(); i++ ) {
>         PutRecordBatchResponseEntry entry = responseEntries.get(i);
>         FlowFile flowFile = flowFiles.get(i);
>
>         Map<String,String> attributes = new HashMap<>();
>         attributes.put(RECORD_ID, entry.getRecordId());
>         // NOTE - If I uncomment this line - or any other which adds attributes to the flowfile - i get the exception
>         // session.putAttribute(flowFile,RECORD_ID, entry.getRecordId());
>         if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
>             attributes.put(ERROR_CODE, entry.getErrorCode());
>             attributes.put(ERROR_MESSAGE, entry.getErrorMessage());
>             // session.putAllAttributes(flowFile, attributes);
>             failedFlowFiles.add(flowFile);
>         } else {
>             // session.putAllAttributes(flowFile, attributes);
>             successfulFlowFiles.add(flowFile);
>         }
>     }
>     if ( failedFlowFiles.size() > 0 ) {
>         session.transfer(failedFlowFiles, REL_FAILURE);
>         getLogger().error("Failed to send {} records {}", new Object[]{stream, failedFlowFiles});
>     }
>     if ( successfulFlowFiles.size() > 0 ) {
>         // Throws exception when attributes are added to flow files
>         session.transfer(successfulFlowFiles, REL_SUCCESS);
>         getLogger().info("Success sent {} records {}", new Object[]{stream, successfulFlowFiles});
>     }
>     records.clear();
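
P.S. To make the reassignment point concrete, here is a minimal, self-contained sketch of the pattern. It does not use the NiFi API: ToyFlowFile, ToySession, StaleHandleDemo and the "example.key" attribute are hypothetical stand-ins invented for illustration, and the check in requireCurrent is only an assumed simplification of what a session might do. What it shows is that an attribute update returns a new handle, so the handle you held before the update should no longer be transferred.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for an immutable flow file handle. NOT a NiFi class.
final class ToyFlowFile {
    final long id;
    final Map<String, String> attributes;

    ToyFlowFile(long id, Map<String, String> attributes) {
        this.id = id;
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }
}

// Toy stand-in for a session that only accepts the latest handle per id.
// NOT a NiFi class; the real session logic is more involved.
final class ToySession {
    private final Map<Long, ToyFlowFile> current = new HashMap<>();
    private final List<ToyFlowFile> transferred = new ArrayList<>();
    private long nextId = 0;

    ToyFlowFile create() {
        ToyFlowFile created = new ToyFlowFile(nextId++, new HashMap<>());
        current.put(created.id, created);
        return created;
    }

    // Returns a NEW handle; the handle passed in becomes stale.
    ToyFlowFile putAttribute(ToyFlowFile flowFile, String key, String value) {
        requireCurrent(flowFile);
        Map<String, String> merged = new HashMap<>(flowFile.attributes);
        merged.put(key, value);
        ToyFlowFile updated = new ToyFlowFile(flowFile.id, merged);
        current.put(updated.id, updated);
        return updated;
    }

    void transfer(ToyFlowFile flowFile) {
        requireCurrent(flowFile);
        transferred.add(flowFile);
    }

    List<ToyFlowFile> transferred() {
        return transferred;
    }

    private void requireCurrent(ToyFlowFile flowFile) {
        // Identity comparison: only the most recently issued handle is accepted.
        if (current.get(flowFile.id) != flowFile) {
            throw new IllegalStateException(
                "handle for id " + flowFile.id + " is not the current one in this session");
        }
    }
}

class StaleHandleDemo {
    public static void main(String[] args) {
        ToySession session = new ToySession();

        // Broken: the return value is discarded, so 'stale' is the old handle.
        ToyFlowFile stale = session.create();
        session.putAttribute(stale, "example.key", "value");
        try {
            session.transfer(stale);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Fixed: reassign the result, then transfer the updated handle.
        ToyFlowFile flowFile = session.create();
        flowFile = session.putAttribute(flowFile, "example.key", "value");
        session.transfer(flowFile);
        System.out.println("transferred " + session.transferred().size() + " flow file(s)");
    }
}
```

In the quoted loop, the same idea would mean assigning the result of session.putAllAttributes(flowFile, attributes) back to flowFile before adding it to failedFlowFiles or successfulFlowFiles, so that the lists hold the updated handles.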
