ahmedabu98 commented on code in PR #29670:
URL: https://github.com/apache/beam/pull/29670#discussion_r1434063961
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
  * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The

Review Comment:
   The method definition below takes only one argument? `withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)` (line 1321)

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
  * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The
+ * ErrorHandler is registered with the pipeline (see below), and the SerializableFunction lets you
+ * filter which exceptions should be sent to the error handler, and which should be handled by the
+ * runner. See {@link ErrorHandler} for more documentation. Of note, this error handling only
+ * handles errors related to specific records. It does not handle errors related to connectivity,
+ * authorization, etc. as those should be retried by the runner.
+ *
+ * <pre>{@code
+ * PCollection<> records = ...;
+ * PTransform<PCollection<BadRecord>,?> alternateSink = ...;
+ * try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
+ *   records.apply("Write", FileIO.writeDynamic().otherConfigs()
+ *       .withBadRecordErrorHandler(handler, (exception) -> true));

Review Comment:
   Similar to above, looks like this should have only one argument

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java:
##########
@@ -176,6 +178,10 @@
  *
  * <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
  * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
+ *
+ * <p>Error handling for records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in

Review Comment:
   `withBadRecordErrorHandler` should take one argument?
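To make the single-argument shape the comments point to concrete, here is a dependency-free sketch of the routing behavior the Javadoc describes: a formatting function throws on a malformed record, and instead of propagating, the record goes to a handler that was attached with one argument. Every class here (`SketchWrite`, `CollectingErrorHandler`, `BadRecordModel`) is hypothetical and only mimics the pattern; none of them are Beam APIs, and there is no exception filter, matching the one-argument signature cited at line 1321.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of bad-record routing; NOT Beam classes, just the shape of the pattern. */
public class BadRecordRoutingSketch {

  /** Hypothetical stand-in for a bad record: the failing input plus its exception message. */
  static final class BadRecordModel {
    final String input;
    final String exceptionMessage;

    BadRecordModel(String input, String exceptionMessage) {
      this.input = input;
      this.exceptionMessage = exceptionMessage;
    }
  }

  /** Hypothetical stand-in for a registered error handler: it simply collects bad records. */
  static final class CollectingErrorHandler {
    final List<BadRecordModel> badRecords = new ArrayList<>();

    void handle(BadRecordModel bad) {
      badRecords.add(bad);
    }
  }

  /** Hypothetical write step whose formatting function may throw on malformed input. */
  static final class SketchWrite {
    private final Function<String, String> formatFn;
    private CollectingErrorHandler errorHandler; // null: exceptions propagate to the caller

    SketchWrite(Function<String, String> formatFn) {
      this.formatFn = formatFn;
    }

    // One argument only: the handler. No filter function, mirroring the cited signature.
    SketchWrite withBadRecordErrorHandler(CollectingErrorHandler handler) {
      this.errorHandler = handler;
      return this;
    }

    /** Formats each record and returns the lines that were written successfully. */
    List<String> write(List<String> records) {
      List<String> written = new ArrayList<>();
      for (String record : records) {
        try {
          written.add(formatFn.apply(record));
        } catch (RuntimeException e) {
          if (errorHandler == null) {
            throw e; // default behavior: the failure propagates (in Beam, to the runner)
          }
          errorHandler.handle(new BadRecordModel(record, e.getMessage()));
        }
      }
      return written;
    }
  }

  public static void main(String[] args) {
    // Formatting function that fails on non-numeric input.
    Function<String, String> format = s -> "value=" + Integer.parseInt(s);

    CollectingErrorHandler handler = new CollectingErrorHandler();
    List<String> written =
        new SketchWrite(format).withBadRecordErrorHandler(handler).write(List.of("1", "oops", "3"));

    System.out.println(written); // prints [value=1, value=3]
    System.out.println(handler.badRecords.size()); // prints 1
  }
}
```

In the actual PR the handler would come from `pipeline.registerBadRecordErrorHandler(alternateSink)` inside a try-with-resources block, as the quoted Javadoc shows; the sketch only illustrates why the write call itself needs just the handler.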