Hi:
I am processing some flow files and want to add success and failure attributes 
to the processed flow file and then transfer it.  But this is producing an 
exception:
org.apache.nifi.processor.exception.FlowFileHandlingException: 
StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim
 [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default, 
section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is not 
known in this session (StandardProcessSession[id=70]) at 
org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361)
 ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
Here is the code segment in the onTrigger method:
Note - If I comment out the lines (as shown below) where I tried to add 
attributes to the flow file and it works.  If I uncomment the lines (either 
adding single attributes or multiple, the exception is produced)
        try {            List<Record> records = new ArrayList<>();
            // Prepare batch of records            for (int i = 0; i < 
flowFiles.size(); i++) {                final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();                session.exportTo(flowFiles.get(i), 
baos);                records.add(new 
Record().withData(ByteBuffer.wrap(baos.toByteArray())));            }
            // Send the batch            PutRecordBatchRequest 
putRecordBatchRequest = new PutRecordBatchRequest();            
putRecordBatchRequest.setDeliveryStreamName(streamName);            
putRecordBatchRequest.setRecords(records);            PutRecordBatchResult 
results = client.putRecordBatch(putRecordBatchRequest);
            // Separate out the successful and failed flow files            
List<PutRecordBatchResponseEntry> responseEntries = 
results.getRequestResponses();            List<FlowFile> failedFlowFiles = new 
ArrayList<>();            List<FlowFile> successfulFlowFiles = new 
ArrayList<>();            for (int i = 0; i < responseEntries.size(); i++ ) {   
             PutRecordBatchResponseEntry entry = responseEntries.get(i);        
        FlowFile flowFile = flowFiles.get(i);
                Map<String,String> attributes = new HashMap<>();                
attributes.put(RECORD_ID, entry.getRecordId());// NOTE - If I uncomment this 
line - or any other which adds attributes to the flowfile - i get the 
exception//                session.putAttribute(flowFile,RECORD_ID, 
entry.getRecordId());                if ( ! 
StringUtils.isBlank(entry.getErrorCode()) ) {                    
attributes.put(ERROR_CODE, entry.getErrorCode());                    
attributes.put(ERROR_MESSAGE, entry.getErrorMessage());//                    
session.putAllAttributes(flowFile, attributes);                    
failedFlowFiles.add(flowFile);                } else {//                    
session.putAllAttributes(flowFile, attributes);                    
successfulFlowFiles.add(flowFile);                }            }
            if ( failedFlowFiles.size() > 0 ) {                
session.transfer(failedFlowFiles, REL_FAILURE);                
getLogger().error("Failed to send {} records {}", new Object[]{stream, 
failedFlowFiles});            }
            if ( successfulFlowFiles.size() > 0 ) {// Throws exception when 
attributes are added to flow files                
session.transfer(successfulFlowFiles, REL_SUCCESS);                
getLogger().info("Success sent {} records {}", new Object[]{stream, 
successfulFlowFiles});            }
            records.clear();

Reply via email to