RustedBones commented on code in PR #27971:
URL: https://github.com/apache/beam/pull/27971#discussion_r1300339530
##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroSource.java:
##########
@@ -208,130 +218,109 @@ private void validate() {
}
}
- private Mode<GenericRecord> withSchema(String schema) {
- return new Mode<>(
- GenericRecord.class, schema, null, (Coder<GenericRecord>)
outputCoder, readerFactory);
+ private static Mode<GenericRecord, GenericRecord> empty() {
+ return new Mode<>(GenericRecord.class, null, null, null, null);
+ }
+
+ private <X> Mode<X, X> withSchema(Class<X> clazz, String schema) {
+ checkState(isEmpty(), "Schema can't be set on an already configured
source");
+ return new Mode<>(clazz, schema, null, null, null);
}
- private <X> Mode<X> withClass(Class<X> clazz) {
- return new Mode<>(
- clazz,
- ReflectData.get().getSchema(clazz).toString(),
- null,
- (Coder<X>) outputCoder,
- readerFactory);
+ private <X> Mode<X, X> withSchema(Class<X> clazz) {
+ checkState(isEmpty(), "Schema can't be set on an already configured
source");
+ return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(),
null, null, null);
}
- private <X> Mode<X> withParseFunction(
- SerializableFunction<GenericRecord, X> parseFn, Coder<X> outputCoder) {
- return new Mode<>(GenericRecord.class, null, parseFn, outputCoder,
readerFactory);
+ private <X> Mode<AvroT, X> withParseFunction(
+ SerializableFunction<AvroT, X> parseFn, Coder<X> outputCoder) {
+ return new Mode<>(type, readerSchemaString, parseFn, outputCoder,
readerFactory);
}
- private Mode<T> withCoder(Coder<T> coder) {
+ private Mode<AvroT, T> withCoder(Coder<T> coder) {
return new Mode<>(type, readerSchemaString, parseFn, coder,
readerFactory);
}
- private Mode<T> withReaderFactory(DatumReaderFactory<?> factory) {
+ private Mode<AvroT, T> withReaderFactory(DatumReaderFactory<AvroT>
factory) {
return new Mode<>(type, readerSchemaString, parseFn, outputCoder,
factory);
}
}
- private final Mode<T> mode;
+ private final Mode<AvroT, T> mode;
/**
* Reads from the given file name or pattern ("glob"). The returned source
needs to be further
* configured by calling {@link #withSchema} to return a type other than
{@link GenericRecord}.
*/
- public static AvroSource<GenericRecord> from(ValueProvider<String>
fileNameOrPattern) {
+ public static AvroSource<GenericRecord, GenericRecord> from(
+ ValueProvider<String> fileNameOrPattern) {
return new AvroSource<>(
- fileNameOrPattern, EmptyMatchTreatment.DISALLOW,
DEFAULT_MIN_BUNDLE_SIZE, new Mode<>());
+ fileNameOrPattern, EmptyMatchTreatment.DISALLOW,
DEFAULT_MIN_BUNDLE_SIZE, Mode.empty());
}
- public static AvroSource<GenericRecord> from(Metadata metadata) {
+ public static AvroSource<GenericRecord, GenericRecord> from(Metadata
metadata) {
return new AvroSource<>(
- metadata, DEFAULT_MIN_BUNDLE_SIZE, 0, metadata.sizeBytes(), new
Mode<>());
+ metadata, DEFAULT_MIN_BUNDLE_SIZE, 0, metadata.sizeBytes(),
Mode.empty());
}
/** Like {@link #from(ValueProvider)}. */
- public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+ public static AvroSource<GenericRecord, GenericRecord> from(String
fileNameOrPattern) {
return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
}
- public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment
emptyMatchTreatment) {
+ public AvroSource<AvroT, T> withEmptyMatchTreatment(EmptyMatchTreatment
emptyMatchTreatment) {
+ if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+ // emptyMatchTreatment is unused for mode SINGLE_FILE_OR_SUBRANGE
+ return this;
+ }
return new AvroSource<>(
getFileOrPatternSpecProvider(), emptyMatchTreatment,
getMinBundleSize(), mode);
Review Comment:
Previously, a new source was created regardless of the mode, which may extract
the wrong parameters in the case of `SINGLE_FILE_OR_SUBRANGE`. Looking at the parent
[constructor](https://github.com/apache/beam/blob/4e67a59f051afca68653048a217e2f874d31833a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java#L133),
the `emptyMatchTreatment` field is unused in that case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]