[ https://issues.apache.org/jira/browse/NIFI-14696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18012229#comment-18012229 ]

ASF subversion and git services commented on NIFI-14696:
--------------------------------------------------------

Commit 9e8c914630bbc04c6fc0e5ab1514dde05f535a30 in nifi's branch 
refs/heads/main from Dariusz Seweryn
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=9e8c914630 ]

NIFI-14696 Improved ConsumeKinesisStream handling of Record Schema Differences 
(#10053)

Signed-off-by: David Handermann <[email protected]>

> KinesisRecordProcessorRecord mishandles record schema changes
> -------------------------------------------------------------
>
>                 Key: NIFI-14696
>                 URL: https://issues.apache.org/jira/browse/NIFI-14696
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.4.0
>            Reporter: Dariusz Seweryn
>            Assignee: Dariusz Seweryn
>            Priority: Major
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> {{KinesisRecordProcessorRecord}} determines the output FlowFile writer's
> schema from the reader schema of the first {{KinesisClientRecord}}. This is
> a problem when the schema of the input records changes, either because the
> schema is inferred or because a schema reference is embedded in the record
> itself. With the current code, several improper behaviors may occur:
>  * Record fields may be dropped: when the first record has only field A and
> the second has fields A and B, the resulting FlowFile contains only field A
> data for both records.
>  * A record may cause an {{IllegalTypeConversionException}}: when the first
> record has a field A of type Integer and the second record has a field A
> whose value exceeds the Integer range. This case requires a session rollback
> because the FlowFile being written when the exception occurs may be left in
> an undefined state.
> Additionally, if the last {{KinesisClientRecord}} in the processed batch is
> problematic (e.g. it cannot be parsed or causes an
> {{IllegalTypeConversionException}}), the resulting FlowFile does not get its
> attributes populated according to the {{ConsumeKinesisStream}} contract, and
> the session is not rolled back either.
> There is one other minor bug in the implementation: the class calculates the
> {{record.count}} attribute incorrectly when the {{kinesisClientRecord.data()}}
> content of the last Kinesis record processed in the batch contains multiple
> {{intermediateRecord}} entries. If the last Kinesis record contains 2
> intermediate records, {{record.count}} is reported 1 higher than it should
> be; with 3 intermediate records it is +3, with 4 it is +6, with 5 it is +10,
> and so on.

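The first two symptoms in the issue (dropped fields and a failing type conversion) can be illustrated with a minimal, self-contained Java sketch. It assumes a heavily simplified model in which a record is an ordered map of field names to values and its schema is the map of field names to value types; SchemaAwareBatcher and its methods are hypothetical and are not NiFi's Record API. Math.toIntExact throwing ArithmeticException stands in for NiFi's IllegalTypeConversionException. Starting a new output group whenever the schema changes is only one possible remedy, shown for illustration; it is not necessarily what the commit above implements.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (not NiFi's Record API): a record is an ordered
// map of field name -> value, and its schema is field name -> value type.
final class SchemaAwareBatcher {

    static Map<String, Class<?>> schemaOf(Map<String, Object> record) {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        record.forEach((name, value) -> schema.put(name, value == null ? Object.class : value.getClass()));
        return schema;
    }

    // Problematic approach: project every record onto the FIRST record's schema.
    // Fields missing from that schema are silently dropped.
    static List<Map<String, Object>> writeWithFirstSchema(List<Map<String, Object>> records) {
        List<Map<String, Object>> out = new ArrayList<>();
        if (records.isEmpty()) {
            return out;
        }
        Map<String, Class<?>> first = schemaOf(records.get(0));
        for (Map<String, Object> record : records) {
            Map<String, Object> projected = new LinkedHashMap<>();
            for (Map.Entry<String, Class<?>> field : first.entrySet()) {
                projected.put(field.getKey(), coerce(record.get(field.getKey()), field.getValue()));
            }
            out.add(projected);
        }
        return out;
    }

    // Coerces a value to the declared type. Narrowing a Long into an Integer
    // field throws ArithmeticException when the value is out of Integer range
    // (a stand-in for IllegalTypeConversionException).
    static Object coerce(Object value, Class<?> type) {
        if (type == Integer.class && value instanceof Long) {
            return Math.toIntExact((Long) value);
        }
        return value;
    }

    // Alternative: start a new output group whenever the schema changes, so each
    // group can be written with the schema of its own records.
    static List<List<Map<String, Object>>> groupBySchemaChange(List<Map<String, Object>> records) {
        List<List<Map<String, Object>>> groups = new ArrayList<>();
        Map<String, Class<?>> current = null;
        for (Map<String, Object> record : records) {
            Map<String, Class<?>> schema = schemaOf(record);
            if (!schema.equals(current)) {
                groups.add(new ArrayList<>());
                current = schema;
            }
            groups.get(groups.size() - 1).add(record);
        }
        return groups;
    }
}
```

With records {A=1} and {A=2, B=x}, writeWithFirstSchema drops B from the second record, while groupBySchemaChange returns two groups, each carrying its own schema. A record {A=3000000000L} following {A=1} makes writeWithFirstSchema throw instead of writing a value.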

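The failure-handling requirement from the issue (a problem with any record, including the last one in the batch, must not leave a FlowFile without its attributes) can be sketched as follows. This is a hypothetical model, not NiFi's ProcessSession API: BatchProcessor, BatchResult and the string-parsing step are illustrative assumptions, and "rolled back" here simply means that no records and no attributes are emitted. Only the record.count attribute name comes from the issue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical outcome of one batch: either committed output with attributes,
// or a rollback in which nothing is emitted.
final class BatchResult {
    final boolean rolledBack;
    final List<Integer> records;
    final Map<String, String> attributes;

    BatchResult(boolean rolledBack, List<Integer> records, Map<String, String> attributes) {
        this.rolledBack = rolledBack;
        this.records = records;
        this.attributes = attributes;
    }
}

final class BatchProcessor {
    // Parses every raw record. A failure on ANY record, including the last one,
    // discards the partial output and reports a rollback.
    static BatchResult process(List<String> rawRecords, Function<String, Integer> parser) {
        List<Integer> parsed = new ArrayList<>();
        try {
            for (String raw : rawRecords) {
                parsed.add(parser.apply(raw));
            }
        } catch (RuntimeException e) {
            // Partial output may be in an undefined state: emit nothing.
            return new BatchResult(true, List.of(), Map.of());
        }
        // Attributes are populated only once every record has succeeded, so the
        // last record goes through exactly the same path as the others.
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("record.count", String.valueOf(parsed.size()));
        return new BatchResult(false, parsed, attributes);
    }
}
```

Because attributes are set only after the loop completes, a failure in the last record takes the same rollback path as a failure in any earlier one.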

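The record.count overcounts listed in the issue (+1, +3, +6, +10, ...) follow the triangular numbers n(n-1)/2, which suggests n = 2, 3, 4, 5 intermediate records. The sketch below is a hypothetical accumulation that reproduces exactly that overcount; RecordCountModel and the "+ i" defect are assumptions chosen to match the reported numbers and do not reproduce the actual NiFi source.

```java
// Hypothetical model of the miscount; NOT the actual NiFi implementation.
final class RecordCountModel {

    // Buggy accumulation: while iterating the n intermediate records of the last
    // Kinesis record, the counter is advanced by the running index as well as by 1.
    static int buggyCount(int intermediateRecords) {
        int count = 0;
        for (int i = 0; i < intermediateRecords; i++) {
            count += 1 + i; // the "+ i" term is the assumed defect
        }
        return count;
    }

    // Correct accumulation: exactly one per intermediate record.
    static int correctCount(int intermediateRecords) {
        int count = 0;
        for (int i = 0; i < intermediateRecords; i++) {
            count += 1;
        }
        return count;
    }

    // Closed form of the overcount: 0 + 1 + ... + (n - 1) = n(n - 1) / 2.
    static int expectedOvercount(int n) {
        return n * (n - 1) / 2;
    }
}
```

For a single intermediate record the two counts agree, which matches the issue's statement that the miscount only appears when multiple intermediate records are present.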
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
