JingsongLi commented on a change in pull request #12471:
URL: https://github.com/apache/flink/pull/12471#discussion_r435771296



##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
##########
@@ -181,56 +196,93 @@ private static SerializationRuntimeConverter createConverter(LogicalType type) {
                        case DOUBLE: // double
                        case TIME_WITHOUT_TIME_ZONE: // int
                        case DATE: // int
-                               return avroObject -> avroObject;
+                               converter = (schema, object) -> object;
+                               break;
                        case CHAR:
                        case VARCHAR:
-                               return object -> new Utf8(object.toString());
+                               converter = (schema, object) -> new Utf8(object.toString());
+                               break;
                        case BINARY:
                        case VARBINARY:
-                               return object -> ByteBuffer.wrap((byte[]) object);
+                               converter = (schema, object) -> ByteBuffer.wrap((byte[]) object);
+                               break;
                        case TIMESTAMP_WITHOUT_TIME_ZONE:
-                               return object -> ((TimestampData) object).toTimestamp().getTime();
+                               converter = (schema, object) -> ((TimestampData) object).toTimestamp().getTime();
+                               break;
                        case DECIMAL:
-                               return object -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+                               converter = (schema, object) -> ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+                               break;
                        case ARRAY:
-                               return createArrayConverter((ArrayType) type);
+                               converter = createArrayConverter((ArrayType) type);
+                               break;
                        case ROW:
-                               return createRowConverter((RowType) type);
+                               converter = createRowConverter((RowType) type);
+                               break;
                        case MAP:
                        case MULTISET:
-                               return createMapConverter(type);
+                               converter = createMapConverter(type);
+                               break;
                        case RAW:
                        default:
                                throw new UnsupportedOperationException("Unsupported type: " + type);
                }
+
+               // wrap into nullable converter
+               return (schema, object) -> {
+                       if (object == null) {
+                               return null;
+                       }
+
+                       // get actual schema if it is a nullable schema
+                       Schema actualSchema;

Review comment:
       What is this for?
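
       For context, a minimal sketch of what the actualSchema resolution presumably needs to do here, assuming the wrapper has to unwrap Avro's union-based encoding of nullable types (Avro represents a nullable type as a union of NULL and the actual type); the class and method names below are illustrative, not from this PR:

       import org.apache.avro.Schema;

       // Illustrative sketch only: before converting a non-null value, the
       // nullable wrapper picks the non-null branch out of a UNION(NULL, T)
       // schema so the inner converter receives the actual type's schema.
       class NullableSchemaSketch {

               static Schema resolveActualSchema(Schema schema) {
                       if (schema.getType() == Schema.Type.UNION) {
                               // a nullable field is typically a two-element union containing NULL
                               for (Schema member : schema.getTypes()) {
                                       if (member.getType() != Schema.Type.NULL) {
                                               return member;
                                       }
                               }
                       }
                       // not a union: the schema itself is already the actual schema
                       return schema;
               }
       }

       The inner converter can then be invoked with the resolved non-null schema instead of the union schema.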



