danny0405 commented on pull request #12919:
URL: https://github.com/apache/flink/pull/12919#issuecomment-661830869


   > After a quick glimpse. Could we unify the 
`AvroRowDataDeserializationSchema` with 
`ConfluentRegistryAvroRowDataDeserializationSchema` and 
`RegistryAvroRowDataSerializationSchema`?
   > I really believe we need just a single AvroRowDeserializationSchema for 
avro for table API.
   > 
   > I am quite sure sth like this would work:
   > 
   > ```
   > RowDataDeserializationSchema extends ResultTypeQueryable {
   >    private final DeserializationSchema<GenericRecord> nestedSchema;
   >    private final DeserializationRuntimeConverter runtimeConverter;
   >    private final TypeInformation<RowData> resultType;
   > 
   >    public AvroRowDataDeserializationSchema2(
   >                    DeserializationSchema<GenericRecord> nestedSchema,
   >                    DeserializationRuntimeConverter runtimeConverter,
   >                    TypeInformation<RowData> resultType) {
   >            this.nestedSchema = nestedSchema;
   >            this.runtimeConverter = runtimeConverter;
   >            this.resultType = resultType;
   >    }
   > 
   >    @Override
   >    public void open(InitializationContext context) throws Exception {
   >            nestedSchema.open(context);
   >    }
   > 
   >    @Override
   >    public RowData deserialize(byte[] message) throws IOException {
   >            try {
   >                    GenericRecord deserialize = 
nestedSchema.deserialize(message);
   >                    return (RowData) runtimeConverter.convert(deserialize);
   >            } catch (Exception e) {
   >                    throw new IOException("Failed to deserialize Avro 
record.", e);
   >            }
   >    }
   > 
   >            @Override
   >    public boolean isEndOfStream(RowData nextElement) {
   >            return false;
   >    }
   > 
   >    @Override
   >    public TypeInformation<RowData> getProducedType() {
   >            return resultType;
   >    }
   > }
   > ```
   > 
   > and then you would use it like this:
   > 
   > in `AvroFormatFactory`:
   > 
   > ```
   > new RowDataDeserializationSchema(
   >    
AvroDeserializationSchema.forGeneric(AvroSchemaConverter.convertToSchema(rowType)),
   >         createRowConverter(rowType), // we would need to move this method 
to some utils class or to a common abstract class for factories
   >    rowDataTypeInfo
   > );
   > ```
   > 
   > in `RegistryAvroFormatFactory`:
   > 
   > ```
   > new RowDataDeserializationSchema(
   >    ConfluentRegistryAvroDeserializationSchema.forGeneric(
   >            AvroSchemaConverter.convertToSchema(rowType),
   >            schemaRegistryURL
   >    ),
   >         createRowConverter(rowType),
   >    rowDataTypeInfo
   > );
   > ```
   
   Thanks for the nice review, i have addressed your comments.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to