This is an automated email from the ASF dual-hosted git repository.

nielsbasjes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 101048de5 AVRO-3684: [Java] testAppendStream test
101048de5 is described below

commit 101048de5bb213aed4cff1b262205979662580bd
Author: Mikko Kortelainen <[email protected]>
AuthorDate: Sat Feb 18 17:46:10 2023 +0200

    AVRO-3684: [Java] testAppendStream test
---
 .../test/java/org/apache/avro/TestDataFile.java    | 64 ++++++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index ed83afa0b..dce33514a 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -36,11 +37,15 @@ import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SyncableFileOutputStream;
 import org.apache.avro.file.Syncable;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.RandomData;
 
 import org.junit.jupiter.api.Test;
@@ -102,6 +107,7 @@ public class TestDataFile {
     testReadWithHeader(codec);
     testFSync(codec, false);
     testFSync(codec, true);
+    testAppendStream(codec);
   }
 
   private void testGenericWrite(CodecFactory codec) throws IOException {
@@ -333,6 +339,64 @@ public class TestDataFile {
     }
   }
 
+  private void testAppendStream(CodecFactory codec) throws IOException {
+    File file = makeFile(codec);
+    // Round trip: write COUNT records via a stream, append COUNT more with appendTo(in, out), then count them all.
+    DatumWriter<Object> datumWriter = new SpecificDatumWriter<>();
+
+    // Phase 1: create the data file and write COUNT random records, flushing after every append.
+    try (DataFileWriter<Object> writer = new DataFileWriter<>(datumWriter)) {
+      try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
+        // NOTE(review): raf (inner try) closes before writer (outer try); presumably safe only due to the flushes -- confirm.
+        SyncableFileOutputStream fileOutputStream = new 
SyncableFileOutputStream(raf.getFD());
+
+        writer.create(SCHEMA, fileOutputStream);
+        for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
+          writer.append(datum);
+          writer.flush();
+        }
+      }
+    }
+
+    // Phase 2: reopen the same file and append COUNT more records (generated with SEED + 1) via appendTo.
+    try (DataFileWriter<Object> writer = new DataFileWriter<>(datumWriter)) {
+      try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
+        try (RandomAccessFile rif = new RandomAccessFile(file, "r")) {
+          // raf backs the output stream; rif is a separate handle opened in "r" mode for appendTo's input.
+          SyncableFileOutputStream fileOutputStream = new 
SyncableFileOutputStream(raf.getFD());
+
+          // move the output channel's position to the current file size so new writes land at the end
+          
fileOutputStream.getChannel().position(fileOutputStream.getChannel().size());
+
+          // attach the writer to the existing file; the input is closed right after appendTo returns
+          try (SeekableFileInput in = new SeekableFileInput(rif.getFD())) {
+            writer.appendTo(in, fileOutputStream);
+          }
+
+          for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) {
+            writer.append(datum);
+            writer.flush();
+          }
+        }
+      }
+    }
+
+    // Phase 3: read the file back and count the records; both batches of COUNT must be present.
+    long recordCounter = 0;
+
+    try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
+      SeekableFileInput in = new SeekableFileInput(raf.getFD());
+      DatumReader<Object> datumReader = new SpecificDatumReader<>();
+      try (FileReader<? extends Object> reader = DataFileReader.openReader(in, 
datumReader)) {
+        while (reader.hasNext()) {
+          reader.next();
+          recordCounter++;
+        }
+      }
+    }
+    assertEquals(COUNT * 2, recordCounter);
+  }
+
   static void readFile(File f, DatumReader<? extends Object> datumReader) 
throws IOException {
     try (FileReader<? extends Object> reader = DataFileReader.openReader(f, 
datumReader)) {
       for (Object datum : reader) {

Reply via email to