Hi:
I am processing a batch of flow files and want to add success or failure
attributes to each processed flow file before transferring it. However, this
produces an exception:
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=432bc163-28a0-4d08-b9e8-1674a649ae8c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1455121175949-1, container=default, section=1], offset=0, length=1],offset=0,name=129790440390423,size=1] is not known in this session (StandardProcessSession[id=70])
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2361) ~[nifi-framework-core-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
Note: if I comment out the lines where I add attributes to the flow file
(as shown below), it works. If I uncomment any of them, whether they add a
single attribute or several, the exception is thrown.
Here is the code segment from the onTrigger method:
try {
    List<Record> records = new ArrayList<>();

    // Prepare batch of records
    for (int i = 0; i < flowFiles.size(); i++) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        session.exportTo(flowFiles.get(i), baos);
        records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
    }

    // Send the batch
    PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
    putRecordBatchRequest.setDeliveryStreamName(streamName);
    putRecordBatchRequest.setRecords(records);
    PutRecordBatchResult results = client.putRecordBatch(putRecordBatchRequest);

    // Separate out the successful and failed flow files
    List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
    List<FlowFile> failedFlowFiles = new ArrayList<>();
    List<FlowFile> successfulFlowFiles = new ArrayList<>();
    for (int i = 0; i < responseEntries.size(); i++) {
        PutRecordBatchResponseEntry entry = responseEntries.get(i);
        FlowFile flowFile = flowFiles.get(i);

        Map<String,String> attributes = new HashMap<>();
        attributes.put(RECORD_ID, entry.getRecordId());
        // NOTE - If I uncomment this line - or any other which adds
        // attributes to the flowfile - i get the exception
        // session.putAttribute(flowFile, RECORD_ID, entry.getRecordId());
        if (!StringUtils.isBlank(entry.getErrorCode())) {
            attributes.put(ERROR_CODE, entry.getErrorCode());
            attributes.put(ERROR_MESSAGE, entry.getErrorMessage());
            // session.putAllAttributes(flowFile, attributes);
            failedFlowFiles.add(flowFile);
        } else {
            // session.putAllAttributes(flowFile, attributes);
            successfulFlowFiles.add(flowFile);
        }
    }

    if (failedFlowFiles.size() > 0) {
        session.transfer(failedFlowFiles, REL_FAILURE);
        getLogger().error("Failed to send {} records {}", new Object[]{stream, failedFlowFiles});
    }

    if (successfulFlowFiles.size() > 0) {
        // Throws exception when attributes are added to flow files
        session.transfer(successfulFlowFiles, REL_SUCCESS);
        getLogger().info("Success sent {} records {}", new Object[]{stream, successfulFlowFiles});
    }

    records.clear();
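
A possible explanation, offered as an assumption rather than a confirmed diagnosis: FlowFile objects are immutable, and ProcessSession.putAttribute / putAllAttributes return an updated FlowFile instead of modifying the one passed in. The commented-out calls above discard that return value, so the lists may end up holding references the session no longer accepts. The sketch below is a toy model of that behaviour only. ToyFlowFile, ToySession, and transferSucceeds are hypothetical names invented for illustration; they are not NiFi classes and do not reproduce NiFi's internal bookkeeping.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class StaleFlowFileSketch {

    /** Hypothetical, immutable stand-in for a FlowFile (not the NiFi class). */
    static final class ToyFlowFile {
        final String id;
        final Map<String, String> attributes;

        ToyFlowFile(String id, Map<String, String> attributes) {
            this.id = id;
            this.attributes = Collections.unmodifiableMap(new HashMap<String, String>(attributes));
        }
    }

    /** Hypothetical session that accepts only the newest object for each id. */
    static final class ToySession {
        private final Map<String, ToyFlowFile> current = new HashMap<String, ToyFlowFile>();

        ToyFlowFile create(String id) {
            ToyFlowFile flowFile = new ToyFlowFile(id, new HashMap<String, String>());
            current.put(id, flowFile);
            return flowFile;
        }

        /** Returns a NEW object; the argument is left untouched and becomes stale. */
        ToyFlowFile putAttribute(ToyFlowFile flowFile, String key, String value) {
            validate(flowFile);
            Map<String, String> updated = new HashMap<String, String>(flowFile.attributes);
            updated.put(key, value);
            ToyFlowFile newer = new ToyFlowFile(flowFile.id, updated);
            current.put(flowFile.id, newer);
            return newer;
        }

        void transfer(ToyFlowFile flowFile) {
            validate(flowFile);
            current.remove(flowFile.id);
        }

        /** Rejects any reference that is not the newest object tracked for its id. */
        private void validate(ToyFlowFile flowFile) {
            if (current.get(flowFile.id) != flowFile) {
                throw new IllegalStateException("stale or unknown reference: " + flowFile.id);
            }
        }
    }

    /** Runs one add-attribute-then-transfer cycle; returns true if the transfer succeeded. */
    static boolean transferSucceeds(boolean keepReturnedReference) {
        ToySession session = new ToySession();
        ToyFlowFile flowFile = session.create("ff-1");
        ToyFlowFile returned = session.putAttribute(flowFile, "record.id", "abc");
        if (keepReturnedReference) {
            flowFile = returned; // reassign, analogous to: flowFile = session.putAttribute(...)
        }
        try {
            session.transfer(flowFile);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("return value ignored: " + transferSucceeds(false));
        System.out.println("return value kept:    " + transferSucceeds(true));
    }
}
```

If this assumption holds, the corresponding change in the code above would be to assign the result back before adding to either list, for example flowFile = session.putAllAttributes(flowFile, attributes); so that the reference passed to session.transfer is the one the session returned.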