[ 
https://issues.apache.org/jira/browse/FLINK-30548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

heyu dou updated FLINK-30548:
-----------------------------
    Attachment: AvroNew.java
                AvroOld.java

> In some cases AvroDeserializationSchema buffer is not reset
> -----------------------------------------------------------
>
>                 Key: FLINK-30548
>                 URL: https://issues.apache.org/jira/browse/FLINK-30548
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: heyu dou
>            Priority: Major
>         Attachments: AvroNew.java, AvroOld.java
>
>
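> When the Avro schema changes and a downstream consumer uses the old schema to
> read data written with the new schema, the buffer position of
> AvroDeserializationSchema.decoder is not reset between records.
> Because AvroDeserializationSchema.decoder is reused across calls and its buffer
> position is never reset, bytes left unread from the previous record (in the
> reproduction below, the trailing click_id field that the old schema does not
> declare) are still pending. The next read therefore continues from that stale
> position instead of the beginning of the new message, which results in
> misaligned reads.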
> [https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165]
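> This line should be changed as follows, so that the decoder reads directly from
> the input stream instead of buffering ahead (a buffering decoder can retain
> unread bytes of the previous message):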
> {code:java}
> this.decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null); 
> {code}
>  
> The following code reproduces the problem:
> {code:java}
> // Reproduction for FLINK-30548: testWrite() encodes 10 AvroNew records and
> // appends them, Base64-encoded one per line, to avro_test.data; testRead()
> // decodes each line with a deserializer built for the older AvroOld schema.
> public class AvroChangeTest {
>     // Single AvroOld deserializer shared by every deserialize() call in
>     // testRead(); per the report, its internal decoder is reused between calls.
>     private static AvroDeserializationSchema<AvroOld> avroSchema = 
> AvroDeserializationSchema.forSpecific(AvroOld.class);
>     // Writes the input file using the NEW schema (which adds click_id).
>     @Test
>     public void testWrite() throws IOException {
>         DatumWriter<AvroNew> writer = new 
> SpecificDatumWriter<AvroNew>(AvroNew.getClassSchema());
>         File file = new File("avro_test.data");
>         // Start from an empty file, since records are appended below.
>         // NOTE(review): the boolean result of delete() is not checked.
>         if (file.exists()) {
>             file.delete();
>         }
>         for (int i = 0; i < 10; i++) { // avro serialization
>             AvroNew taInfo = new AvroNew();
>             taInfo.setCreateDate("2023-01-03");
>             taInfo.setAdUserId(i);
>             taInfo.setClickId("2" + i);
>             // Fresh stream and encoder per record, so each output line holds
>             // exactly one encoded record.
>             ByteArrayOutputStream out = new ByteArrayOutputStream();
>             BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, 
> null);
>             writer.write(taInfo, encoder);
>             // Flush so all encoded bytes reach `out` before toByteArray().
>             encoder.flush();
>             out.close();
>             byte[] data = out.toByteArray();
>             System.out.println(data.length + "\t" + taInfo.toString());
>             // Append (last argument true) one Base64 line per record.
>             // NOTE(review): this overload uses the platform default charset;
>             // harmless for Base64 text, but a charset-taking overload is clearer.
>             FileUtils.writeStringToFile(file, Base64.encodeBase64String(data) 
> + "\n", true);
>         }
>     }
>     // Reads the records back with the OLD schema; per the report, the decoded
>     // values become misaligned because the decoder's buffer is not reset.
>     // NOTE(review): relies on testWrite() having created avro_test.data first;
>     // JUnit does not guarantee the execution order of test methods.
>     @Test
>     public void testRead() throws IOException { // avro deserialization
>         File file = new File("avro_test.data");
>         List<String> lines = FileUtils.readLines(file);
>         for (String line : lines) {
>             byte[] data = Base64.decodeBase64(line);
>             AvroOld old = avroSchema.deserialize(data);
>             System.out.println(old.toString());
>         }
>     }
> }
>  {code}
> {code:java}
> {
>     "namespace": "com.qihoo.dw.model.avroModel",
>     "type": "record",
>     "name": "AvroOld",
>     "fields": [
>         {
>             "name": "create_date",
>             "type": ["string","null"],
>             "default": "",
>             "doc": "事件时间(yyyy-MM-dd)"
>         },
>         {
>             "name": "ad_user_id",
>             "type": "long",
>             "default": 0,
>             "doc": "广告主ID"
>         }
>     ]
> } {code}
> {code:java}
> {
>     "namespace": "com.qihoo.dw.model.avroModel",
>     "type": "record",
>     "name": "AvroNew",
>     "fields": [
>         {
>             "name": "create_date",
>             "type": ["string","null"],
>             "default": "",
>             "doc": "事件时间(yyyy-MM-dd)"
>         },
>         {
>             "name": "ad_user_id",
>             "type": "long",
>             "default": 0,
>             "doc": "广告主ID"
>         },
>         {
>             "name": "click_id",
>             "type": "string",
>             "default": ""
>         }
>     ]
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to