awelless commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2175268484
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -99,27 +100,44 @@ void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kin
while ((intermediateRecord = recordSet.next()) != null) {
Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId());
if (flowFiles.isEmpty()) {
- flowFile = session.create();
- flowFiles.add(flowFile);
+ final FlowFile createdFlowFile = session.create();
+ flowFiles.add(createdFlowFile);
// initialize the writer when the first record is read.
- createWriter(flowFile, session, outputRecord);
+ createWriter(createdFlowFile, session, outputRecord);
+ emptyFlowFileToRemove = createdFlowFile;
}
final WriteResult writeResult = writer.write(outputRecord);
- recordCount += writeResult.getRecordCount();
-
- // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
- if (lastRecord && !recordSet.isAnotherRecord()) {
- completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch);
- }
- firstOutputRecord = false;
+ // definitely no longer empty
+ emptyFlowFileToRemove = null;
Review Comment:
Nit: perhaps it would be simpler to keep `flowFile` and introduce a `boolean flowFileEmpty` flag to decide whether the FlowFile should be removed. We could even use `firstSuccessfulWriteInfo` to check whether anything has been written.
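To illustrate the flag-based variant, here is a minimal, self-contained sketch. `FakeSession`, `FlagBasedProcessor` and the `writeSucceeds` parameter are hypothetical stand-ins (plain strings instead of NiFi `FlowFile`s, a boolean instead of `writer.write`), and it assumes the parse-failure path should remove the FlowFile only when nothing has been written to it yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a ProcessSession: it only tracks which
// "FlowFiles" (plain strings here) are still alive. Not the NiFi API.
class FakeSession {
    final List<String> live = new ArrayList<>();
    private int counter = 0;

    String create() {
        String flowFile = "flowfile-" + (++counter);
        live.add(flowFile);
        return flowFile;
    }

    void remove(String flowFile) {
        live.remove(flowFile);
    }
}

// Keeps a single `flowFile` reference plus a `flowFileEmpty` flag
// instead of a separate `emptyFlowFileToRemove` reference.
class FlagBasedProcessor {
    private final FakeSession session;
    final List<String> flowFiles = new ArrayList<>();
    private String flowFile;
    private boolean flowFileEmpty;

    FlagBasedProcessor(FakeSession session) {
        this.session = session;
    }

    // Called once per converted record; `writeSucceeds` simulates writer.write().
    void processRecord(boolean writeSucceeds) {
        if (flowFiles.isEmpty()) {
            flowFile = session.create();
            flowFiles.add(flowFile);
            flowFileEmpty = true; // created, but nothing written yet
        }
        if (!writeSucceeds) {
            // Parse failure: drop the FlowFile only if it never received a record.
            if (flowFileEmpty) {
                session.remove(flowFile);
                flowFiles.remove(flowFile);
            }
            return;
        }
        flowFileEmpty = false; // at least one record has been written
    }
}
```

The removal condition then reads directly off the flag, and `flowFile` keeps its original meaning throughout the loop.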
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -99,27 +100,44 @@ void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kin
while ((intermediateRecord = recordSet.next()) != null) {
Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId());
if (flowFiles.isEmpty()) {
- flowFile = session.create();
- flowFiles.add(flowFile);
+ final FlowFile createdFlowFile = session.create();
+ flowFiles.add(createdFlowFile);
// initialize the writer when the first record is read.
- createWriter(flowFile, session, outputRecord);
+ createWriter(createdFlowFile, session, outputRecord);
+ emptyFlowFileToRemove = createdFlowFile;
}
final WriteResult writeResult = writer.write(outputRecord);
- recordCount += writeResult.getRecordCount();
-
- // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
- if (lastRecord && !recordSet.isAnotherRecord()) {
- completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch);
- }
- firstOutputRecord = false;
+ // definitely no longer empty
+ emptyFlowFileToRemove = null;
+ firstSuccessfulWriteInfo = firstSuccessfulWriteInfo == null
+ ? new SuccessfulWriteInfo(kinesisRecord, writeResult)
+ : firstSuccessfulWriteInfo;
+ lastSuccessfulWriteInfo = new SuccessfulWriteInfo(kinesisRecord, writeResult);
}
- } catch (final MalformedRecordException | IOException | SchemaNotFoundException e) {
+ } catch (final MalformedRecordException | IOException | SchemaNotFoundException | IllegalTypeConversionException e) {
// write raw Kinesis Record to the parse failure relationship
getLogger().error("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}", e.getLocalizedMessage(), e);
- outputRawRecordOnException(firstOutputRecord, flowFile, flowFiles, session, data, kinesisRecord, e);
+ outputRawRecordOnException(emptyFlowFileToRemove, flowFiles, session, data, kinesisRecord, e);
+ }
+
+ // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
+ if (lastRecord && !flowFiles.isEmpty()) {
+ try {
+ completeFlowFile(flowFiles, session, lastSuccessfulWriteInfo.writeResult.getRecordCount(), lastSuccessfulWriteInfo.writeResult, lastSuccessfulWriteInfo.kinesisRecord, stopWatch);
+ } catch (IOException e) {
+ getLogger().error("Failed to complete a FlowFile, dropped records from stream: {}, shardId: {}, sequence number range: [{}, {}], due to {}",
+ getStreamName(),
+ getKinesisShardId(),
+ firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+ lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+ e.getLocalizedMessage(),
+ e);
+ }
+ firstSuccessfulWriteInfo = null;
Review Comment:
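Previously, `outputRawRecordOnException` was called whenever `completeFlowFile` threw an error, because the call sat inside the same `try` block. Now that it has been moved out, the failure is only logged and the records are dropped. Should we preserve the previous error-handling logic here?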
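One way to keep the previous behavior while leaving the completion outside the parsing `try` block is to fall back to a raw-output path when completion fails. This is only a sketch: `Completer`, `CompletionSketch` and the in-memory `parseFailure` list are hypothetical stand-ins for `completeFlowFile` and the parse-failure relationship, and it assumes the raw payloads of the records written to the pending FlowFile are still available at that point.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical hook standing in for completeFlowFile(...); not a NiFi API.
interface Completer {
    void complete() throws IOException;
}

// Completion stays outside the parsing try/catch, but a failure still
// routes the raw records to "parse failure" instead of only being logged.
class CompletionSketch {
    // In-memory stand-in for the parse-failure relationship.
    final List<String> parseFailure = new ArrayList<>();
    int completedFlowFiles = 0;

    void completeOrRouteRaw(Completer completer, List<String> rawPayloads,
                            String firstSequenceNumber, String lastSequenceNumber) {
        try {
            completer.complete();
            completedFlowFiles++;
        } catch (IOException e) {
            // Log the affected sequence number range, then fall back to the
            // raw-output path so the records are not silently dropped.
            System.err.printf("Failed to complete FlowFile for sequence number range [%s, %s]: %s%n",
                    firstSequenceNumber, lastSequenceNumber, e.getMessage());
            parseFailure.addAll(rawPayloads);
        }
    }
}
```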
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java:
##########
@@ -343,6 +347,23 @@ public void testProcessUnparsableRecordWithRawOutputWithCheckpoint() throws Shut
session.assertNotRolledBack();
}
+ @Test
+ public void testProcessUnparsableRecordWithRawOutputWithCheckpoint_lastRecordInvalid() throws ShutdownException, InvalidStateException {
Review Comment:
Nit: replacing `testProcessUnparsableRecordWithRawOutputWithCheckpoint` and `testProcessUnparsableRecordWithRawOutputWithCheckpoint_lastRecordInvalid` with a single `@ParameterizedTest` backed by a `@MethodSource` should make the tests clearer.
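Roughly, the two methods could collapse into one JUnit 5 test fed by a method source. This is only a sketch: `countParseFailures` and the scenario lists are hypothetical stand-ins for the real processor setup and assertions in `TestKinesisRecordProcessorRecord`; it needs `junit-jupiter-params` on the test classpath.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class UnparsableRecordParameterizedSketchTest {

    // Hypothetical stand-in for the processor under test: counts records that would go to parse failure.
    static int countParseFailures(List<String> records) {
        return (int) records.stream().filter(r -> r.startsWith("invalid")).count();
    }

    // One row per former test method: scenario name, input records, expected parse failures.
    static Stream<Arguments> unparsableRecordScenarios() {
        return Stream.of(
                Arguments.of("invalid record in the middle", List.of("valid", "invalid", "valid"), 1),
                Arguments.of("last record invalid", List.of("valid", "valid", "invalid"), 1));
    }

    @ParameterizedTest(name = "{0}")
    @MethodSource("unparsableRecordScenarios")
    void processUnparsableRecordWithRawOutputWithCheckpoint(String scenario, List<String> records, int expectedParseFailures) {
        assertEquals(expectedParseFailures, countParseFailures(records), scenario);
    }
}
```

Each scenario then shows up as a separately named invocation in the test report, so a failure still points at the specific case.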
--
This is an automated message from the Apache Git Service.