This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new e962bc47d AVRO-3983: Allow setting a custom encoder in DataFileWriter (#2874)
e962bc47d is described below

commit e962bc47d49758a583665846b122b1c3c73a9e2c
Author: Fokko Driesprong <[email protected]>
AuthorDate: Sun May 5 22:47:36 2024 +0200

    AVRO-3983: Allow setting a custom encoder in DataFileWriter (#2874)
---
 .../java/org/apache/avro/file/DataFileWriter.java  | 16 +++++++-
 .../test/java/org/apache/avro/TestDataFile.java    | 45 ++++++++++++++--------
 2 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
index 65a305f34..6c2c45cf1 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
@@ -34,6 +34,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
@@ -73,6 +74,8 @@ public class DataFileWriter<D> implements Closeable, Flushable {
 
   private byte[] sync; // 16 random bytes
   private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
+  private Function<OutputStream, BinaryEncoder> initEncoder = out -> new EncoderFactory().directBinaryEncoder(out,
+      null);
 
   private boolean isOpen;
   private Codec codec;
@@ -130,6 +133,17 @@ public class DataFileWriter<D> implements Closeable, Flushable {
     return this;
   }
 
+  /**
+   * Allows setting a different encoder than the default DirectBinaryEncoder.
+   *
+   * @param initEncoderFunc Function to create a binary encoder
+   * @return this DataFileWriter
+   */
+  public DataFileWriter<D> setEncoder(Function<OutputStream, BinaryEncoder> initEncoderFunc) {
+    this.initEncoder = initEncoderFunc;
+    return this;
+  }
+
   /** Open a new file for data matching a schema with a random sync. */
   public DataFileWriter<D> create(Schema schema, File file) throws IOException {
     SyncableFileOutputStream sfos = new SyncableFileOutputStream(file);
@@ -242,7 +256,7 @@ public class DataFileWriter<D> implements Closeable, Flushable {
     this.vout = efactory.directBinaryEncoder(out, null);
     dout.setSchema(schema);
     buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
-    this.bufOut = efactory.directBinaryEncoder(buffer, null);
+    this.bufOut = this.initEncoder.apply(buffer);
     if (this.codec == null) {
       this.codec = CodecFactory.nullCodec().createInstance();
     }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index c82b9aae3..01611c698 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -25,9 +25,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import org.apache.avro.file.CodecFactory;
@@ -40,7 +42,9 @@ import org.apache.avro.file.Syncable;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.RandomData;
 
 import org.junit.jupiter.api.Test;
@@ -93,22 +97,32 @@ public class TestDataFile {
   @ParameterizedTest
   @MethodSource("codecs")
   public void runTestsInOrder(CodecFactory codec) throws Exception {
-    LOG.info("Running with codec: " + codec);
-    testGenericWrite(codec);
-    testGenericRead(codec);
-    testSplits(codec);
-    testSyncDiscovery(codec);
-    testGenericAppend(codec);
-    testReadWithHeader(codec);
-    testFSync(codec, false);
-    testFSync(codec, true);
+    // Run for both encoders, but the MethodSource didn't really like it,
+    // so it is just a loop within the test
+    List<Function<OutputStream, BinaryEncoder>> encoders = new ArrayList<>();
+    encoders.add(b -> new EncoderFactory().directBinaryEncoder(b, null));
+    encoders.add(b -> new EncoderFactory().blockingDirectBinaryEncoder(b, null));
+
+    for (Function<OutputStream, BinaryEncoder> encoder : encoders) {
+      LOG.info("Running with codec: {}", codec);
+      testGenericWrite(codec, encoder);
+      testGenericRead(codec);
+      testSplits(codec);
+      testSyncDiscovery(codec);
+      testGenericAppend(codec, encoder);
+      testReadWithHeader(codec);
+      testFSync(codec, encoder, false);
+      testFSync(codec, encoder, true);
+    }
   }
 
-  private void testGenericWrite(CodecFactory codec) throws IOException {
+  private void testGenericWrite(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
+      throws IOException {
     DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).setSyncInterval(100);
     if (codec != null) {
       writer.setCodec(codec);
     }
+    writer.setEncoder(encoderFunc);
     writer.create(SCHEMA, makeFile(codec));
     try {
       int count = 0;
@@ -210,10 +224,12 @@ public class TestDataFile {
     }
   }
 
-  private void testGenericAppend(CodecFactory codec) throws IOException {
+  private void testGenericAppend(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
+      throws IOException {
     File file = makeFile(codec);
     long start = file.length();
     try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).appendTo(file)) {
+      writer.setEncoder(encoderFunc);
       for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) {
         writer.append(datum);
       }
@@ -254,11 +270,8 @@ public class TestDataFile {
           assertEquals(validPos, sin.tell(), "Should not move from sync point on reopen");
           assertNotNull(readerFalse.next(), "Should be able to reopen at sync point");
         }
-
       }
-
     }
-
   }
 
   @Test
@@ -306,8 +319,10 @@ public class TestDataFile {
     assertTrue(out.flushCount < currentCount && out.flushCount >= flushCounter);
   }
 
-  private void testFSync(CodecFactory codec, boolean useFile) throws IOException {
+  private void testFSync(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc, boolean useFile)
+      throws IOException {
     try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      writer.setEncoder(encoderFunc);
       writer.setFlushOnEveryBlock(false);
       TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
       if (useFile) {

Reply via email to