ahmedabu98 commented on code in PR #29670:
URL: https://github.com/apache/beam/pull/29670#discussion_r1434063961


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
  * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The

Review Comment:
   The method definition below takes only one argument?
   `withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)` (line 1321)



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
  * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The
+ * ErrorHandler is registered with the pipeline (see below), and the SerializableFunction lets you
+ * filter which exceptions should be sent to the error handler, and which should be handled by the
+ * runner. See {@link ErrorHandler} for more documentation. Of note, this error handling only
+ * handles errors related to specific records. It does not handle errors related to connectivity,
+ * authorization, etc. as those should be retried by the runner.
+ *
+ * <pre>{@code
+ * PCollection<> records = ...;
+ * PTransform<PCollection<BadRecord>,?> alternateSink = ...;
+ * try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
+ *    records.apply("Write", FileIO.writeDynamic().otherConfigs()
+ *        .withBadRecordErrorHandler(handler, (exception) -> true));

Review Comment:
   Similar to the comment above: it looks like this call should take only one argument.
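
   A possible one-argument form of these lines, assuming the signature at line 1321 is the intended one. This is only a sketch of the Javadoc text, not a tested change. It also adds the closing parenthesis that the `try (...)` header appears to be missing:

   ```java
    * try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink)) {
    *    records.apply("Write", FileIO.writeDynamic().otherConfigs()
    *        .withBadRecordErrorHandler(handler));
   ```

   If the two-argument form is meant to be supported as well, the surrounding paragraph about the SerializableFunction filter would need a matching overload instead.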



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java:
##########
@@ -176,6 +178,10 @@
  *
  * <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
  * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
+ *
+ * <p>Error handling for records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in

Review Comment:
   `withBadRecordErrorHandler` should take one argument?


