This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 325efced65d Return new ReflectData for both reflect datum reader and
writer (#28280)
325efced65d is described below
commit 325efced65dfd1a4f654e835f2830390e4dcf186
Author: Michel Davit <[email protected]>
AuthorDate: Fri Sep 1 12:37:27 2023 +0200
Return new ReflectData for both reflect datum reader and writer (#28280)
---
.../sdk/extensions/avro/io/AvroDatumFactory.java | 25 +++++++++-------------
1 file changed, 10 insertions(+), 15 deletions(-)
diff --git
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java
index 55c6e266e27..67125a6ad24 100644
---
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java
+++
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
@@ -33,6 +34,9 @@ import
org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Create {@link DatumReader} and {@link DatumWriter} for given schemas. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
public abstract class AvroDatumFactory<T>
implements AvroSource.DatumReaderFactory<T>,
AvroSink.DatumWriterFactory<T> {
@@ -168,25 +172,16 @@ public abstract class AvroDatumFactory<T>
@Override
public DatumReader<T> apply(Schema writer, Schema reader) {
- // create the datum writer using the Class<T> api.
- // avro will load the proper class loader
- ReflectDatumReader<T> datumReader = new ReflectDatumReader<>(type);
- datumReader.setExpected(reader);
- datumReader.setSchema(writer);
- // for backward compat, add logical type support by default
- AvroUtils.addLogicalTypeConversions(datumReader.getData());
- return datumReader;
+ ReflectData data = new ReflectData(type.getClassLoader());
+ AvroUtils.addLogicalTypeConversions(data);
+ return new ReflectDatumReader<>(writer, reader, data);
}
@Override
public DatumWriter<T> apply(Schema writer) {
- // create the datum writer using the Class<T> api.
- // avro will load the proper class loader
- ReflectDatumWriter<T> datumWriter = new ReflectDatumWriter<>(type);
- datumWriter.setSchema(writer);
- // for backward compat, add logical type support by default
- AvroUtils.addLogicalTypeConversions(datumWriter.getData());
- return datumWriter;
+ ReflectData data = new ReflectData(type.getClassLoader());
+ AvroUtils.addLogicalTypeConversions(data);
+ return new ReflectDatumWriter<>(writer, data);
}
public static <T> ReflectDatumFactory<T> of(Class<T> type) {