wuchong commented on a change in pull request #12471:
URL: https://github.com/apache/flink/pull/12471#discussion_r435808675
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
##########
@@ -181,56 +196,93 @@ private static SerializationRuntimeConverter
createConverter(LogicalType type) {
case DOUBLE: // double
case TIME_WITHOUT_TIME_ZONE: // int
case DATE: // int
- return avroObject -> avroObject;
+ converter = (schema, object) -> object;
+ break;
case CHAR:
case VARCHAR:
- return object -> new Utf8(object.toString());
+ converter = (schema, object) -> new
Utf8(object.toString());
+ break;
case BINARY:
case VARBINARY:
- return object -> ByteBuffer.wrap((byte[])
object);
+ converter = (schema, object) ->
ByteBuffer.wrap((byte[]) object);
+ break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return object -> ((TimestampData)
object).toTimestamp().getTime();
+ converter = (schema, object) ->
((TimestampData) object).toTimestamp().getTime();
+ break;
case DECIMAL:
- return object -> ByteBuffer.wrap(((DecimalData)
object).toUnscaledBytes());
+ converter = (schema, object) ->
ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+ break;
case ARRAY:
- return createArrayConverter((ArrayType) type);
+ converter = createArrayConverter((ArrayType)
type);
+ break;
case ROW:
- return createRowConverter((RowType) type);
+ converter = createRowConverter((RowType) type);
+ break;
case MAP:
case MULTISET:
- return createMapConverter(type);
+ converter = createMapConverter(type);
+ break;
case RAW:
default:
throw new
UnsupportedOperationException("Unsupported type: " + type);
}
+
+ // wrap into nullable converter
+ return (schema, object) -> {
+ if (object == null) {
+ return null;
+ }
+
+ // get actual schema if it is a nullable schema
+ Schema actualSchema;
Review comment:
The nested Row will use this to construct a `GenericData.Record`, so we
have to pass it through.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]