This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new e962bc47d AVRO-3983: Allow setting a custom encoder in DataFileWriter (#2874)
e962bc47d is described below
commit e962bc47d49758a583665846b122b1c3c73a9e2c
Author: Fokko Driesprong <[email protected]>
AuthorDate: Sun May 5 22:47:36 2024 +0200
AVRO-3983: Allow setting a custom encoder in DataFileWriter (#2874)
---
.../java/org/apache/avro/file/DataFileWriter.java | 16 +++++++-
.../test/java/org/apache/avro/TestDataFile.java | 45 ++++++++++++++--------
2 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
index 65a305f34..6c2c45cf1 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
@@ -34,6 +34,7 @@ import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
@@ -73,6 +74,8 @@ public class DataFileWriter<D> implements Closeable, Flushable {
private byte[] sync; // 16 random bytes
private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
+ private Function<OutputStream, BinaryEncoder> initEncoder = out -> new EncoderFactory().directBinaryEncoder(out,
+ null);
private boolean isOpen;
private Codec codec;
@@ -130,6 +133,17 @@ public class DataFileWriter<D> implements Closeable, Flushable {
return this;
}
+ /**
+ * Allows setting a different encoder than the default DirectBinaryEncoder.
+ *
+ * @param initEncoderFunc Function to create a binary encoder
+ * @return this DataFileWriter
+ */
+ public DataFileWriter<D> setEncoder(Function<OutputStream, BinaryEncoder> initEncoderFunc) {
+ this.initEncoder = initEncoderFunc;
+ return this;
+ }
+
/** Open a new file for data matching a schema with a random sync. */
public DataFileWriter<D> create(Schema schema, File file) throws IOException {
SyncableFileOutputStream sfos = new SyncableFileOutputStream(file);
@@ -242,7 +256,7 @@ public class DataFileWriter<D> implements Closeable, Flushable {
this.vout = efactory.directBinaryEncoder(out, null);
dout.setSchema(schema);
buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
- this.bufOut = efactory.directBinaryEncoder(buffer, null);
+ this.bufOut = this.initEncoder.apply(buffer);
if (this.codec == null) {
this.codec = CodecFactory.nullCodec().createInstance();
}
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index c82b9aae3..01611c698 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -25,9 +25,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.avro.file.CodecFactory;
@@ -40,7 +42,9 @@ import org.apache.avro.file.Syncable;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.RandomData;
import org.junit.jupiter.api.Test;
@@ -93,22 +97,32 @@ public class TestDataFile {
@ParameterizedTest
@MethodSource("codecs")
public void runTestsInOrder(CodecFactory codec) throws Exception {
- LOG.info("Running with codec: " + codec);
- testGenericWrite(codec);
- testGenericRead(codec);
- testSplits(codec);
- testSyncDiscovery(codec);
- testGenericAppend(codec);
- testReadWithHeader(codec);
- testFSync(codec, false);
- testFSync(codec, true);
+ // Run for both encoders, but the MethodSource didn't really like it,
+ // so it is just a loop within the test
+ List<Function<OutputStream, BinaryEncoder>> encoders = new ArrayList<>();
+ encoders.add(b -> new EncoderFactory().directBinaryEncoder(b, null));
+ encoders.add(b -> new EncoderFactory().blockingDirectBinaryEncoder(b, null));
+
+ for (Function<OutputStream, BinaryEncoder> encoder : encoders) {
+ LOG.info("Running with codec: {}", codec);
+ testGenericWrite(codec, encoder);
+ testGenericRead(codec);
+ testSplits(codec);
+ testSyncDiscovery(codec);
+ testGenericAppend(codec, encoder);
+ testReadWithHeader(codec);
+ testFSync(codec, encoder, false);
+ testFSync(codec, encoder, true);
+ }
}
- private void testGenericWrite(CodecFactory codec) throws IOException {
+ private void testGenericWrite(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
+ throws IOException {
DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).setSyncInterval(100);
if (codec != null) {
writer.setCodec(codec);
}
+ writer.setEncoder(encoderFunc);
writer.create(SCHEMA, makeFile(codec));
try {
int count = 0;
@@ -210,10 +224,12 @@ public class TestDataFile {
}
}
- private void testGenericAppend(CodecFactory codec) throws IOException {
+ private void testGenericAppend(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
+ throws IOException {
File file = makeFile(codec);
long start = file.length();
try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).appendTo(file)) {
+ writer.setEncoder(encoderFunc);
for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) {
writer.append(datum);
}
@@ -254,11 +270,8 @@ public class TestDataFile {
assertEquals(validPos, sin.tell(), "Should not move from sync point on reopen");
assertNotNull(readerFalse.next(), "Should be able to reopen at sync point");
}
-
}
-
}
-
}
@Test
@@ -306,8 +319,10 @@ public class TestDataFile {
assertTrue(out.flushCount < currentCount && out.flushCount >= flushCounter);
}
- private void testFSync(CodecFactory codec, boolean useFile) throws IOException {
+ private void testFSync(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc, boolean useFile)
+ throws IOException {
try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+ writer.setEncoder(encoderFunc);
writer.setFlushOnEveryBlock(false);
TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
if (useFile) {