[ 
https://issues.apache.org/jira/browse/AVRO-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oscar Westra van Holthe - Kind resolved AVRO-4006.
--------------------------------------------------
    Fix Version/s: 1.12.0
       Resolution: Fixed

Fixed in 1.12.0. Not backported to 1.11.4, because cherry-picking the fix yields a merge conflict.

> [Java] DataFileReader does not correctly identify last sync marker when 
> reading/skipping blocks
> -----------------------------------------------------------------------------------------------
>
>                 Key: AVRO-4006
>                 URL: https://issues.apache.org/jira/browse/AVRO-4006
>             Project: Apache Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.11.3
>            Reporter: Oscar Westra van Holthe - Kind
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The following code demonstrates the problem:
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.file.SeekableFileInput;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.IndexedRecord;
> import org.apache.avro.io.DatumReader;
> import java.io.File;
> import java.io.IOException;
> public class AvroTest {
>     public static void main(String[] args) throws IOException {
>         File avroFile = new File("test.avro");
>         GenericData model = GenericData.get();
>         Schema simple = SchemaBuilder.record("TestRecord").fields().requiredString("text").endRecord();
>         Schema.Field textField = simple.getField("text");
>         try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>(null, model)).create(simple, avroFile)) {
>             for (int i = 1; i <= 1000; i++) {
>                 Object record = model.newRecord(null, simple);
>                 model.setField(record, textField.name(), textField.pos(), "i = " + i);
>                 writer.append(record);
>                 if (i % 100 == 0) {
>                     long syncPos = writer.sync();
>                     System.out.printf("Synced %d records; file position %d%n", i, syncPos);
>                 }
>             }
>         }
>         IndexedRecord result;
>         DatumReader<IndexedRecord> datumReader = new GenericDatumReader<>(simple, simple, model);
>         try (SeekableFileInput sfi = new SeekableFileInput(avroFile);
>              MyDataFileReader<IndexedRecord> reader = new MyDataFileReader<>(sfi, datumReader)) {
>             // Find the start of the last block by scanning the entire file, WITHOUT decoding any records.
>             // Note that this still decompresses the data, but decompression is fast enough that it hardly affects reading speed.
>             long lastSyncPos = reader.previousSync();
>             while (reader.hasNext()) {
>                 lastSyncPos = reader.previousSync();
>                 System.out.printf("Sync marker at %d%n", lastSyncPos);
>                 // Mark the block as read, so hasNext() will read the next block
>                 reader.nextBlock();
>             }
>             System.out.printf("Sync marker at %d%n", reader.previousSync());
>             reader.seek(lastSyncPos);
>             IndexedRecord lastRecord1 = null;
>             int decoded = 0;
>             while (reader.hasNext()) {
>                 lastRecord1 = reader.next(lastRecord1);
>                 decoded++;
>             }
>             System.out.printf("Decoded %d records%n", decoded);
>             result = lastRecord1;
>         }
>         Object lastRecord = result;
>         System.out.printf("Last record: %s%n", lastRecord);
>     }
>     private static class MyDataFileReader<T> extends DataFileReader<T> {
>         public MyDataFileReader(SeekableFileInput sfi, DatumReader<T> datumReader) throws IOException {
>             super(sfi, datumReader);
>         }
>         @Override
>         public void blockFinished() throws IOException {
>             super.blockFinished();
>         }
>     }
> }
> {code}
> The output:
> {noformat}
> Synced 100 records; file position 828
> Synced 200 records; file position 1648
> Synced 300 records; file position 2468
> Synced 400 records; file position 3288
> Synced 500 records; file position 4108
> Synced 600 records; file position 4928
> Synced 700 records; file position 5748
> Synced 800 records; file position 6568
> Synced 900 records; file position 7388
> Synced 1000 records; file position 8209
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Decoded 1000 records
> Last record: {"text": "i = 1000"}
> {noformat}
> In the expected output, the detected sync markers should progress, and only 
> 100 records should be decoded:
> {noformat}
> Synced 100 records; file position 828
> Synced 200 records; file position 1648
> Synced 300 records; file position 2468
> Synced 400 records; file position 3288
> Synced 500 records; file position 4108
> Synced 600 records; file position 4928
> Synced 700 records; file position 5748
> Synced 800 records; file position 6568
> Synced 900 records; file position 7388
> Synced 1000 records; file position 8209
> Sync marker at 116
> Sync marker at 828
> Sync marker at 1648
> Sync marker at 2468
> Sync marker at 3288
> Sync marker at 4108
> Sync marker at 4928
> Sync marker at 5748
> Sync marker at 6568
> Sync marker at 7388
> Sync marker at 8209
> Decoded 100 records
> Last record: {"text": "i = 1000"}
> {noformat}
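
The difference between the two outputs comes down to which position is passed to seek(). As a rough illustration only, the toy model below reuses the positions from the expected output and counts how many records would be decoded after seeking to a given position. It does not call Avro at all: the class name SyncScanModel, the method recordsFrom and the constant names are hypothetical, and the block layout (100 records per block, blocks starting at the listed positions) is assumed from the output above.

```java
// Toy model only: hypothetical names, no Avro classes involved.
final class SyncScanModel {
    // Block start positions copied from the expected output above.
    // The final marker (8209) closes the last block and starts no new one.
    static final long[] BLOCK_STARTS = {
        116L, 828L, 1648L, 2468L, 3288L, 4108L, 4928L, 5748L, 6568L, 7388L
    };
    // The reproducer calls sync() after every 100 appended records.
    static final int RECORDS_PER_BLOCK = 100;

    // Number of records read when starting at seekPos and reading to end of file.
    static int recordsFrom(long seekPos) {
        int records = 0;
        for (long start : BLOCK_STARTS) {
            if (start >= seekPos) {
                records += RECORDS_PER_BLOCK;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // Correct last block start: only the final block is decoded.
        System.out.printf("Seek to 7388: %d records%n", recordsFrom(7388L));
        // Stale position (start of the first block): the whole file is decoded.
        System.out.printf("Seek to 116: %d records%n", recordsFrom(116L));
    }
}
```

Under these assumptions, seeking to 7388 yields 100 records and seeking to 116 yields 1000, which matches the "Decoded" lines in the expected and actual outputs respectively.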



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
