[ 
https://issues.apache.org/jira/browse/NIFI-14696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dariusz Seweryn updated NIFI-14696:
-----------------------------------
    Description: 
{{KinesisRecordProcessorRecord}} determines the output FlowFile writer's 
schema from the reader's schema of the first {{KinesisClientRecord}}. This is a 
problem when the schema of the input records changes, either because the 
schema is inferred or because a schema reference is embedded in each record. 
With the current code several improper behaviors may occur:
 * Record fields may get dropped: if the first record had only field A and the 
second had fields A and B, the resulting FlowFile will contain only field A 
data for both records.
 * A record may cause an {{IllegalTypeConversionException}}: for example, the 
first record had a field A of type Integer and the second record had a field A 
whose value exceeds the Integer range. This case requires a session rollback 
because the written/exceptional FlowFile may be left in an undefined state.
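
The field-dropping case can be illustrated with a small dependency-free 
sketch (hypothetical names, not NiFi's actual classes): a writer whose field 
list is fixed from the first record silently projects away any field that 
later records introduce.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a RecordSetWriter whose schema is fixed at creation
// time from the first record; not the actual NiFi implementation.
public class SchemaDropSketch {

    // Projects every record onto the writer's schema: fields unknown to the
    // writer (here, field B of the second record) are silently dropped.
    static List<Map<String, Object>> writeAll(List<String> writerSchema,
                                              List<Map<String, Object>> records) {
        return records.stream()
                .map(record -> {
                    Map<String, Object> projected = new LinkedHashMap<>();
                    for (String field : writerSchema) {
                        projected.put(field, record.get(field));
                    }
                    return projected;
                })
                .toList();
    }

    public static void main(String[] args) {
        Map<String, Object> first = Map.of("A", 1);           // schema inferred as {A}
        Map<String, Object> second = Map.of("A", 2, "B", 3);  // carries an extra field B

        // Writer schema is taken from the first record only.
        List<Map<String, Object>> out =
                writeAll(List.copyOf(first.keySet()), List.of(first, second));

        System.out.println(out); // field B is gone from the second record
    }
}
```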

Additionally, there is a bug: if the last {{KinesisClientRecord}} in the 
processed batch is problematic (e.g. it cannot be parsed or causes an 
{{IllegalTypeConversionException}}), the resulting FlowFile neither gets its 
attributes populated according to the {{ConsumeKinesisStream}} contract nor is 
the session rolled back.

There is one other minor bug in the implementation: the class miscalculates 
the {{record.count}} attribute when the {{kinesisClientRecord.data()}} content 
of the last processed Kinesis record in the batch yields multiple intermediate 
records. (With 2 intermediate records in the last Kinesis record, 
{{record.count}} is reported 1 higher than it should be; with 3 it is 3 
higher, with 4 it is 6 higher, with 5 it is 10 higher, and so on.)
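
The reported offsets (+1 for 2 intermediate records, +3 for 3, +6 for 4, ...) 
are the triangular numbers n*(n-1)/2. That is the overcount you get if the 
running index is added back into the count on each iteration over the 
intermediate records, which is what this dependency-free model assumes; the 
actual mechanism in the implementation may differ.

```java
// Hypothetical model reproducing the reported record.count overcount pattern;
// the real cause in KinesisRecordProcessorRecord may be different.
public class RecordCountSketch {

    // Buggy model: each iteration adds the running index on top of the
    // expected increment of 1, yielding n*(n-1)/2 extra in total.
    static int buggyCount(int intermediateRecords) {
        int count = 0;
        for (int i = 0; i < intermediateRecords; i++) {
            count += 1 + i; // should be just `count += 1`
        }
        return count;
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.printf("n=%d correct=%d buggy=%d overcount=%d%n",
                    n, n, buggyCount(n), buggyCount(n) - n);
        }
        // overcounts: 1, 3, 6, 10 — matching the pattern described above
    }
}
```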

  was:
{{KinesisRecordProcessorRecord}} does determine the output FlowFile schema 
writer based on the first {{KinesisClientRecord}} readers content. This poses a 
problem for cases the input records schema changes — either due to schema being 
inferred or schema reference being embedded in the record itself. With the 
current code several improper behaviors may occur:
 * Record fields may get dropped — when first record had only field A, second 
had fields A and B, resulting FlowFile will have only field A data for both 
records.
 * Record may cause an {{IllegalTypeConversionException}} — when first record 
had a field A with type Integer, second record had a field A with value that 
exceeds Integer range. This case requires a session rollback due to the 
written/exceptional FlowFile potentially being in undefined state.

Additionally there is a bug — in case the last {{KinesisClientRecord}} in the 
processed batch is problematic (e.g. cannot be parsed or causes 
{{{}IllegalTypeConversionException{}}}), the resulting FlowFile will not get 
its attributes populated according to the {{ConsumeKinesisStream}} contract nor 
the session will be rolled-back.

There is one other minor bug — the class incorrectly calculates 
{{record.count}} attribute when multiple {{intermediateRecord}} are available 
in {{kinesisClientRecord.data()}} content for the last processed kinesis record 
in batch. (If there are 2 intermediate records in the last kinesis record, 
record.count is reported as a 1 bigger than it should, if 3 intermediate 
records = +3, if 3 = +6, 4 = +10 and so on).


> KinesisRecordProcessorRecord mishandles record schema changes
> -------------------------------------------------------------
>
>                 Key: NIFI-14696
>                 URL: https://issues.apache.org/jira/browse/NIFI-14696
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.4.0
>            Reporter: Dariusz Seweryn
>            Assignee: Dariusz Seweryn
>            Priority: Major
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
