dariuszseweryn commented on PR #10053:
URL: https://github.com/apache/nifi/pull/10053#issuecomment-3078595349

   Taking a step back then. 
   
   # Problems with `KinesisRecordProcessorRecord`
   
   There are several functional problems with `KinesisRecordProcessorRecord`:
   1. The class does not complete a FlowFile if any exception is thrown 
while the last record in a batch is being processed. NiFi logs a warning, but 
it was observed that such a FlowFile was picked up by a downstream processor 
without any attributes.
   1. The class does not handle well the case where the schema of subsequent 
records differs, especially when handled reactively, because the exceptions 
thrown by the writers may leave the FlowFile in an inconsistent state, 
requiring a full rollback of the transaction and effectively halting the flow 
until human intervention. This is a problem in two situations:
       - When the schema is inferred from the record itself. I can understand 
that this is considered "best effort" support, yet it is a problem from a 
correctness standpoint.
       - When the schema is referenced in the record header via 
`ConfluentEncodedSchemaReferenceReader` or `GlueEncodedSchemaReferenceReader`, 
which, I think, should be a valid, fully supported use case. There are of 
course several approaches to that, which I will continue in a separate 
paragraph.
   1. The class incorrectly calculates the `record.count` attribute when 
multiple `intermediateRecord`s are available in `kinesisClientRecord.data()` 
for the last processed Kinesis record in a batch. (If there are 2 intermediate 
records in the last Kinesis record, `record.count` is reported as 1 bigger 
than it should be; for 3 intermediate records the over-count is +3, for 4 it 
is +6, for 5 it is +10, and so on, i.e. it grows as n * (n - 1) / 2.)
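   As an illustration of that drift pattern (+1 for 2 intermediate records, 
+3 for 3, +6 for 4, +10 for 5): it is exactly what you would get if the 
running index were added on every iteration instead of incrementing the count 
once per record. A minimal, hypothetical model of the miscount (not the 
actual processor code):

   ```java
   public class RecordCountDrift {
       // correct count for n intermediate records is simply n
       static int correctCount(int n) {
           return n;
       }

       // buggy variant: adds the running index on each iteration,
       // i.e. 1 + 2 + ... + n = n * (n + 1) / 2
       static int buggyCount(int n) {
           int count = 0;
           for (int i = 1; i <= n; i++) {
               count += i;
           }
           return count;
       }

       // over-count relative to the correct value: n * (n - 1) / 2
       static int overCount(int n) {
           return buggyCount(n) - correctCount(n);
       }

       public static void main(String[] args) {
           for (int n = 2; n <= 5; n++) {
               System.out.println(n + " intermediate records -> +" + overCount(n));
           }
       }
   }
   ```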
   
   There is also a non-functional problem with `KinesisRecordProcessorRecord`: 
it contains and assumes a lot of state (FlowFile, RecordWriter, OutputStream, 
Session, the contents of the passed `List<FlowFile>`). Consolidating the state 
management is recommended to lower the cognitive burden for future readers. In 
this PR I have consolidated the state into a `FlowFileState` class which is 
instantiated only in the `#processRecord` function and encapsulates the other 
state-altering interactions.
   
   # Approaches to records that have incompatible schema
   
   Currently the processor assumes that the first Kinesis record determines 
the schema for the whole FlowFile. If a subsequent record's schema does not 
match then, depending on the mismatch, the record will either not be fully 
written (additional fields in subsequent records get omitted) or an exception 
will be thrown (when a field's content cannot be coerced, e.g. a numeric value 
that is too large or a different data type), potentially leaving the FlowFile 
in an inconsistent state (verified for `JsonRecordSetWriter`).
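   As a toy illustration of the "additional fields get omitted" branch (a 
simplified model, not the actual `JsonRecordSetWriter` logic): if the first 
record's field names are frozen as the FlowFile schema, later records are 
projected onto it and any extra fields silently disappear.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;

   public class FirstRecordSchemaDemo {
       // project a record onto a fixed schema: extra fields are dropped,
       // missing fields come out as null
       static Map<String, Object> project(Set<String> schema, Map<String, Object> record) {
           Map<String, Object> written = new LinkedHashMap<>();
           for (String field : schema) {
               written.put(field, record.get(field));
           }
           return written;
       }

       public static void main(String[] args) {
           Map<String, Object> first = Map.of("id", 1, "name", "a");
           Map<String, Object> second = Map.of("id", 2, "name", "b", "extra", "x");

           Set<String> schema = first.keySet(); // schema fixed by the first record
           Map<String, Object> written = project(schema, second);
           System.out.println(written); // "extra" is gone
       }
   }
   ```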
   
   Given that there is no guarantee that subsequent records will have a 
matching schema (due to wrong inference or to the schema being encoded in the 
header), the current behavior is not well suited. What can be done?
   
   1. Add to the contract of all RecordWriter implementations that, in case of 
an exception being thrown, the FlowFile is left in a consistent state. This 
would require changes in potentially all implemented writers and was rejected 
as an approach in [NIFI-14753](https://issues.apache.org/jira/browse/NIFI-14753). 
It would allow a reactive approach to schema mismatches: routing offending 
records to Parse Failure (or rather to a newly introduced relationship) and 
continuing processing. Otherwise processing is halted until human 
intervention, because the FlowFile needs to be dropped as it is in an 
inconsistent state (I am unsure whether readers downstream would be able to 
cope with a FlowFile in such a state).
   1. Add schema validation in `KinesisRecordProcessorRecord` for each 
subsequent record, which would allow completing an in-flight FlowFile and 
opening a new one. It was 
[rejected](https://github.com/apache/nifi/pull/10053#discussion_r2206166722) 
though.
       - If there is a fear of performance impact, we could add a parameter to 
the `ConsumeKinesisStream` processor indicating whether it should expect 
multiple schemas in a single batch of Kinesis records and perform the 
compatibility check. This does not look like a good way forward from the UX 
perspective, as the reading logic would need to spill outside of the 
`RecordReader` definition; the processor has no means to introspect the 
reader settings, so it cannot auto-configure.
   1. Make the processor do two passes over all `KinesisClientRecord`s in a 
batch: first to determine the encompassing schema, second to write the data. 
If I were a user of a stream that contains data in many schemas, this 
behavior would seem surprising to me at first glance.
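   To make the two-pass approach concrete, here is a minimal sketch under a 
deliberately simplified model where a schema is just a set of field names 
(real record schemas also carry types and coercion rules): pass one unions 
the fields seen across the batch; pass two would then write every record 
against that merged schema.

   ```java
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class TwoPassSchema {
       // pass 1: collect every field name seen in the batch
       static Set<String> encompassingSchema(List<Map<String, Object>> batch) {
           Set<String> merged = new LinkedHashSet<>();
           for (Map<String, Object> record : batch) {
               merged.addAll(record.keySet());
           }
           return merged;
       }

       public static void main(String[] args) {
           List<Map<String, Object>> batch = List.of(
                   Map.of("id", 1),
                   Map.of("id", 2, "name", "b"),
                   Map.of("id", 3, "ts", 99L));
           System.out.println(encompassingSchema(batch)); // contains exactly id, name, ts
       }
   }
   ```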
   
   Personally I think that, from the correctness point of view, the processor 
should be able to close the currently processed FlowFile when a subsequent 
record's schema does not match, and create a new FlowFile. This could have 
performance implications for users whose records alternate schemas on every 
record, but it would be a technically correct implementation. For users with a 
single, static schema setup, the only potential downside would be the 
performance impact of validating the schema for each record, which may well be 
negligible anyway.
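   A rough sketch of that preferred behavior, again under the simplified 
field-name-only schema model (the names here are illustrative, not the PR's 
actual classes): each record's schema is compared with that of the in-flight 
FlowFile, and a mismatch closes the current FlowFile and opens a new one, so 
a batch becomes one FlowFile per run of schema-compatible records.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class SchemaSplitDemo {
       // group a batch into consecutive runs of records sharing a schema;
       // each inner list stands in for one FlowFile
       static List<List<Map<String, Object>>> splitBySchema(List<Map<String, Object>> records) {
           List<List<Map<String, Object>>> flowFiles = new ArrayList<>();
           Set<String> currentSchema = null;
           List<Map<String, Object>> current = null;
           for (Map<String, Object> record : records) {
               if (current == null || !record.keySet().equals(currentSchema)) {
                   current = new ArrayList<>(); // "close" the old FlowFile, "open" a new one
                   flowFiles.add(current);
                   currentSchema = record.keySet();
               }
               current.add(record);
           }
           return flowFiles;
       }

       public static void main(String[] args) {
           List<Map<String, Object>> batch = List.of(
                   Map.of("id", 1), Map.of("id", 2),
                   Map.of("id", 3, "name", "c"),
                   Map.of("id", 4));
           System.out.println(splitBySchema(batch).size()); // 3 FlowFiles
       }
   }
   ```

   For the single-static-schema case the comparison is a cheap set-equality 
check per record, which supports the point that the overhead may be 
negligible.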

