[
https://issues.apache.org/jira/browse/FLINK-30548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653916#comment-17653916
]
heyu dou commented on FLINK-30548:
----------------------------------
If you agree with the proposed change, could you let me make it?
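
To make the failure mode in the description below easier to see without an Avro dependency, here is a minimal, stdlib-only sketch. It is a simplified model, not Avro's or Flink's actual code: `SwappableInput`, `BufferingReader`, `DirectReader` and `readOldFields` are hypothetical names, each "record" is three one-byte fields, and the "old schema" reader consumes only the first two. The assumption being modelled is that a reused reader with its own read-ahead buffer keeps unread bytes after the underlying stream is pointed at the next record.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class StaleBufferSketch {

    /** Hypothetical stream whose backing array is swapped for every record. */
    static final class SwappableInput extends InputStream {
        private ByteArrayInputStream current = new ByteArrayInputStream(new byte[0]);

        void setBuffer(byte[] record) {
            current = new ByteArrayInputStream(record);
        }

        @Override
        public int read() {
            return current.read();
        }

        @Override
        public int read(byte[] dst, int off, int len) {
            return current.read(dst, off, len);
        }
    }

    interface FieldReader {
        int readField() throws IOException;
    }

    /** Reads ahead into a private buffer; unread bytes survive setBuffer() on the stream. */
    static final class BufferingReader implements FieldReader {
        private final InputStream in;
        private final byte[] buf = new byte[64];
        private int pos = 0;
        private int limit = 0;

        BufferingReader(InputStream in) {
            this.in = in;
        }

        @Override
        public int readField() throws IOException {
            if (pos == limit) { // refill only when the private buffer is drained
                int n = in.read(buf, 0, buf.length);
                if (n <= 0) {
                    throw new EOFException();
                }
                pos = 0;
                limit = n;
            }
            return buf[pos++];
        }
    }

    /** Reads one byte at a time straight from the stream and keeps no state of its own. */
    static final class DirectReader implements FieldReader {
        private final InputStream in;

        DirectReader(InputStream in) {
            this.in = in;
        }

        @Override
        public int readField() throws IOException {
            int b = in.read();
            if (b < 0) {
                throw new EOFException();
            }
            return b;
        }
    }

    /** Reads only the first two fields of each record, reusing one reader for all records. */
    static int[][] readOldFields(SwappableInput in, FieldReader reader, byte[][] records)
            throws IOException {
        int[][] out = new int[records.length][];
        for (int i = 0; i < records.length; i++) {
            in.setBuffer(records[i]);
            out[i] = new int[] {reader.readField(), reader.readField()};
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        byte[][] records = {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}};

        SwappableInput a = new SwappableInput();
        System.out.println(Arrays.deepToString(readOldFields(a, new BufferingReader(a), records)));

        SwappableInput b = new SwappableInput();
        System.out.println(Arrays.deepToString(readOldFields(b, new DirectReader(b), records)));
    }
}
```

With the buffering reader this prints `[[1, 2], [3, 4], [5, 6]]`: the unread third field of each record leaks into the next read. With the direct reader it prints `[[1, 2], [4, 5], [7, 8]]`. Whether an unbuffered decoder is the right fix for Flink is for the maintainers to decide; the sketch only shows why reusing a buffering reader can misalign reads.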
> In some cases AvroDeserializationSchema buffer is not reset
> -----------------------------------------------------------
>
> Key: FLINK-30548
> URL: https://issues.apache.org/jira/browse/FLINK-30548
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: heyu dou
> Priority: Major
> Attachments: AvroNew.java, AvroOld.java
>
>
> When the Avro schema changes and the downstream job uses the old schema to read
> data written with the new schema, the buffer position of
> AvroDeserializationSchema.decoder is not reset, which results in misaligned reads.
> The decoder is reused across records and its buffer position is never reset,
> so the next read starts from the position where the previous read stopped.
> [https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165]
> This line should be changed to:
> {code:java}
> this.decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
> {code}
>
> The following code reproduces the problem:
> {code:java}
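> import java.io.ByteArrayOutputStream;
> import java.io.File;
> import java.io.IOException;
> import java.util.List;
>
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.io.EncoderFactory;
> import org.apache.avro.specific.SpecificDatumWriter;
> import org.apache.commons.codec.binary.Base64;
> import org.apache.commons.io.FileUtils;
> import org.apache.flink.formats.avro.AvroDeserializationSchema;
> import org.junit.Test; // JUnit 4 assumed
>
> // AvroOld and AvroNew are the classes generated from the two schemas below.
> public class AvroChangeTest {
>
>     // Deserializer built for the old schema and shared by all reads.
>     private static AvroDeserializationSchema<AvroOld> avroSchema =
>             AvroDeserializationSchema.forSpecific(AvroOld.class);
>
>     @Test
>     public void testWrite() throws IOException {
>         DatumWriter<AvroNew> writer = new SpecificDatumWriter<AvroNew>(AvroNew.getClassSchema());
>         File file = new File("avro_test.data");
>         if (file.exists()) {
>             file.delete();
>         }
>         for (int i = 0; i < 10; i++) { // avro serialization
>             AvroNew taInfo = new AvroNew();
>             taInfo.setCreateDate("2023-01-03");
>             taInfo.setAdUserId(i);
>             taInfo.setClickId("2" + i);
>             ByteArrayOutputStream out = new ByteArrayOutputStream();
>             BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>             writer.write(taInfo, encoder);
>             encoder.flush();
>             out.close();
>             byte[] data = out.toByteArray();
>             System.out.println(data.length + "\t" + taInfo.toString());
>             // Append one Base64-encoded record per line.
>             FileUtils.writeStringToFile(file, Base64.encodeBase64String(data) + "\n", true);
>         }
>     }
>
>     @Test
>     public void testRead() throws IOException { // avro deserialization
>         File file = new File("avro_test.data");
>         List<String> lines = FileUtils.readLines(file);
>         for (String line : lines) {
>             byte[] data = Base64.decodeBase64(line);
>             // Each record was written with the new schema but is read with the old one.
>             AvroOld old = avroSchema.deserialize(data);
>             System.out.println(old.toString());
>         }
>     }
> }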
> {code}
> {code:java}
> {
>   "namespace": "com.qihoo.dw.model.avroModel",
>   "type": "record",
>   "name": "AvroOld",
>   "fields": [
>     {
>       "name": "create_date",
>       "type": ["string", "null"],
>       "default": "",
>       "doc": "Event time (yyyy-MM-dd)"
>     },
>     {
>       "name": "ad_user_id",
>       "type": "long",
>       "default": 0,
>       "doc": "Advertiser ID"
>     }
>   ]
> }
> {code}
> {code:java}
> {
>   "namespace": "com.qihoo.dw.model.avroModel",
>   "type": "record",
>   "name": "AvroNew",
>   "fields": [
>     {
>       "name": "create_date",
>       "type": ["string", "null"],
>       "default": "",
>       "doc": "Event time (yyyy-MM-dd)"
>     },
>     {
>       "name": "ad_user_id",
>       "type": "long",
>       "default": 0,
>       "doc": "Advertiser ID"
>     },
>     {
>       "name": "click_id",
>       "type": "string",
>       "default": ""
>     }
>   ]
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)