stevenzwu commented on code in PR #5109:
URL: https://github.com/apache/iceberg/pull/5109#discussion_r903257911
##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java:
##########
@@ -482,8 +484,10 @@ protected void addElement(ReusableArrayData reused, E element) {
@Override
protected ArrayData buildList(ReusableArrayData list) {
- list.setNumElements(writePos);
- return list;
+ // Since ReusableArrayData is not accepted by Flink, use GenericArrayData temporarily to walk around it.
Review Comment:
The FLIP-27 source reader uses the util method below to clone each `RowData`
as it batches records for thread handover. It depends on `ArrayDataSerializer`
to clone array fields. With the change from PR #4712, the `ReusableArrayData`
object is reused instead of being cloned, which corrupts the batched `RowData`
array.
```
public static RowData clone(RowData from, RowData reuse, RowType rowType,
                            TypeSerializer[] fieldSerializers) {
  GenericRowData ret;
  if (reuse instanceof GenericRowData) {
    ret = (GenericRowData) reuse;
  } else {
    ret = new GenericRowData(from.getArity());
  }
  ret.setRowKind(from.getRowKind());
  for (int i = 0; i < rowType.getFieldCount(); i++) {
    if (!from.isNullAt(i)) {
      RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
      ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
    } else {
      ret.setField(i, null);
    }
  }
  return ret;
}
```
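
To make the failure mode concrete, here is a minimal sketch in plain Java with
no Flink or Iceberg dependencies. `ReusableIntArray`, `batchByReference`, and
`batchByCopy` are hypothetical names invented for illustration. They only mimic
the shape of the problem (a reader that reuses one mutable container while a
batch holds references to it) and are not the actual `ReusableArrayData` or
`ArrayDataSerializer` behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReuseCorruptionSketch {

  /** Hypothetical stand-in for a mutable container that a reader refills per record. */
  public static final class ReusableIntArray {
    private int[] values = new int[0];
    private int numElements = 0;

    /** Overwrites the contents in place, growing the backing array if needed. */
    public void fill(int[] newValues) {
      if (values.length < newValues.length) {
        values = new int[newValues.length];
      }
      System.arraycopy(newValues, 0, values, 0, newValues.length);
      numElements = newValues.length;
    }

    /** Returns an independent snapshot of the current contents. */
    public int[] toArray() {
      return Arrays.copyOf(values, numElements);
    }

    /** Deep copy: the result shares no state with this instance. */
    public ReusableIntArray copy() {
      ReusableIntArray result = new ReusableIntArray();
      result.fill(toArray());
      return result;
    }
  }

  /** Batches by reference: every slot aliases the single reused object. */
  public static List<ReusableIntArray> batchByReference(int[][] records) {
    ReusableIntArray reused = new ReusableIntArray();
    List<ReusableIntArray> batch = new ArrayList<>();
    for (int[] record : records) {
      reused.fill(record);
      batch.add(reused); // no copy, so later fills overwrite earlier entries
    }
    return batch;
  }

  /** Batches by copy: each slot owns its own data. */
  public static List<ReusableIntArray> batchByCopy(int[][] records) {
    ReusableIntArray reused = new ReusableIntArray();
    List<ReusableIntArray> batch = new ArrayList<>();
    for (int[] record : records) {
      reused.fill(record);
      batch.add(reused.copy());
    }
    return batch;
  }

  public static void main(String[] args) {
    int[][] records = {{1, 2}, {3, 4}, {5, 6}};

    // Every entry shows the last record: [5, 6], [5, 6], [5, 6]
    for (ReusableIntArray entry : batchByReference(records)) {
      System.out.println("by reference: " + Arrays.toString(entry.toArray()));
    }

    // Each entry keeps its own record: [1, 2], [3, 4], [5, 6]
    for (ReusableIntArray entry : batchByCopy(records)) {
      System.out.println("by copy:      " + Arrays.toString(entry.toArray()));
    }
  }
}
```

In this sketch the only difference between the two batching methods is the
`copy()` call, which is the role the field serializer's `copy` plays in the
`clone` util above.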