damondouglas commented on code in PR #29566:
URL: https://github.com/apache/beam/pull/29566#discussion_r1475338512
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java:
##########
@@ -35,59 +35,114 @@
* <p>To obtain the collection of {@link ReadableFile} from a filepattern, use
{@link
* FileIO#readMatches()}.
*/
-public class ReadAllViaFileBasedSource<T> extends
ReadAllViaFileBasedSourceTransform<T, T> {
- public ReadAllViaFileBasedSource(
+public class ReadAllViaFileBasedSource<T, K> extends
ReadAllViaFileBasedSourceTransform<T, K> {
+
+ private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
+ protected ReadAllViaFileBasedSource(
long desiredBundleSizeBytes,
SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
- Coder<T> coder) {
+ Coder<K> coder,
+ SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
super(
desiredBundleSizeBytes,
createSource,
coder,
DEFAULT_USES_RESHUFFLE,
- new ReadFileRangesFnExceptionHandler());
+ new
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler());
+ this.outputFn = outputFn;
}
- public ReadAllViaFileBasedSource(
+ protected ReadAllViaFileBasedSource(
Review Comment:
Would reducing the visibility of this constructor (from public to protected)
break other users' code that may already depend on it?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java:
##########
@@ -35,59 +35,114 @@
* <p>To obtain the collection of {@link ReadableFile} from a filepattern, use
{@link
* FileIO#readMatches()}.
*/
-public class ReadAllViaFileBasedSource<T> extends
ReadAllViaFileBasedSourceTransform<T, T> {
- public ReadAllViaFileBasedSource(
+public class ReadAllViaFileBasedSource<T, K> extends
ReadAllViaFileBasedSourceTransform<T, K> {
+
+ private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
+ protected ReadAllViaFileBasedSource(
long desiredBundleSizeBytes,
SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
- Coder<T> coder) {
+ Coder<K> coder,
+ SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
super(
desiredBundleSizeBytes,
createSource,
coder,
DEFAULT_USES_RESHUFFLE,
- new ReadFileRangesFnExceptionHandler());
+ new
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler());
+ this.outputFn = outputFn;
}
- public ReadAllViaFileBasedSource(
+ protected ReadAllViaFileBasedSource(
long desiredBundleSizeBytes,
SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
- Coder<T> coder,
+ Coder<K> coder,
boolean usesReshuffle,
- ReadFileRangesFnExceptionHandler exceptionHandler) {
+ ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler
exceptionHandler,
+ SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
super(desiredBundleSizeBytes, createSource, coder, usesReshuffle,
exceptionHandler);
+ this.outputFn = outputFn;
+ }
+
+ public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+ long desiredBundleSizeBytes,
+ SerializableFunction<String, ? extends FileBasedSource<InputT>>
createSource,
+ Coder<InputT> coder,
+ boolean usesReshuffle,
+ ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler
exceptionHandler) {
+ return new ReadAllViaFileBasedSource<>(
+ desiredBundleSizeBytes,
+ createSource,
+ coder,
+ usesReshuffle,
+ exceptionHandler,
+ outputArguments -> outputArguments.reader().getCurrent());
+ }
+
+ public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+ long desiredBundleSizeBytes,
+ SerializableFunction<String, ? extends FileBasedSource<InputT>>
createSource,
+ Coder<InputT> coder) {
+ return create(
+ desiredBundleSizeBytes,
+ createSource,
+ coder,
+ outputArguments -> outputArguments.reader().getCurrent());
+ }
+
+ public static <InputT, OutputT> ReadAllViaFileBasedSource<InputT, OutputT>
create(
+ long desiredBundleSizeBytes,
+ SerializableFunction<String, ? extends FileBasedSource<InputT>>
createSource,
+ Coder<OutputT> coder,
+ SerializableFunction<OutputContextFromFile<InputT>, OutputT> outputFn) {
+ return new ReadAllViaFileBasedSource<>(desiredBundleSizeBytes,
createSource, coder, outputFn);
}
@Override
- protected DoFn<KV<ReadableFile, OffsetRange>, T> readRangesFn() {
- return new ReadFileRangesFn<>(createSource, exceptionHandler);
+ protected DoFn<KV<ReadableFile, OffsetRange>, K> readRangesFn() {
+ return new ReadFileRangesFn<>(outputFn, createSource, exceptionHandler);
}
- private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T,
T> {
+ private static class ReadFileRangesFn<T, K> extends
AbstractReadFileRangesFn<T, K> {
+ private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
public ReadFileRangesFn(
+ final SerializableFunction<OutputContextFromFile<T>, K> outputFn,
final SerializableFunction<String, ? extends FileBasedSource<T>>
createSource,
- final ReadFileRangesFnExceptionHandler exceptionHandler) {
+ final
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler
Review Comment:
Would changing the signature of this public method break other users' code
that already depends on it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]