RustedBones commented on code in PR #27971:
URL: https://github.com/apache/beam/pull/27971#discussion_r1300339530


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroSource.java:
##########
@@ -208,130 +218,109 @@ private void validate() {
       }
     }
 
-    private Mode<GenericRecord> withSchema(String schema) {
-      return new Mode<>(
-          GenericRecord.class, schema, null, (Coder<GenericRecord>) 
outputCoder, readerFactory);
+    private static Mode<GenericRecord, GenericRecord> empty() {
+      return new Mode<>(GenericRecord.class, null, null, null, null);
+    }
+
+    private <X> Mode<X, X> withSchema(Class<X> clazz, String schema) {
+      checkState(isEmpty(), "Schema can't be set on an already configured 
source");
+      return new Mode<>(clazz, schema, null, null, null);
     }
 
-    private <X> Mode<X> withClass(Class<X> clazz) {
-      return new Mode<>(
-          clazz,
-          ReflectData.get().getSchema(clazz).toString(),
-          null,
-          (Coder<X>) outputCoder,
-          readerFactory);
+    private <X> Mode<X, X> withSchema(Class<X> clazz) {
+      checkState(isEmpty(), "Schema can't be set on an already configured 
source");
+      return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), 
null, null, null);
     }
 
-    private <X> Mode<X> withParseFunction(
-        SerializableFunction<GenericRecord, X> parseFn, Coder<X> outputCoder) {
-      return new Mode<>(GenericRecord.class, null, parseFn, outputCoder, 
readerFactory);
+    private <X> Mode<AvroT, X> withParseFunction(
+        SerializableFunction<AvroT, X> parseFn, Coder<X> outputCoder) {
+      return new Mode<>(type, readerSchemaString, parseFn, outputCoder, 
readerFactory);
     }
 
-    private Mode<T> withCoder(Coder<T> coder) {
+    private Mode<AvroT, T> withCoder(Coder<T> coder) {
       return new Mode<>(type, readerSchemaString, parseFn, coder, 
readerFactory);
     }
 
-    private Mode<T> withReaderFactory(DatumReaderFactory<?> factory) {
+    private Mode<AvroT, T> withReaderFactory(DatumReaderFactory<AvroT> 
factory) {
       return new Mode<>(type, readerSchemaString, parseFn, outputCoder, 
factory);
     }
   }
 
-  private final Mode<T> mode;
+  private final Mode<AvroT, T> mode;
 
   /**
    * Reads from the given file name or pattern ("glob"). The returned source 
needs to be further
    * configured by calling {@link #withSchema} to return a type other than 
{@link GenericRecord}.
    */
-  public static AvroSource<GenericRecord> from(ValueProvider<String> 
fileNameOrPattern) {
+  public static AvroSource<GenericRecord, GenericRecord> from(
+      ValueProvider<String> fileNameOrPattern) {
     return new AvroSource<>(
-        fileNameOrPattern, EmptyMatchTreatment.DISALLOW, 
DEFAULT_MIN_BUNDLE_SIZE, new Mode<>());
+        fileNameOrPattern, EmptyMatchTreatment.DISALLOW, 
DEFAULT_MIN_BUNDLE_SIZE, Mode.empty());
   }
 
-  public static AvroSource<GenericRecord> from(Metadata metadata) {
+  public static AvroSource<GenericRecord, GenericRecord> from(Metadata 
metadata) {
     return new AvroSource<>(
-        metadata, DEFAULT_MIN_BUNDLE_SIZE, 0, metadata.sizeBytes(), new 
Mode<>());
+        metadata, DEFAULT_MIN_BUNDLE_SIZE, 0, metadata.sizeBytes(), 
Mode.empty());
   }
 
   /** Like {@link #from(ValueProvider)}. */
-  public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+  public static AvroSource<GenericRecord, GenericRecord> from(String 
fileNameOrPattern) {
     return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
   }
 
-  public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment 
emptyMatchTreatment) {
+  public AvroSource<AvroT, T> withEmptyMatchTreatment(EmptyMatchTreatment 
emptyMatchTreatment) {
+    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+      // emptyMatchTreatment is unused for mode SINGLE_FILE_OR_SUBRANGE
+      return this;
+    }
     return new AvroSource<>(
         getFileOrPatternSpecProvider(), emptyMatchTreatment, 
getMinBundleSize(), mode);

Review Comment:
   Previously, a new source was created regardless of the mode, which could pick up the wrong parameters in the `SINGLE_FILE_OR_SUBRANGE` case. According to the parent [constructor](https://github.com/apache/beam/blob/4e67a59f051afca68653048a217e2f874d31833a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java#L133), the `emptyMatchTreatment` field is unused in that mode, so returning `this` unchanged is sufficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to