damondouglas commented on code in PR #29566:
URL: https://github.com/apache/beam/pull/29566#discussion_r1475338512


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java:
##########
@@ -35,59 +35,114 @@
  * <p>To obtain the collection of {@link ReadableFile} from a filepattern, use 
{@link
  * FileIO#readMatches()}.
  */
-public class ReadAllViaFileBasedSource<T> extends 
ReadAllViaFileBasedSourceTransform<T, T> {
-  public ReadAllViaFileBasedSource(
+public class ReadAllViaFileBasedSource<T, K> extends 
ReadAllViaFileBasedSourceTransform<T, K> {
+
+  private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
+  protected ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
-      Coder<T> coder) {
+      Coder<K> coder,
+      SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
     super(
         desiredBundleSizeBytes,
         createSource,
         coder,
         DEFAULT_USES_RESHUFFLE,
-        new ReadFileRangesFnExceptionHandler());
+        new 
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler());
+    this.outputFn = outputFn;
   }
 
-  public ReadAllViaFileBasedSource(
+  protected ReadAllViaFileBasedSource(

Review Comment:
   Would reducing this constructor's visibility from public to protected break
other code that may already depend on it?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java:
##########
@@ -35,59 +35,114 @@
  * <p>To obtain the collection of {@link ReadableFile} from a filepattern, use 
{@link
  * FileIO#readMatches()}.
  */
-public class ReadAllViaFileBasedSource<T> extends 
ReadAllViaFileBasedSourceTransform<T, T> {
-  public ReadAllViaFileBasedSource(
+public class ReadAllViaFileBasedSource<T, K> extends 
ReadAllViaFileBasedSourceTransform<T, K> {
+
+  private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
+  protected ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
-      Coder<T> coder) {
+      Coder<K> coder,
+      SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
     super(
         desiredBundleSizeBytes,
         createSource,
         coder,
         DEFAULT_USES_RESHUFFLE,
-        new ReadFileRangesFnExceptionHandler());
+        new 
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler());
+    this.outputFn = outputFn;
   }
 
-  public ReadAllViaFileBasedSource(
+  protected ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
-      Coder<T> coder,
+      Coder<K> coder,
       boolean usesReshuffle,
-      ReadFileRangesFnExceptionHandler exceptionHandler) {
+      ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler 
exceptionHandler,
+      SerializableFunction<OutputContextFromFile<T>, K> outputFn) {
     super(desiredBundleSizeBytes, createSource, coder, usesReshuffle, 
exceptionHandler);
+    this.outputFn = outputFn;
+  }
+
+  public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, ? extends FileBasedSource<InputT>> 
createSource,
+      Coder<InputT> coder,
+      boolean usesReshuffle,
+      ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler 
exceptionHandler) {
+    return new ReadAllViaFileBasedSource<>(
+        desiredBundleSizeBytes,
+        createSource,
+        coder,
+        usesReshuffle,
+        exceptionHandler,
+        outputArguments -> outputArguments.reader().getCurrent());
+  }
+
+  public static <InputT> ReadAllViaFileBasedSource<InputT, InputT> create(
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, ? extends FileBasedSource<InputT>> 
createSource,
+      Coder<InputT> coder) {
+    return create(
+        desiredBundleSizeBytes,
+        createSource,
+        coder,
+        outputArguments -> outputArguments.reader().getCurrent());
+  }
+
+  public static <InputT, OutputT> ReadAllViaFileBasedSource<InputT, OutputT> 
create(
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, ? extends FileBasedSource<InputT>> 
createSource,
+      Coder<OutputT> coder,
+      SerializableFunction<OutputContextFromFile<InputT>, OutputT> outputFn) {
+    return new ReadAllViaFileBasedSource<>(desiredBundleSizeBytes, 
createSource, coder, outputFn);
   }
 
   @Override
-  protected DoFn<KV<ReadableFile, OffsetRange>, T> readRangesFn() {
-    return new ReadFileRangesFn<>(createSource, exceptionHandler);
+  protected DoFn<KV<ReadableFile, OffsetRange>, K> readRangesFn() {
+    return new ReadFileRangesFn<>(outputFn, createSource, exceptionHandler);
   }
 
-  private static class ReadFileRangesFn<T> extends AbstractReadFileRangesFn<T, 
T> {
+  private static class ReadFileRangesFn<T, K> extends 
AbstractReadFileRangesFn<T, K> {
+    private final SerializableFunction<OutputContextFromFile<T>, K> outputFn;
+
     public ReadFileRangesFn(
+        final SerializableFunction<OutputContextFromFile<T>, K> outputFn,
         final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource,
-        final ReadFileRangesFnExceptionHandler exceptionHandler) {
+        final 
ReadAllViaFileBasedSourceTransform.ReadFileRangesFnExceptionHandler

Review Comment:
   Would changing the signature of this public constructor break other code
that already depends on it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to