ahmedabu98 commented on code in PR #29670:
URL: https://github.com/apache/beam/pull/29670#discussion_r1434050026


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
 * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this

Review Comment:
   `these exceptions are propagated to the runner` -> this means the bundle fails, right? It may be good to mention that clearly.
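   To make the distinction concrete, here is a plain-Java model. It is not Beam code, and `BundleFailureSketch`, `format`, and the two `process...` methods are hypothetical names. Without a handler, a single malformed record throws out of the loop, so the caller gets no output for the whole batch. In Beam terms, that roughly corresponds to the runner failing the bundle and, depending on the runner, retrying it. With a handler, only the malformed record is diverted.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "exception propagates" vs "record is routed"; not Beam code. */
final class BundleFailureSketch {
  private BundleFailureSketch() {}

  // Hypothetical formatting function that fails on malformed (non-numeric) records.
  static String format(String record) {
    return "value=" + Integer.parseInt(record);
  }

  // Default behaviour: the first exception escapes, so the caller gets no output at all
  // for this batch (in Beam, the runner would typically fail the bundle and may retry it).
  static List<String> processWithoutHandler(List<String> bundle) {
    List<String> written = new ArrayList<>();
    for (String record : bundle) {
      written.add(format(record)); // NumberFormatException propagates to the caller
    }
    return written;
  }

  // With a handler: only the malformed record is diverted; the rest are still written.
  static List<String> processWithHandler(List<String> bundle, List<String> badRecords) {
    List<String> written = new ArrayList<>();
    for (String record : bundle) {
      try {
        written.add(format(record));
      } catch (NumberFormatException e) {
        badRecords.add(record);
      }
    }
    return written;
  }

  public static void main(String[] args) {
    List<String> bundle = List.of("1", "oops", "3");
    try {
      processWithoutHandler(bundle);
    } catch (NumberFormatException e) {
      System.out.println("bundle failed: " + e.getMessage());
    }
    List<String> bad = new ArrayList<>();
    System.out.println("written=" + processWithHandler(bundle, bad) + " bad=" + bad);
  }
}
```

   Running `main` reports a failure for the first call and prints `written=[value=1, value=3] bad=[oops]` for the second.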



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -1288,6 +1317,12 @@ public Write<DestinationT, UserT> withNoSpilling() {
       return toBuilder().setNoSpilling(true).build();
     }
 
+    /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */

Review Comment:
   Can we have the documentation live here instead of WriteFiles? I think users will interact with FileIO more as the top-level transform.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java:
##########
@@ -176,6 +178,10 @@
  *
 * <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
 * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
+ *
+ * <p>Error handling for records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in

Review Comment:
   `withBadRecordErrorHandler` should take one argument?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -701,6 +775,56 @@ private static <DestinationT> int hashDestination(
         .asInt();
   }
 
+  private static class MaybeDestination<DestinationT> {
+    final DestinationT destination;
+    final boolean isValid;
+
+    MaybeDestination(DestinationT destination, boolean isValid) {
+      this.destination = destination;
+      this.isValid = isValid;
+    }
+  }
+  // Utility method to get the dynamic destination based on a record. Returns a MaybeDestination
+  // because some implementations of dynamic destinations return null, despite this being prohibited
+  // by the interface
+  private MaybeDestination<DestinationT> getDestinationWithErrorHandling(
+      UserT input, MultiOutputReceiver outputReceiver, Coder<UserT> inputCoder) throws Exception {
+    try {
+      return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true);
+    } catch (Exception e) {
+      getBadRecordRouter()
+          .route(
+              outputReceiver, input, inputCoder, e, "Unable to get dynamic destination for record");
+      return new MaybeDestination<>(null, false);
+    }
+  }
+
+  // Utility method to format a record based on the dynamic destination. If the operation fails, and
+  // is output to the bad record router, this returns null
+  private @Nullable OutputT formatRecordWithErrorHandling(
+      UserT input, MultiOutputReceiver outputReceiver, Coder<UserT> inputCoder) throws Exception {
+    try {
+      return getDynamicDestinations().formatRecord(input);
+    } catch (Exception e) {
+      getBadRecordRouter()
+          .route(
+              outputReceiver,
+              input,
+              inputCoder,
+              e,
+              "Unable to format record for Dynamic Destination");
+      return null;
+    }
+  }
+
+  private void addErrorCollection(PCollectionTuple sourceTuple) {
+    getBadRecordErrorHandler()
+        .addErrorCollection(
+            sourceTuple
+                .get(BAD_RECORD_TAG)
+                .setCoder(BadRecord.getCoder(sourceTuple.getPipeline())));

Review Comment:
   This also looks like something that can be a generic utility method



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java:
##########
@@ -176,6 +178,10 @@
  *
 * <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
 * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
+ *
+ * <p>Error handling for records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in
+ * {@link FileIO} for details on usage

Review Comment:
   We point to `FileIO` for details on usage and examples, but for the `withBadRecordErrorHandler` method definition below we point to `WriteFiles` (same pattern in AvroIO).
   
   I think it makes sense to accumulate any error handling documentation in one place to make it easier for users to find and devs to update in the future.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
 * destination-dependent: every window/pane for every destination will use the same number of shards
 * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
 *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The

Review Comment:
   The method definition below takes only one argument?
   `withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)` (line 1321)



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
 * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The
+ * ErrorHandler is registered with the pipeline (see below), and the SerializableFunction lets you
+ * filter which exceptions should be sent to the error handler, and which should be handled by the
+ * runner. See {@link ErrorHandler} for more documentation. Of note, this error handling only
+ * handles errors related to specific records. It does not handle errors related to connectivity,
+ * authorization, etc. as those should be retried by the runner.
+ *
+ * <pre>{@code
+ * PCollection<> records = ...;
+ * PTransform<PCollection<BadRecord>,?> alternateSink = ...;
+ * try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
+ *    records.apply("Write", FileIO.writeDynamic().otherConfigs()
+ *        .withBadRecordErrorHandler(handler, (exception) -> true));

Review Comment:
   Similar to above, it looks like this should have only one argument.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -495,28 +528,43 @@ private WriteUnshardedBundlesToTempFiles(
 
     @Override
     public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
-      if (getMaxNumWritersPerBundle() < 0) {
-        return input
-            .apply(
-                "WritedUnshardedBundles",
-                ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder))
-                    .withSideInputs(getSideInputs()))
-            .setCoder(fileResultCoder);
-      }
       TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
       TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
           new TupleTag<>("unwrittenRecords");
+      Coder<UserT> inputCoder = input.getCoder();
+      if (getMaxNumWritersPerBundle() < 0) {
+        PCollectionTuple writeTuple =
+            input.apply(
+                "WritedUnshardedBundles",
+                ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder, inputCoder))

Review Comment:
   Should we be passing `unwrittenRecordsTag` to `WriteUnshardedTempFilesFn` here? It's referenced later in lines 543 and 546.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -701,6 +775,56 @@ private static <DestinationT> int hashDestination(
         .asInt();
   }
 
+  private static class MaybeDestination<DestinationT> {
+    final DestinationT destination;
+    final boolean isValid;
+
+    MaybeDestination(DestinationT destination, boolean isValid) {
+      this.destination = destination;
+      this.isValid = isValid;
+    }
+  }
+  // Utility method to get the dynamic destination based on a record. Returns a MaybeDestination
+  // because some implementations of dynamic destinations return null, despite this being prohibited
+  // by the interface
+  private MaybeDestination<DestinationT> getDestinationWithErrorHandling(
+      UserT input, MultiOutputReceiver outputReceiver, Coder<UserT> inputCoder) throws Exception {
+    try {
+      return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true);
+    } catch (Exception e) {
+      getBadRecordRouter()
+          .route(
+              outputReceiver, input, inputCoder, e, "Unable to get dynamic destination for record");
+      return new MaybeDestination<>(null, false);
+    }
+  }
+
+  // Utility method to format a record based on the dynamic destination. If the operation fails, and
+  // is output to the bad record router, this returns null
+  private @Nullable OutputT formatRecordWithErrorHandling(

Review Comment:
   `formatRecordWithErrorHandling` and `getDestinationWithErrorHandling` have a similar function. They perform an operation on an element. If successful, the output of the operation is returned. Otherwise, the element is routed with a custom error message and a default is returned instead.
   
   If it makes sense, this can be implemented as a generic utility method that can be used across IOs (probably out of scope for this PR, though).
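   One possible shape for such a utility, sketched in plain Java so it is self-contained. `ThrowingFunction`, `Router`, `Maybe`, and `applyWithErrorHandling` are hypothetical names, not Beam APIs. `Router` stands in for the `getBadRecordRouter().route(outputReceiver, input, inputCoder, e, description)` call, and `Maybe` generalizes `MaybeDestination` so a failed call can be told apart from one that legitimately returned `null`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical generic helper; not part of Beam. */
final class ErrorHandlingUtil {
  private ErrorHandlingUtil() {}

  /** An operation on a single element that may throw (e.g. getDestination or formatRecord). */
  @FunctionalInterface
  interface ThrowingFunction<InputT, OutputT> {
    OutputT apply(InputT input) throws Exception;
  }

  /**
   * Stand-in for routing a failed element to the bad-record output. Beam's router can throw a
   * checked exception; this sketch keeps the signature unchecked for brevity.
   */
  @FunctionalInterface
  interface Router<InputT> {
    void route(InputT input, Exception e, String description);
  }

  /** Like MaybeDestination: isValid separates "the call failed" from "the call returned null". */
  static final class Maybe<T> {
    final T value;
    final boolean isValid;

    private Maybe(T value, boolean isValid) {
      this.value = value;
      this.isValid = isValid;
    }
  }

  /** Applies fn; on failure, routes the input with the description and returns an invalid Maybe. */
  static <InputT, OutputT> Maybe<OutputT> applyWithErrorHandling(
      InputT input,
      ThrowingFunction<InputT, OutputT> fn,
      Router<InputT> router,
      String description) {
    try {
      return new Maybe<>(fn.apply(input), true);
    } catch (Exception e) {
      router.route(input, e, description);
      return new Maybe<>(null, false);
    }
  }

  public static void main(String[] args) {
    List<String> routed = new ArrayList<>();
    Router<String> router = (input, e, description) -> routed.add(description + ": " + input);
    ThrowingFunction<String, Integer> format = Integer::parseInt;

    Maybe<Integer> ok = applyWithErrorHandling("42", format, router, "Unable to format record");
    Maybe<Integer> bad = applyWithErrorHandling("x", format, router, "Unable to format record");
    System.out.println(ok.isValid + " " + ok.value + " " + bad.isValid + " " + routed);
  }
}
```

   With this shape, both existing helpers would reduce to one call each, passing `getDynamicDestinations()::getDestination` or `getDynamicDestinations()::formatRecord` plus their error messages. Whether the checked exception from the real router should be propagated is left open here.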



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
