WangRuoQi created FLINK-20763:
---------------------------------
Summary: canal format gets wrong result when parsing update records with null values
Key: FLINK-20763
URL: https://issues.apache.org/jira/browse/FLINK-20763
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.2
Reporter: WangRuoQi
Attachments: canal_format.patch
When I use the canal format to consume the MySQL binlog with a query like this:
{code:sql}
select ymd, count(order_no), count(*) from order_table where status >= 3 group by ymd;{code}
I get results like this:
{code:java}
(20201212,10,10)
..
(20201212,20,24)
..
(20201212,100,130)
..{code}
I am sure that when status>=3, every record has a valid order_no, yet I got results where count(order_no) and count(*) differ.
While debugging, I found the following:
{code:sql}
insert into order_table(ymd,order_no,status) values(20201212,null,1);
-- +I(20201212,null,1)
update order_table set order_no=123,status=3 where id=1;
-- -U(20201212,123,1)
-- +U(20201212,123,3){code}
So I noticed that the canal format has a bug when parsing update records: the -U row should be (20201212,null,1), not (20201212,123,1).
The relevant source code logic is:
{code:java}
} else if (OP_UPDATE.equals(type)) {
    // "data" field is an array of row, contains new rows
    ArrayData data = row.getArray(0);
    // "old" field is an array of row, contains old values
    ArrayData old = row.getArray(1);
    for (int i = 0; i < data.size(); i++) {
        // the underlying JSON deserialization schema always produce GenericRowData.
        GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
        GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
        for (int f = 0; f < fieldCount; f++) {
            if (before.isNullAt(f)) {
                // not null fields in "old" (before) means the fields are changed
                // null/empty fields in "old" (before) means the fields are not changed
                // so we just copy the not changed fields into before
                before.setField(f, after.getField(f));
            }
        }
        before.setRowKind(RowKind.UPDATE_BEFORE);
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(before);
        out.collect(after);
{code}
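When a field in "old" really had a null value, this logic cannot tell it apart from a field that is absent from "old" (i.e. unchanged), because both are null after deserialization. The old null value is therefore overwritten by the new record value, which leads the aggregation to a wrong result.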
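To make the ambiguity concrete, here is a minimal standalone Java sketch (not Flink code; CanalUpdateBugSketch and buildBeforeCurrentLogic are hypothetical names) that models rows as Object[] and replays the null-filling loop quoted above on the debugging example. It assumes canal's "old" entry lists only the changed columns, so ymd is absent while order_no is present with a null value; after deserialization both end up as plain nulls.

```java
import java.util.Arrays;

// Hypothetical standalone model, not Flink code. Rows are Object[] with the
// columns (ymd, order_no, status) from the debugging example.
public class CanalUpdateBugSketch {

    // Replays the quoted loop: every null slot in the deserialized "old" row
    // is treated as an unchanged column and filled from the new row.
    static Object[] buildBeforeCurrentLogic(Object[] after, Object[] oldRow) {
        Object[] before = Arrays.copyOf(oldRow, oldRow.length);
        for (int f = 0; f < before.length; f++) {
            if (before[f] == null) {
                before[f] = after[f];
            }
        }
        return before;
    }

    public static void main(String[] args) {
        Object[] after = {20201212, 123, 3};
        // canal's "old" listed only order_no (was null) and status (was 1).
        // After deserialization, the absent ymd and the null order_no are
        // both plain nulls, so the loop cannot tell them apart.
        Object[] oldRow = {null, null, 1};
        Object[] before = buildBeforeCurrentLogic(after, oldRow);
        System.out.println("-U" + Arrays.toString(before)); // -U[20201212, 123, 1]
        System.out.println("+U" + Arrays.toString(after));  // +U[20201212, 123, 3]
    }
}
```

The printed -U row matches the wrong -U(20201212,123,1) observed while debugging.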
I tried to fix this bug with the following logic: for each field, use the old value when the "old" row contains that field, whether its value is null or not; otherwise use the new value.
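A minimal sketch of this rule, again as hypothetical standalone Java rather than the attached patch: buildBefore keeps the key set of the "old" object (modelled here as a HashMap, which permits null values), so containsKey can distinguish "changed from null" from "unchanged". In Flink this would require the deserializer to preserve which keys appeared in "old", which the row-based code quoted above does not; how the patch achieves that is not shown here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone model of the proposed fix, not Flink code.
public class CanalUpdateFixSketch {

    // Builds the UPDATE_BEFORE row from the new row and the "old" key/value map.
    static Object[] buildBefore(List<String> columns, Object[] after, Map<String, Object> old) {
        Object[] before = new Object[columns.size()];
        for (int f = 0; f < columns.size(); f++) {
            String name = columns.get(f);
            // Present in "old" (even with a null value): the column changed, keep the old value.
            // Absent from "old": the column did not change, copy it from the new row.
            before[f] = old.containsKey(name) ? old.get(name) : after[f];
        }
        return before;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("ymd", "order_no", "status");
        Object[] after = {20201212, 123, 3};
        // Equivalent of canal's "old": [{"order_no": null, "status": 1}]
        Map<String, Object> old = new HashMap<>();
        old.put("order_no", null);
        old.put("status", 1);
        System.out.println("-U" + Arrays.toString(buildBefore(columns, after, old))); // -U[20201212, null, 1]
        System.out.println("+U" + Arrays.toString(after)); // +U[20201212, 123, 3]
    }
}
```

With the debugging example the -U row now keeps order_no as null, so count(order_no) is retracted correctly.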
I hope this bug will be fixed in a future version.
[^canal_format.patch]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)