This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new f6e583578 AVRO-4006: Fix block finish while reading data files (#2969) (#2981)
f6e583578 is described below

commit f6e58357864d5686f031c26369591ebd83a9c7fe
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jun 26 16:08:53 2024 +0200

    AVRO-4006: Fix block finish while reading data files (#2969) (#2981)
    
    Co-authored-by: Oscar Westra van Holthe - Kind <[email protected]>
---
 .../java/org/apache/avro/file/DataFileReader.java  | 15 ++---
 .../java/org/apache/avro/file/DataFileStream.java  |  5 +-
 .../test/java/org/apache/avro/TestDataFile.java    | 72 +++++++++++++++++-----
 3 files changed, 66 insertions(+), 26 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
index 10067cd6c..8f333a1cb 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
@@ -17,18 +17,19 @@
  */
 package org.apache.avro.file;
 
+import org.apache.avro.InvalidAvroMagicException;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.compress.utils.IOUtils;
+
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.File;
 import java.util.Arrays;
 
-import org.apache.avro.InvalidAvroMagicException;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.avro.io.DatumReader;
-import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
 import static org.apache.avro.file.DataFileConstants.MAGIC;
+import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
 
 /**
  * Random access to files written with {@link DataFileWriter}.
@@ -170,7 +171,7 @@ public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D
     vin = DecoderFactory.get().binaryDecoder(this.sin, vin);
     datumIn = null;
     blockRemaining = 0;
-    blockStart = position;
+    blockFinished();
   }
 
   /**
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
index a2b517225..a96b694ab 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
@@ -17,14 +17,14 @@
  */
 package org.apache.avro.file;
 
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -274,6 +274,7 @@ public class DataFileStream<D> implements Iterator<D>, Iterable<D>, Closeable {
     if (blockRemaining != blockCount)
       throw new IllegalStateException("Not at block start.");
     blockRemaining = 0;
+    blockFinished();
     datumIn = null;
     return blockBuffer;
   }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index c11b8377d..98e92fe3f 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -17,21 +17,6 @@
  */
 package org.apache.avro;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
@@ -46,16 +31,29 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.RandomData;
-
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class TestDataFile {
   private static final Logger LOG = LoggerFactory.getLogger(TestDataFile.class);
 
@@ -89,6 +87,14 @@ public class TestDataFile {
   private static final String SCHEMA_JSON = "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
       + "{\"name\":\"stringField\", \"type\":\"string\"}," + "{\"name\":\"longField\", \"type\":\"long\"}]}";
   private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);
+  private static final Object LAST_RECORD;
+  static {
+    Object lastValue = null;
+    for (Object object : new RandomData(SCHEMA, COUNT, SEED)) {
+      lastValue = object;
+    }
+    LAST_RECORD = lastValue;
+  }
 
   private File makeFile(CodecFactory codec) {
     return new File(DIR, "test-" + codec + ".avro");
@@ -109,6 +115,7 @@ public class TestDataFile {
       testGenericRead(codec);
       testSplits(codec);
       testSyncDiscovery(codec);
+      testReadLastRecord(codec);
       testGenericAppend(codec, encoder);
       testReadWithHeader(codec);
       testFSync(codec, encoder, false);
@@ -221,6 +228,37 @@ public class TestDataFile {
         reader.seek(sync);
         assertNotNull(reader.next());
       }
+      // Lastly, confirm that reading (but not decoding) all blocks results in the
+      // same sync points
+      reader.sync(0);
+      ArrayList<Long> syncs2 = new ArrayList<>();
+      while (reader.hasNext()) {
+        syncs2.add(reader.previousSync());
+        reader.nextBlock();
+      }
+      assertEquals(syncs, syncs2);
+    }
+  }
+
+  private void testReadLastRecord(CodecFactory codec) throws IOException {
+    File file = makeFile(codec);
+    try (DataFileReader<Object> reader = new DataFileReader<>(file, new GenericDatumReader<>())) {
+      long lastBlockStart = -1;
+      while (reader.hasNext()) {
+        // This algorithm can be made more efficient by checking if the underlying
+        // SeekableFileInput has been fully read: if so, the last block is in
+        // memory, and calls to next() will decode it.
+        // NOTE: this depends on the current implementation of DataFileReader.
+        lastBlockStart = reader.previousSync();
+        reader.nextBlock();
+      }
+      reader.seek(lastBlockStart);
+
+      Object lastRecord = null;
+      while (reader.hasNext()) {
+        lastRecord = reader.next(lastRecord);
+      }
+      assertEquals(LAST_RECORD, lastRecord);
     }
   }
 

Reply via email to