dariuszseweryn commented on code in PR #10053: URL: https://github.com/apache/nifi/pull/10053#discussion_r2191033158
########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java: ########## @@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final ProcessSessionFactory sessionFactory, @Override void startProcessingRecords() { super.startProcessingRecords(); - outputStream = null; - writer = null; + if (currentFlowFileState != null) { + getLogger().warn("FlowFile State is not null at the start of processing records, this is not expected."); + closeSafe(currentFlowFileState, "FlowFile State"); + currentFlowFileState = null; + } + } + + @Override + void finishProcessingRecords(final ProcessSession session, final List<FlowFile> flowFiles, final StopWatch stopWatch) { + super.finishProcessingRecords(session, flowFiles, stopWatch); + try { + if (currentFlowFileState == null) { + return; + } + if (!flowFiles.contains(currentFlowFileState.flowFile)) { + getLogger().warn("Currently processed FlowFile is no longer available at processing end, this is not expected.", flowFiles); + closeSafe(currentFlowFileState, "FlowFile State"); + return; + } + completeFlowFile(flowFiles, session, stopWatch); + } catch (final FlowFileCompletionException e) { + if (!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) { + throw new KinesisBatchUnrecoverableException(e.getMessage(), e); + } + final boolean removeCurrentStateFlowFileIfAvailable = true; + final KinesisClientRecord kinesisRecord = currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord; + final byte[] data = getData(kinesisRecord); + outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable, flowFiles, session, data, kinesisRecord, e); + } finally { + currentFlowFileState = null; + } } @Override - void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kinesisRecord, final boolean lastRecord, + void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kinesisRecord, final 
ProcessSession session, final StopWatch stopWatch) { - boolean firstOutputRecord = true; - int recordCount = 0; - final ByteBuffer dataBuffer = kinesisRecord.data(); - byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[0]; - if (dataBuffer != null) { - dataBuffer.get(data); + if (currentFlowFileState != null && !flowFiles.contains(currentFlowFileState.flowFile)) { + getLogger().warn("Currently processed FlowFile is no longer available, this is not expected.", flowFiles); + closeSafe(currentFlowFileState, "FlowFile State"); + currentFlowFileState = null; } - FlowFile flowFile = null; + final byte[] data = getData(kinesisRecord); + try (final InputStream in = new ByteArrayInputStream(data); final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, getLogger()) ) { Record intermediateRecord; final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet()); while ((intermediateRecord = recordSet.next()) != null) { - Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId()); - if (flowFiles.isEmpty()) { - flowFile = session.create(); - flowFiles.add(flowFile); - - // initialize the writer when the first record is read. 
- createWriter(flowFile, session, outputRecord); + final Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId()); + if (currentFlowFileState == null) { + // writer schema is determined by some, usually the first record + currentFlowFileState = initializeState(session, outputRecord, null); + flowFiles.add(currentFlowFileState.flowFile); } - final WriteResult writeResult = writer.write(outputRecord); - recordCount += writeResult.getRecordCount(); - - // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet - if (lastRecord && !recordSet.isAnotherRecord()) { - completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch); + if (!DataTypeUtils.isRecordTypeCompatible(currentFlowFileState.recordSchema, outputRecord, false)) { Review Comment: Example: first record: `{ "name": 10 }` / inferred schema `{ "name": INT }` second record: `{ "name": 100000000000000 }` / inferred schema `{ "name": LONG }` What WriteJsonResult leaves in FlowFile after throwing: ``` { "name": 10 } { "name" } ``` Since the processor does not know how to read/fix the written file — we should fail the processing in this case. Therefore we would need to do one of: - check/fix all the writers so they would not leave FlowFiles in an inconsistent state - add another abstraction for rewinding FlowFiles to fix the content - check the schema before writing as proposed in this PR — there may be a different way to do it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org