This is an automated email from the ASF dual-hosted git repository.
nielsbasjes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 101048de5 AVRO-3684: [Java] testAppendStream test
101048de5 is described below
commit 101048de5bb213aed4cff1b262205979662580bd
Author: Mikko Kortelainen <[email protected]>
AuthorDate: Sat Feb 18 17:46:10 2023 +0200
AVRO-3684: [Java] testAppendStream test
---
.../test/java/org/apache/avro/TestDataFile.java | 64 ++++++++++++++++++++++
1 file changed, 64 insertions(+)
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index ed83afa0b..dce33514a 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -36,11 +37,15 @@ import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SyncableFileOutputStream;
import org.apache.avro.file.Syncable;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.RandomData;
import org.junit.jupiter.api.Test;
@@ -102,6 +107,7 @@ public class TestDataFile {
testReadWithHeader(codec);
testFSync(codec, false);
testFSync(codec, true);
+ testAppendStream(codec);
}
private void testGenericWrite(CodecFactory codec) throws IOException {
@@ -333,6 +339,64 @@ public class TestDataFile {
}
}
+ private void testAppendStream(CodecFactory codec) throws IOException {
+ File file = makeFile(codec);
+ // Scenario: create a data file through an output stream, re-open it with appendTo(), then count the records.
+ DatumWriter<Object> datumWriter = new SpecificDatumWriter<>();
+ // The same DatumWriter instance is handed to both DataFileWriters below.
+ // Phase 1: write COUNT objects to datafile, flushing after every append
+ try (DataFileWriter<Object> writer = new DataFileWriter<>(datumWriter)) {
+ try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
+ // NOTE(review): raf is the inner resource, so it is closed before writer.close() runs - confirm this order is intended.
+ SyncableFileOutputStream fileOutputStream = new
SyncableFileOutputStream(raf.getFD());
+
+ writer.create(SCHEMA, fileOutputStream);
+ for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
+ writer.append(datum);
+ writer.flush();
+ }
+ }
+ }
+
+ // Phase 2: append to existing file through a second writer
+ try (DataFileWriter<Object> writer = new DataFileWriter<>(datumWriter)) {
+ try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
+ try (RandomAccessFile rif = new RandomAccessFile(file, "r")) {
+ // raf ("rw") backs the output stream; rif ("r") backs the SeekableFileInput handed to appendTo().
+ SyncableFileOutputStream fileOutputStream = new
SyncableFileOutputStream(raf.getFD());
+
+ // seek to end: set the output channel's position to the channel's current size
+
fileOutputStream.getChannel().position(fileOutputStream.getChannel().size());
+
+ // append to existing fileStream; 'in' is closed as soon as appendTo() returns
+ try (SeekableFileInput in = new SeekableFileInput(rif.getFD())) {
+ writer.appendTo(in, fileOutputStream);
+ }
+ // second batch of records, generated with SEED + 1
+ for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) {
+ writer.append(datum);
+ writer.flush();
+ }
+ }
+ }
+ }
+
+ // Phase 3: verify objects in file by counting every record the reader returns
+ long recordCounter = 0;
+
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
+ SeekableFileInput in = new SeekableFileInput(raf.getFD()); // not a try resource itself; NOTE(review): presumably released via reader/raf - confirm
+ DatumReader<Object> datumReader = new SpecificDatumReader<>();
+ try (FileReader<? extends Object> reader = DataFileReader.openReader(in,
datumReader)) {
+ while (reader.hasNext()) {
+ reader.next(); // record contents are not inspected, only counted
+ recordCounter++;
+ }
+ }
+ }
+ assertEquals(COUNT * 2, recordCounter); // expects COUNT records from each of the two write phases
+ }
+
static void readFile(File f, DatumReader<? extends Object> datumReader)
throws IOException {
try (FileReader<? extends Object> reader = DataFileReader.openReader(f,
datumReader)) {
for (Object datum : reader) {