[ https://issues.apache.org/jira/browse/AVRO-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Oscar Westra van Holthe - Kind resolved AVRO-4006.
--------------------------------------------------
Fix Version/s: 1.12.0
Resolution: Fixed
Fixed in 1.12.0. Not backported to 1.11.4, because cherry-picking the fix yields a merge conflict.
> [Java] DataFileReader does not correctly identify last sync marker when reading/skipping blocks
> -----------------------------------------------------------------------------------------------
>
> Key: AVRO-4006
> URL: https://issues.apache.org/jira/browse/AVRO-4006
> Project: Apache Avro
> Issue Type: Bug
> Components: java
> Affects Versions: 1.11.3
> Reporter: Oscar Westra van Holthe - Kind
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> The following code demonstrates the problem:
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.file.SeekableFileInput;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.IndexedRecord;
> import org.apache.avro.io.DatumReader;
> import java.io.File;
> import java.io.IOException;
> public class AvroTest {
>     public static void main(String[] args) throws IOException {
>         File avroFile = new File("test.avro");
>         GenericData model = GenericData.get();
>         Schema simple = SchemaBuilder.record("TestRecord").fields().requiredString("text").endRecord();
>         Schema.Field textField = simple.getField("text");
>         try (DataFileWriter<Object> writer =
>                 new DataFileWriter<>(new GenericDatumWriter<>(null, model)).create(simple, avroFile)) {
>             for (int i = 1; i <= 1000; i++) {
>                 Object record = model.newRecord(null, simple);
>                 model.setField(record, textField.name(), textField.pos(), "i = " + i);
>                 writer.append(record);
>                 if (i % 100 == 0) {
>                     long syncPos = writer.sync();
>                     System.out.printf("Synced %d records; file position %d%n", i, syncPos);
>                 }
>             }
>         }
>         IndexedRecord result;
>         DatumReader<IndexedRecord> datumReader = new GenericDatumReader<>(simple, simple, model);
>         try (SeekableFileInput sfi = new SeekableFileInput(avroFile);
>                 MyDataFileReader<IndexedRecord> reader = new MyDataFileReader<>(sfi, datumReader)) {
>             // Find the start of the last block reading the entire file, WITHOUT decoding any records.
>             // Note that this does decompress the data, but that's so fast these days that it hardly affects reading speed.
>             long lastSyncPos = reader.previousSync();
>             while (reader.hasNext()) {
>                 lastSyncPos = reader.previousSync();
>                 System.out.printf("Sync marker at %d%n", lastSyncPos);
>                 // Mark the block as read, so hasNext() will read the next block
>                 reader.nextBlock();
>             }
>             System.out.printf("Sync marker at %d%n", reader.previousSync());
>             reader.seek(lastSyncPos);
>             IndexedRecord lastRecord1 = null;
>             int decoded = 0;
>             while (reader.hasNext()) {
>                 lastRecord1 = reader.next(lastRecord1);
>                 decoded++;
>             }
>             System.out.printf("Decoded %d records%n", decoded);
>             result = lastRecord1;
>         }
>         Object lastRecord = result;
>         System.out.printf("Last record: %s%n", lastRecord);
>     }
>
>     private static class MyDataFileReader<T> extends DataFileReader<T> {
>         public MyDataFileReader(SeekableFileInput sfi, DatumReader<T> datumReader) throws IOException {
>             super(sfi, datumReader);
>         }
>
>         @Override
>         public void blockFinished() throws IOException {
>             super.blockFinished();
>         }
>     }
> }
> {code}
> The output:
> {noformat}
> Synced 100 records; file position 828
> Synced 200 records; file position 1648
> Synced 300 records; file position 2468
> Synced 400 records; file position 3288
> Synced 500 records; file position 4108
> Synced 600 records; file position 4928
> Synced 700 records; file position 5748
> Synced 800 records; file position 6568
> Synced 900 records; file position 7388
> Synced 1000 records; file position 8209
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Sync marker at 116
> Decoded 1000 records
> Last record: {"text": "i = 1000"}
> {noformat}
> In the expected output, the detected sync markers should progress, and only
> 100 records should be decoded:
> {noformat}
> Synced 100 records; file position 828
> Synced 200 records; file position 1648
> Synced 300 records; file position 2468
> Synced 400 records; file position 3288
> Synced 500 records; file position 4108
> Synced 600 records; file position 4928
> Synced 700 records; file position 5748
> Synced 800 records; file position 6568
> Synced 900 records; file position 7388
> Synced 1000 records; file position 8209
> Sync marker at 116
> Sync marker at 828
> Sync marker at 1648
> Sync marker at 2468
> Sync marker at 3288
> Sync marker at 4108
> Sync marker at 4928
> Sync marker at 5748
> Sync marker at 6568
> Sync marker at 7388
> Sync marker at 8209
> Decoded 100 records
> Last record: {"text": "i = 1000"}
> {noformat}
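
For illustration only, the expected behaviour (skip every block without decoding it, remember where the last one starts, then decode only that block) can be sketched against a toy in-memory layout. This is not Avro's actual file format or the fix that was committed: the class `BlockScanSketch`, its helpers, the fixed-width `int` count/size fields and the all-zero sync marker are assumptions made to keep the sketch free of dependencies.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BlockScanSketch {
    static final int SYNC_SIZE = 16; // Avro sync markers are 16 bytes long

    /** Builds a toy file: [sync], then per block [int count][int size][payload][sync]. */
    static ByteBuffer writeToyFile(int blocks, int recordsPerBlock) {
        ByteBuffer out = ByteBuffer.allocate(64 * 1024);
        byte[] sync = new byte[SYNC_SIZE]; // all zeros here; a real file uses random bytes
        out.put(sync); // stands in for the header and its trailing sync marker
        int n = 1;
        for (int b = 0; b < blocks; b++) {
            ByteBuffer payload = ByteBuffer.allocate(4096);
            for (int r = 0; r < recordsPerBlock; r++, n++) {
                byte[] text = ("i = " + n).getBytes(StandardCharsets.UTF_8);
                payload.put((byte) text.length).put(text); // length-prefixed string
            }
            payload.flip();
            out.putInt(recordsPerBlock).putInt(payload.remaining()).put(payload).put(sync);
        }
        out.flip();
        return out;
    }

    /** Returns the start position of every block; payloads are skipped, not decoded. */
    static List<Integer> blockStarts(ByteBuffer file) {
        List<Integer> starts = new ArrayList<>();
        int pos = SYNC_SIZE; // the first block begins right after the first sync marker
        while (pos < file.limit()) {
            starts.add(pos);
            int size = file.getInt(pos + 4); // absolute read of the payload size
            pos += 8 + size + SYNC_SIZE;     // jump over count, size, payload and sync
        }
        return starts;
    }

    /** Decodes the records of the single block that starts at blockStart. */
    static List<String> decodeBlock(ByteBuffer file, int blockStart) {
        ByteBuffer in = file.duplicate(); // independent position, shared content
        in.position(blockStart);
        int count = in.getInt();
        in.getInt(); // payload size, not needed when decoding record by record
        List<String> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] text = new byte[in.get()];
            in.get(text);
            records.add(new String(text, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        ByteBuffer file = writeToyFile(10, 100);
        List<Integer> starts = blockStarts(file);
        List<String> last = decodeBlock(file, starts.get(starts.size() - 1));
        System.out.printf("Blocks: %d, decoded %d records%n", starts.size(), last.size());
        System.out.printf("Last record: %s%n", last.get(last.size() - 1));
    }
}
```

In this toy model the block start positions increase with every block, and seeking to the last one decodes 100 records ending with "i = 1000", which mirrors the progression the expected output describes for the real reader.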
--
This message was sent by Atlassian Jira
(v8.20.10#820010)