Hello,

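The error message indicates that you are transferring a FlowFile the
session does not recognize: the code transfers a reference to the original
FlowFile as it was before the attributes were updated. FlowFile objects are
immutable, so putAllAttributes (and putAttribute) return a new FlowFile
reference instead of modifying the one you pass in. You need to assign the
result of putAllAttributes (or putAttribute) back to your variable and then
transfer that: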

flowFile = session.putAllAttributes(flowFile, attributes);
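
To make the reassignment pattern concrete, here is a minimal sketch. ToyFlowFile and ToySession are hypothetical stand-ins written for this example, not the NiFi classes; they only loosely mimic the idea that a FlowFile is immutable and the session accepts only the latest reference it handed out. The real framework's checks and messages may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-ins, NOT the NiFi classes: they only mimic the "immutable
// FlowFile, session tracks the latest version" behaviour.
final class ToyFlowFile {
    final long id;
    final Map<String, String> attributes;

    ToyFlowFile(long id, Map<String, String> attributes) {
        this.id = id;
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }
}

final class ToySession {
    // Latest version of each FlowFile, keyed by id.
    private final Map<Long, ToyFlowFile> current = new HashMap<>();
    private long nextId = 0;

    ToyFlowFile create() {
        ToyFlowFile ff = new ToyFlowFile(nextId++, new HashMap<>());
        current.put(ff.id, ff);
        return ff;
    }

    // Returns a NEW object; the reference passed in becomes stale.
    ToyFlowFile putAllAttributes(ToyFlowFile ff, Map<String, String> attrs) {
        requireCurrent(ff);
        Map<String, String> merged = new HashMap<>(ff.attributes);
        merged.putAll(attrs);
        ToyFlowFile updated = new ToyFlowFile(ff.id, merged);
        current.put(ff.id, updated);
        return updated;
    }

    // Rejects any reference that is not the latest version.
    void transfer(List<ToyFlowFile> flowFiles) {
        for (ToyFlowFile ff : flowFiles) {
            requireCurrent(ff);
        }
    }

    private void requireCurrent(ToyFlowFile ff) {
        if (current.get(ff.id) != ff) {
            throw new IllegalStateException(
                    "FlowFile " + ff.id + " is not known in this session");
        }
    }
}

class ReassignSketch {
    public static void main(String[] args) {
        ToySession session = new ToySession();
        Map<String, String> attributes = new HashMap<>();
        attributes.put("record.id", "abc");

        // Wrong: the return value is dropped, so the stale reference is transferred.
        ToyFlowFile stale = session.create();
        session.putAllAttributes(stale, attributes);
        List<ToyFlowFile> bad = new ArrayList<>();
        bad.add(stale);
        try {
            session.transfer(bad);
        } catch (IllegalStateException e) {
            System.out.println("stale reference rejected: " + e.getMessage());
        }

        // Right: reassign, then collect and transfer the returned reference.
        ToyFlowFile flowFile = session.create();
        flowFile = session.putAllAttributes(flowFile, attributes);
        List<ToyFlowFile> good = new ArrayList<>();
        good.add(flowFile);
        session.transfer(good);
        System.out.println("updated reference transferred");
    }
}
```

In your loop, the same idea means reassigning flowFile from the putAllAttributes call before adding it to failedFlowFiles or successfulFlowFiles, so the lists hold the updated references.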

Thanks,

Bryan

On Wed, Feb 10, 2016 at 11:41 AM, M Singh <[email protected]>
wrote:

> Hi:
> I am processing some flow files and want to add success and failure
> attributes to the processed flow file and then transfer it.  But this is
> producing an exception:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default,
> section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is
> not known in this session (StandardProcessSession[id=70])
>     at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361)
>     ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
> Here is the code segment in the onTrigger method.
> Note - If I comment out the lines (as shown below) where I add attributes
> to the flow file, it works.  If I uncomment those lines (adding either a
> single attribute or multiple), the exception is thrown.
>         try {
>             List<Record> records = new ArrayList<>();
>             // Prepare batch of records
>             for (int i = 0; i < flowFiles.size(); i++) {
>                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
>                 session.exportTo(flowFiles.get(i), baos);
>                 records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
>             }
>             // Send the batch
>             PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
>             putRecordBatchRequest.setDeliveryStreamName(streamName);
>             putRecordBatchRequest.setRecords(records);
>             PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);
>             // Separate out the successful and failed flow files
>             List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
>             List<FlowFile> failedFlowFiles = new ArrayList<>();
>             List<FlowFile> successfulFlowFiles = new ArrayList<>();
>             for (int i = 0; i < responseEntries.size(); i++ ) {
>                 PutRecordBatchResponseEntry entry = responseEntries.get(i);
>                 FlowFile flowFile = flowFiles.get(i);
>                 Map<String,String> attributes = new HashMap<>();
>                 attributes.put(RECORD_ID, entry.getRecordId());
>                 // NOTE - If I uncomment this line - or any other which adds
>                 // attributes to the flowfile - i get the exception
>                 // session.putAttribute(flowFile,RECORD_ID, entry.getRecordId());
>                 if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
>                     attributes.put(ERROR_CODE, entry.getErrorCode());
>                     attributes.put(ERROR_MESSAGE, entry.getErrorMessage());
>                     // session.putAllAttributes(flowFile, attributes);
>                     failedFlowFiles.add(flowFile);
>                 } else {
>                     // session.putAllAttributes(flowFile, attributes);
>                     successfulFlowFiles.add(flowFile);
>                 }
>             }
>             if ( failedFlowFiles.size() > 0 ) {
>                 session.transfer(failedFlowFiles, REL_FAILURE);
>                 getLogger().error("Failed to send {} records {}", new Object[]{stream, failedFlowFiles});
>             }
>             if ( successfulFlowFiles.size() > 0 ) {
>                 // Throws exception when attributes are added to flow files
>                 session.transfer(successfulFlowFiles, REL_SUCCESS);
>                 getLogger().info("Success sent {} records {}", new Object[]{stream, successfulFlowFiles});
>             }
>             records.clear();
