pvary commented on code in PR #14245:
URL: https://github.com/apache/iceberg/pull/14245#discussion_r2606236791
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java:
##########
@@ -41,30 +48,111 @@
public class RowDataToAvroGenericRecordConverter implements Function<RowData,
GenericRecord> {
private final RowDataToAvroConverters.RowDataToAvroConverter converter;
private final Schema avroSchema;
+ private final Set<Integer> timestampNanosFieldIndices;
+ private final RowType converterRowType;
- private RowDataToAvroGenericRecordConverter(RowType rowType, Schema
avroSchema) {
- this.converter = RowDataToAvroConverters.createConverter(rowType);
+ private RowDataToAvroGenericRecordConverter(
+ RowType converterRowType, Schema avroSchema, Set<Integer>
timestampNanosFieldIndices) {
+ this.converter = RowDataToAvroConverters.createConverter(converterRowType);
this.avroSchema = avroSchema;
+ this.timestampNanosFieldIndices = timestampNanosFieldIndices;
+ this.converterRowType = converterRowType;
}
@Override
public GenericRecord apply(RowData rowData) {
- return (GenericRecord) converter.convert(avroSchema, rowData);
+ // Pre-process: Flink's RowDataToAvroConverters expects Long (nanoseconds)
for timestamp-nanos,
+ // but our RowData has TimestampData. Convert TimestampData to Long
nanoseconds.
+ if (timestampNanosFieldIndices.isEmpty()) {
+ return (GenericRecord) converter.convert(avroSchema, rowData);
+ }
+
+ // Create a new GenericRowData with Long values for timestamp-nanos fields
+ GenericRowData processedRowData = new GenericRowData(rowData.getArity());
+ processedRowData.setRowKind(rowData.getRowKind());
+
+ for (int i = 0; i < rowData.getArity(); i++) {
+ if (timestampNanosFieldIndices.contains(i)) {
+ // Convert TimestampData to Long nanoseconds
+ if (!rowData.isNullAt(i)) {
+ TimestampData timestampData = rowData.getTimestamp(i, 9);
+ long nanos =
+ timestampData.getMillisecond() * 1_000_000L +
timestampData.getNanoOfMillisecond();
+ processedRowData.setField(i, nanos);
+ } else {
+ processedRowData.setField(i, null);
+ }
+ } else {
+ // Copy other fields as-is
+ if (rowData.isNullAt(i)) {
+ processedRowData.setField(i, null);
+ } else {
+ LogicalType fieldType = converterRowType.getTypeAt(i);
+ RowData.FieldGetter fieldGetter =
RowData.createFieldGetter(fieldType, i);
+ processedRowData.setField(i, fieldGetter.getFieldOrNull(rowData));
Review Comment:
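Don't create the `FieldGetter`s on every call to `apply`. Build them once, e.g. in the constructor from `converterRowType`, and reuse them for each row.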
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]