[ https://issues.apache.org/jira/browse/FLINK-30548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
heyu dou updated FLINK-30548:
-----------------------------
    Attachment: AvroNew.java
                AvroOld.java

> In some cases AvroDeserializationSchema buffer is not reset
> -----------------------------------------------------------
>
>                 Key: FLINK-30548
>                 URL: https://issues.apache.org/jira/browse/FLINK-30548
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: heyu dou
>            Priority: Major
>         Attachments: AvroNew.java, AvroOld.java
>
>
> When the Avro schema evolves and a downstream consumer uses the old schema to
> read data produced with the new schema, the buffer position of
> AvroDeserializationSchema.decoder is not reset. This results in misaligned reads.
> Because AvroDeserializationSchema.decoder is reused without resetting its buffer
> position, the next read continues from the stale position instead of from the
> start of the new record.
> [https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165]
> This line should be changed to:
> {code:java}
> this.decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
> {code}
>
> The following code reproduces the problem:
> {code:java}
> public class AvroChangeTest {
>     private static AvroDeserializationSchema<AvroOld> avroSchema =
>             AvroDeserializationSchema.forSpecific(AvroOld.class);
>
>     @Test
>     public void testWrite() throws IOException {
>         DatumWriter<AvroNew> writer =
>                 new SpecificDatumWriter<AvroNew>(AvroNew.getClassSchema());
>         File file = new File("avro_test.data");
>         if (file.exists()) {
>             file.delete();
>         }
>         for (int i = 0; i < 10; i++) { // Avro serialization
>             AvroNew taInfo = new AvroNew();
>             taInfo.setCreateDate("2023-01-03");
>             taInfo.setAdUserId(i);
>             taInfo.setClickId("2" + i);
>             ByteArrayOutputStream out = new ByteArrayOutputStream();
>             BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>             writer.write(taInfo, encoder);
>             encoder.flush();
>             out.close();
>             byte[] data = out.toByteArray();
>             System.out.println(data.length + "\t" + taInfo.toString());
>             FileUtils.writeStringToFile(file,
>                     Base64.encodeBase64String(data) + "\n", true);
>         }
>     }
>
>     @Test
>     public void testRead() throws IOException { // Avro deserialization
>         File file = new File("avro_test.data");
>         List<String> lines = FileUtils.readLines(file);
>         for (String line : lines) {
>             byte[] data = Base64.decodeBase64(line);
>             AvroOld old = avroSchema.deserialize(data);
>             System.out.println(old.toString());
>         }
>     }
> }
> {code}
> The old schema:
> {code:java}
> {
>     "namespace": "com.qihoo.dw.model.avroModel",
>     "type": "record",
>     "name": "AvroOld",
>     "fields": [
>         {
>             "name": "create_date",
>             "type": ["string", "null"],
>             "default": "",
>             "doc": "event time (yyyy-MM-dd)"
>         },
>         {
>             "name": "ad_user_id",
>             "type": "long",
>             "default": 0,
>             "doc": "advertiser ID"
>         }
>     ]
> }
> {code}
> The new schema:
> {code:java}
> {
>     "namespace": "com.qihoo.dw.model.avroModel",
>     "type": "record",
>     "name": "AvroNew",
>     "fields": [
>         {
>             "name": "create_date",
>             "type": ["string", "null"],
>             "default": "",
>             "doc": "event time (yyyy-MM-dd)"
>         },
>         {
>             "name": "ad_user_id",
>             "type": "long",
>             "default": 0,
>             "doc": "advertiser ID"
>         },
>         {
>             "name": "click_id",
>             "type": "string",
>             "default": ""
>         }
>     ]
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
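The decoder-reuse bug described in this report can be illustrated with a minimal, Avro-free sketch. The `StatefulDecoder` class below is hypothetical: it stands in for the reused `BinaryDecoder`, holding a buffer and a read position that, like the bug, is not reset when a new buffer is supplied. When the "new schema" message is longer than what the "old schema" reader consumes, the stale position bleeds into the next record:

```java
import java.nio.charset.StandardCharsets;

public class DecoderReuseSketch {
    /** Hypothetical stand-in for a reused BinaryDecoder: a buffer plus a read position. */
    static class StatefulDecoder {
        byte[] buf;
        int pos;

        /** Mirrors the bug: pointing at a new buffer does NOT reset the position. */
        void setBuffer(byte[] data) {
            this.buf = data;
            // BUG: this.pos is intentionally not reset here.
        }

        byte readByte() {
            return buf[pos++];
        }
    }

    /** Reads only the first byte of a message through the (reused) decoder,
     *  like an old-schema reader that consumes fewer bytes than were written. */
    static char decodeFirstField(StatefulDecoder decoder, byte[] message) {
        decoder.setBuffer(message);
        return (char) decoder.readByte();
    }

    public static void main(String[] args) {
        StatefulDecoder decoder = new StatefulDecoder();
        byte[] recordA = "AB".getBytes(StandardCharsets.US_ASCII);
        byte[] recordB = "CD".getBytes(StandardCharsets.US_ASCII);

        // First record: pos starts at 0, so we correctly read 'A' (pos advances to 1).
        System.out.println(decodeFirstField(decoder, recordA)); // prints A

        // The stale position carries over: we read 'D' instead of 'C'.
        System.out.println(decodeFirstField(decoder, recordB)); // prints D

        // Resetting the position per message (analogous to not reusing the
        // decoder, i.e. passing null to directBinaryDecoder) realigns the read.
        decoder.pos = 0;
        System.out.println(decodeFirstField(decoder, recordB)); // prints C
    }
}
```

This is only a model of the failure mode, not Flink or Avro code; the proposed one-line fix above (passing `null` instead of the reused decoder to `DecoderFactory.get().directBinaryDecoder`) has the same effect as the final reset in the sketch.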