This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new 2490231cf AVRO-4006: Fix block finish while reading data files (#2969)
2490231cf is described below

commit 2490231cf5352cad1df02682c99a4cd11242b98c
Author: Oscar Westra van Holthe - Kind <[email protected]>
AuthorDate: Tue Jun 25 17:05:32 2024 +0200

    AVRO-4006: Fix block finish while reading data files (#2969)
---
 .../java/org/apache/avro/file/DataFileReader.java  | 15 ++---
 .../java/org/apache/avro/file/DataFileStream.java  | 23 +++----
 .../test/java/org/apache/avro/TestDataFile.java    | 72 +++++++++++++++++-----
 3 files changed, 75 insertions(+), 35 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
index 10067cd6c..8f333a1cb 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
@@ -17,18 +17,19 @@
  */
 package org.apache.avro.file;
 
+import org.apache.avro.InvalidAvroMagicException;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.compress.utils.IOUtils;
+
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.File;
 import java.util.Arrays;
 
-import org.apache.avro.InvalidAvroMagicException;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.avro.io.DatumReader;
-import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
 import static org.apache.avro.file.DataFileConstants.MAGIC;
+import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
 
 /**
  * Random access to files written with {@link DataFileWriter}.
@@ -170,7 +171,7 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
     vin = DecoderFactory.get().binaryDecoder(this.sin, vin);
     datumIn = null;
     blockRemaining = 0;
-    blockStart = position;
+    blockFinished();
   }
 
   /**
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
index e9b5ed388..aa4586846 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
@@ -17,14 +17,23 @@
  */
 package org.apache.avro.file;
 
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.InvalidAvroMagicException;
+import org.apache.avro.NameValidator;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,15 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.InvalidAvroMagicException;
-import org.apache.avro.NameValidator;
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-
 /**
  * Streaming access to files written by {@link DataFileWriter}. Use
  * {@link DataFileReader} for file-based input.
@@ -275,6 +275,7 @@ public class DataFileStream<D> implements Iterator<D>, Iterable<D>, Closeable {
     if (blockRemaining != blockCount)
       throw new IllegalStateException("Not at block start.");
     blockRemaining = 0;
+    blockFinished();
     datumIn = null;
     return blockBuffer;
   }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index 01611c698..e411ab0ef 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -17,21 +17,6 @@
  */
 package org.apache.avro;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
@@ -46,16 +31,29 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.RandomData;
-
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class TestDataFile {
   private static final Logger LOG = LoggerFactory.getLogger(TestDataFile.class);
 
@@ -89,6 +87,14 @@ public class TestDataFile {
   private static final String SCHEMA_JSON = "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
       + "{\"name\":\"stringField\", \"type\":\"string\"}," + "{\"name\":\"longField\", \"type\":\"long\"}]}";
   private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);
+  private static final Object LAST_RECORD;
+  static {
+    Object lastValue = null;
+    for (Object object : new RandomData(SCHEMA, COUNT, SEED)) {
+      lastValue = object;
+    }
+    LAST_RECORD = lastValue;
+  }
 
   private File makeFile(CodecFactory codec) {
     return new File(DIR, "test-" + codec + ".avro");
@@ -109,6 +115,7 @@ public class TestDataFile {
       testGenericRead(codec);
       testSplits(codec);
       testSyncDiscovery(codec);
+      testReadLastRecord(codec);
       testGenericAppend(codec, encoder);
       testReadWithHeader(codec);
       testFSync(codec, encoder, false);
@@ -221,6 +228,37 @@ public class TestDataFile {
         reader.seek(sync);
         assertNotNull(reader.next());
       }
+      // Lastly, confirm that reading (but not decoding) all blocks results in the
+      // same sync points
+      reader.sync(0);
+      ArrayList<Long> syncs2 = new ArrayList<>();
+      while (reader.hasNext()) {
+        syncs2.add(reader.previousSync());
+        reader.nextBlock();
+      }
+      assertEquals(syncs, syncs2);
+    }
+  }
+
+  private void testReadLastRecord(CodecFactory codec) throws IOException {
+    File file = makeFile(codec);
+    try (DataFileReader<Object> reader = new DataFileReader<>(file, new GenericDatumReader<>())) {
+      long lastBlockStart = -1;
+      while (reader.hasNext()) {
+        // This algorithm can be made more efficient by checking if the underlying
+        // SeekableFileInput has been fully read: if so, the last block is in
+        // memory, and calls to next() will decode it.
+        // NOTE: this depends on the current implementation of DataFileReader.
+        lastBlockStart = reader.previousSync();
+        reader.nextBlock();
+      }
+      reader.seek(lastBlockStart);
+
+      Object lastRecord = null;
+      while (reader.hasNext()) {
+        lastRecord = reader.next(lastRecord);
+      }
+      assertEquals(LAST_RECORD, lastRecord);
     }
   }
 

Reply via email to