This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new f6e583578 AVRO-4006: Fix block finish while reading data files (#2969)
(#2981)
f6e583578 is described below
commit f6e58357864d5686f031c26369591ebd83a9c7fe
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Jun 26 16:08:53 2024 +0200
AVRO-4006: Fix block finish while reading data files (#2969) (#2981)
Co-authored-by: Oscar Westra van Holthe - Kind
<[email protected]>
---
.../java/org/apache/avro/file/DataFileReader.java | 15 ++---
.../java/org/apache/avro/file/DataFileStream.java | 5 +-
.../test/java/org/apache/avro/TestDataFile.java | 72 +++++++++++++++++-----
3 files changed, 66 insertions(+), 26 deletions(-)
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
index 10067cd6c..8f333a1cb 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java
@@ -17,18 +17,19 @@
*/
package org.apache.avro.file;
+import org.apache.avro.InvalidAvroMagicException;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.compress.utils.IOUtils;
+
import java.io.EOFException;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.io.File;
import java.util.Arrays;
-import org.apache.avro.InvalidAvroMagicException;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.avro.io.DatumReader;
-import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
import static org.apache.avro.file.DataFileConstants.MAGIC;
+import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
/**
* Random access to files written with {@link DataFileWriter}.
@@ -170,7 +171,7 @@ public class DataFileReader<D> extends DataFileStream<D>
implements FileReader<D
vin = DecoderFactory.get().binaryDecoder(this.sin, vin);
datumIn = null;
blockRemaining = 0;
- blockStart = position;
+ blockFinished();
}
/**
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
index a2b517225..a96b694ab 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
@@ -17,14 +17,14 @@
*/
package org.apache.avro.file;
+import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -274,6 +274,7 @@ public class DataFileStream<D> implements Iterator<D>,
Iterable<D>, Closeable {
if (blockRemaining != blockCount)
throw new IllegalStateException("Not at block start.");
blockRemaining = 0;
+ blockFinished();
datumIn = null;
return blockBuffer;
}
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index c11b8377d..98e92fe3f 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -17,21 +17,6 @@
*/
package org.apache.avro;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
@@ -46,16 +31,29 @@ import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.RandomData;
-
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class TestDataFile {
private static final Logger LOG =
LoggerFactory.getLogger(TestDataFile.class);
@@ -89,6 +87,14 @@ public class TestDataFile {
private static final String SCHEMA_JSON = "{\"type\": \"record\", \"name\":
\"Test\", \"fields\": ["
+ "{\"name\":\"stringField\", \"type\":\"string\"}," +
"{\"name\":\"longField\", \"type\":\"long\"}]}";
private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);
+ private static final Object LAST_RECORD;
+ static {
+ Object lastValue = null;
+ for (Object object : new RandomData(SCHEMA, COUNT, SEED)) {
+ lastValue = object;
+ }
+ LAST_RECORD = lastValue;
+ }
private File makeFile(CodecFactory codec) {
return new File(DIR, "test-" + codec + ".avro");
@@ -109,6 +115,7 @@ public class TestDataFile {
testGenericRead(codec);
testSplits(codec);
testSyncDiscovery(codec);
+ testReadLastRecord(codec);
testGenericAppend(codec, encoder);
testReadWithHeader(codec);
testFSync(codec, encoder, false);
@@ -221,6 +228,37 @@ public class TestDataFile {
reader.seek(sync);
assertNotNull(reader.next());
}
+ // Lastly, confirm that reading (but not decoding) all blocks results in
the
+ // same sync points
+ reader.sync(0);
+ ArrayList<Long> syncs2 = new ArrayList<>();
+ while (reader.hasNext()) {
+ syncs2.add(reader.previousSync());
+ reader.nextBlock();
+ }
+ assertEquals(syncs, syncs2);
+ }
+ }
+
+ private void testReadLastRecord(CodecFactory codec) throws IOException {
+ File file = makeFile(codec);
+ try (DataFileReader<Object> reader = new DataFileReader<>(file, new
GenericDatumReader<>())) {
+ long lastBlockStart = -1;
+ while (reader.hasNext()) {
+ // This algorithm can be made more efficient by checking if the
underlying
+ // SeekableFileInput has been fully read: if so, the last block is in
+ // memory, and calls to next() will decode it.
+ // NOTE: this depends on the current implementation of DataFileReader.
+ lastBlockStart = reader.previousSync();
+ reader.nextBlock();
+ }
+ reader.seek(lastBlockStart);
+
+ Object lastRecord = null;
+ while (reader.hasNext()) {
+ lastRecord = reader.next(lastRecord);
+ }
+ assertEquals(LAST_RECORD, lastRecord);
}
}