kennknowles commented on a change in pull request #16271:
URL: https://github.com/apache/beam/pull/16271#discussion_r781527428



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
##########
@@ -182,7 +173,25 @@ public static AvroGenericCoder of(Schema schema) {
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(Class<T> type, Schema schema, boolean useReflectApi) {
-    return new AvroCoder<>(type, schema, useReflectApi);
+    AvroSource.DatumReaderFactory<T> readerFactory =
+        new AvroSource.DefaultDatumReaderFactory<>(type, useReflectApi);
+    AvroSink.DatumWriterFactory<T> writerFactory =
+        new AvroSink.DefaultDatumWriterFactory<>(type, useReflectApi);
+    return new AvroCoder<>(type, schema, readerFactory, writerFactory);
+  }
+
+  /**
+   * Returns an {@code AvroCoder} instance for the given class and schema, using the provided datum
+   * factories.
+   *
+   * @param <T> the element type
+   */
+  public static <T> AvroCoder<T> of(
+      Class<T> type,
+      Schema schema,
+      AvroSource.DatumReaderFactory<T> readerFactory,

Review comment:
       A little thing: I think `AvroSource` and `AvroSink` can depend on `AvroCoder`, but it should not necessarily go the other way around. Since this is just a functional interface acting as a type alias, you could define the interface in this file, or as a standalone interface independent of both of them.
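A minimal sketch of the decoupling suggested above, under stated assumptions: the two factory interfaces stand on their own, the coder depends only on them, and a source or sink would import the same interfaces. `SketchReader`, `SketchWriter`, and `SketchCoder` are hypothetical stand-ins for Avro's `DatumReader`/`DatumWriter` and for `AvroCoder`, and a `String` stands in for an Avro `Schema` so the example compiles without Avro. The `apply` signatures are assumptions, not Beam's API.

```java
import java.io.Serializable;

// Hypothetical stand-ins for Avro's DatumReader<T> and DatumWriter<T>, so this
// sketch compiles without Avro on the classpath. A String stands in for a Schema.
interface SketchReader<T> {
  T read(String encoded);
}

interface SketchWriter<T> {
  String write(T value);
}

// Standalone factory interfaces, owned by neither the source nor the sink.
@FunctionalInterface
interface DatumReaderFactory<T> extends Serializable {
  SketchReader<T> apply(String writerSchema, String readerSchema);
}

@FunctionalInterface
interface DatumWriterFactory<T> extends Serializable {
  SketchWriter<T> apply(String writerSchema);
}

// The coder depends only on the standalone interfaces; a source or sink can
// depend on the same interfaces (or on the coder) without a back-edge to either.
final class SketchCoder<T> implements Serializable {
  private final String schema;
  private final DatumReaderFactory<T> readerFactory;
  private final DatumWriterFactory<T> writerFactory;

  SketchCoder(
      String schema, DatumReaderFactory<T> readerFactory, DatumWriterFactory<T> writerFactory) {
    this.schema = schema;
    this.readerFactory = readerFactory;
    this.writerFactory = writerFactory;
  }

  String encode(T value) {
    return writerFactory.apply(schema).write(value);
  }

  T decode(String encoded) {
    // This sketch uses the same schema as writer and reader schema.
    return readerFactory.apply(schema, schema).read(encoded);
  }
}
```

For example, `new SketchCoder<Integer>("int", (w, r) -> s -> Integer.parseInt(s), w -> v -> Integer.toString(v))` encodes `42` as `"42"` and decodes `"7"` back to `7`. Because both factory interfaces extend `Serializable`, lambdas targeting them are serializable, which matters for a coder that is shipped to workers.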

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
##########
@@ -542,18 +550,21 @@
   * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
    * instead.
    */
-  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
-    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType(
+      Class<OutputT> recordClass) {

Review comment:
       Can you make it into two code paths? I am just coming to this PR to help out a little, so I don't have all the history of your thoughts on this.
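One possible reading of "two code paths", sketched under assumptions: keep the existing no-argument `writeCustomType()` unchanged and add the class-taking overload alongside it, instead of changing the existing signature. `TypedWriteSketch` is a hypothetical, heavily simplified model that only records which entry point was used; it is not Beam's `TypedWrite`.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, heavily simplified model of AvroIO.TypedWrite; it only records
// which entry point was used. Not Beam's actual class.
final class TypedWriteSketch<OutputT> {
  private final boolean genericRecords;
  private final Class<OutputT> recordClass; // null on the legacy code path

  private TypedWriteSketch(boolean genericRecords, Class<OutputT> recordClass) {
    this.genericRecords = genericRecords;
    this.recordClass = recordClass;
  }

  // Code path 1: the existing no-argument method keeps its current behavior,
  // so existing callers compile and behave as before.
  static <OutputT> TypedWriteSketch<OutputT> writeCustomType() {
    return new TypedWriteSketch<>(false, null);
  }

  // Code path 2: a new overload that carries the record class explicitly.
  static <OutputT> TypedWriteSketch<OutputT> writeCustomType(Class<OutputT> recordClass) {
    return new TypedWriteSketch<>(false, Objects.requireNonNull(recordClass, "recordClass"));
  }

  boolean genericRecords() {
    return genericRecords;
  }

  Optional<Class<OutputT>> recordClass() {
    return Optional.ofNullable(recordClass);
  }
}
```

Existing callers of `writeCustomType()` keep compiling, while new callers opt in with `writeCustomType(MyRecord.class)`, where `MyRecord` is any (hypothetical) record class.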

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
##########
@@ -38,20 +42,52 @@
 })
 public class AvroSink<UserT, DestinationT, OutputT>
     extends FileBasedSink<UserT, DestinationT, OutputT> {
-  private final boolean genericRecords;
+  private final Class<OutputT> type;

Review comment:
       nit: a class is slightly different from a type (types have type arguments, bounds, etc.), so prefer an identifier like `elementClass` or `clazz`.
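To illustrate the distinction: at run time a `Class` object carries no type arguments, so `List<String>` and `List<Integer>` share a single `Class`. A field declared `Class<OutputT>` therefore holds a class rather than a full type, which is why a name like `elementClass` is more accurate. `ClassVsType` is a hypothetical helper for this demonstration.

```java
import java.util.ArrayList;
import java.util.List;

final class ClassVsType {
  // Two values with different static types, List<String> and List<Integer> ...
  static boolean sameRuntimeClass() {
    List<String> strings = new ArrayList<>();
    List<Integer> ints = new ArrayList<>();
    // ... share one Class object at run time, because type arguments are erased.
    // (There is no List<String>.class literal; it does not compile.)
    Class<?> a = strings.getClass();
    Class<?> b = ints.getClass();
    return a == b;
  }

  // The runtime class name has no trace of the type argument.
  static String runtimeName() {
    return new ArrayList<String>().getClass().getName();
  }
}
```

Here `sameRuntimeClass()` returns `true` and `runtimeName()` returns `"java.util.ArrayList"`.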

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
##########
@@ -1338,7 +1383,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
       abstract Builder<UserT, DestinationT, OutputT> setShardTemplate(
           @Nullable String shardTemplate);
 
-      abstract Builder<UserT, DestinationT, OutputT> setGenericRecords(boolean genericRecords);
+      abstract Builder<UserT, DestinationT, OutputT> setRecordClass(Class<OutputT> recordClass);

Review comment:
       Can you also make this a separate code path?
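A sketch of the same idea applied to the builder, assuming the goal is to leave `setGenericRecords` in place and add `setRecordClass` as a separate, optional setter rather than a replacement. `WriteConfigSketch` is a hypothetical hand-written stand-in for the abstract `Builder` in the diff, modeling only the two properties under discussion.

```java
// Hypothetical hand-written stand-in for the abstract Builder in AvroIO;
// only the two properties under discussion are modeled.
final class WriteConfigSketch<OutputT> {
  final boolean genericRecords;
  final Class<OutputT> recordClass; // null unless the new code path set it

  private WriteConfigSketch(boolean genericRecords, Class<OutputT> recordClass) {
    this.genericRecords = genericRecords;
    this.recordClass = recordClass;
  }

  static <T> Builder<T> builder() {
    return new Builder<>();
  }

  static final class Builder<T> {
    private boolean genericRecords = false;
    private Class<T> recordClass = null;

    // Existing code path: this setter stays, so current call sites are untouched.
    Builder<T> setGenericRecords(boolean genericRecords) {
      this.genericRecords = genericRecords;
      return this;
    }

    // New code path: a separate, optional setter instead of a replacement.
    Builder<T> setRecordClass(Class<T> recordClass) {
      this.recordClass = recordClass;
      return this;
    }

    WriteConfigSketch<T> build() {
      return new WriteConfigSketch<>(genericRecords, recordClass);
    }
  }
}
```

A call site that only uses `setGenericRecords` builds the same configuration as before, with `recordClass` left `null`.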

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
##########
@@ -301,56 +292,42 @@ public ReflectData get() {
   private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
   private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
 
-  // Lazily re-instantiated after deserialization
-  private final Supplier<ReflectData> reflectData;
-
-  protected AvroCoder(Class<T> type, Schema schema) {
-    this(type, schema, false);
-  }
-
-  protected AvroCoder(Class<T> type, Schema schema, boolean useReflectApi) {
+  protected AvroCoder(
+      Class<T> type,
+      Schema schema,
+      AvroSource.DatumReaderFactory<T> readerFactory,
+      AvroSink.DatumWriterFactory<T> writerFactory) {
     this.type = type;
-    this.useReflectApi = useReflectApi;
+    this.useReflectApi = true; // TODO this is wrong

Review comment:
       Just noting that the TODO (`this.useReflectApi = true; // TODO this is wrong`) is still unresolved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

