ahmedabu98 commented on code in PR #29670:
URL: https://github.com/apache/beam/pull/29670#discussion_r1434050026
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
  * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this

Review Comment:
   `these exceptions are propagated to the runner` -> this means the bundle fails, right? It may be good to mention that explicitly.

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -1288,6 +1317,12 @@ public Write<DestinationT, UserT> withNoSpilling() {
       return toBuilder().setNoSpilling(true).build();
     }

+    /** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */

Review Comment:
   Can we have the documentation live here instead of in WriteFiles? I think users will interact with FileIO more, since it is the top-level transform.

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java:
##########
@@ -176,6 +178,10 @@
  *
  * <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
  * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
+ *
+ * <p>Error handling for records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in

Review Comment:
   Shouldn't `withBadRecordErrorHandler` take only one argument?
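The first comment above asks whether "propagated to the runner" means the bundle fails. In Beam's execution model, an uncaught exception while processing any element in a bundle fails the entire bundle, which the runner may then retry. The plain-Java simulation below is a minimal sketch of that contrast, assuming a "bundle" is simply a list of strings. `BundleSimulation`, `processBundle`, and `RoutedRecord` are hypothetical names, not Beam APIs, and the error list only stands in for an error handler like the one passed to `withBadRecordErrorHandler`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation, not Beam code. BundleSimulation, processBundle, and RoutedRecord
// are hypothetical names chosen for illustration.
public class BundleSimulation {

  /** A record that failed formatting, kept together with the failure message. */
  public static final class RoutedRecord {
    public final String input;
    public final String reason;

    RoutedRecord(String input, String reason) {
      this.input = input;
      this.reason = reason;
    }
  }

  /**
   * Formats every record in a "bundle". If {@code errors} is null, the first exception
   * propagates and the method never returns, so no output for the bundle is produced.
   * Otherwise only the failing record is diverted to {@code errors}.
   */
  public static List<String> processBundle(
      List<String> bundle, Function<String, String> format, List<RoutedRecord> errors) {
    List<String> output = new ArrayList<>();
    for (String input : bundle) {
      try {
        output.add(format.apply(input));
      } catch (RuntimeException e) {
        if (errors == null) {
          throw e; // Default: propagate, failing the whole bundle.
        }
        errors.add(new RoutedRecord(input, String.valueOf(e.getMessage())));
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> bundle = List.of("1", "two", "3");
    Function<String, String> format = s -> "value=" + Integer.parseInt(s);

    // With an error sink: the malformed record is diverted, the rest are written.
    List<RoutedRecord> errors = new ArrayList<>();
    System.out.println(processBundle(bundle, format, errors)); // [value=1, value=3]
    System.out.println(errors.size()); // 1

    // Without one: a single malformed record fails the whole bundle.
    try {
      processBundle(bundle, format, null);
    } catch (NumberFormatException e) {
      System.out.println("bundle failed: " + e.getMessage());
    }
  }
}
```

Only the record that throws is diverted. Per the FileIO docs quoted in the diff, this kind of handler is not meant for connectivity or authorization failures, which should still surface to the runner and be retried.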
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -701,6 +775,56 @@ private static <DestinationT> int hashDestination(
         .asInt();
   }

+  private static class MaybeDestination<DestinationT> {
+    final DestinationT destination;
+    final boolean isValid;
+
+    MaybeDestination(DestinationT destination, boolean isValid) {
+      this.destination = destination;
+      this.isValid = isValid;
+    }
+  }
+  // Utility method to get the dynamic destination based on a record. Returns a MaybeDestination
+  // because some implementations of dynamic destinations return null, despite this being prohibited
+  // by the interface
+  private MaybeDestination<DestinationT> getDestinationWithErrorHandling(
+      UserT input, MultiOutputReceiver outputReceiver, Coder<UserT> inputCoder) throws Exception {
+    try {
+      return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true);
+    } catch (Exception e) {
+      getBadRecordRouter()
+          .route(
+              outputReceiver, input, inputCoder, e, "Unable to get dynamic destination for record");
+      return new MaybeDestination<>(null, false);
+    }
+  }
+
+  // Utility method to format a record based on the dynamic destination. If the operation fails, and
+  // is output to the bad record router, this returns null
+  private @Nullable OutputT formatRecordWithErrorHandling(
+      UserT input, MultiOutputReceiver outputReceiver, Coder<UserT> inputCoder) throws Exception {
+    try {
+      return getDynamicDestinations().formatRecord(input);
+    } catch (Exception e) {
+      getBadRecordRouter()
+          .route(
+              outputReceiver,
+              input,
+              inputCoder,
+              e,
+              "Unable to format record for Dynamic Destination");
+      return null;
+    }
+  }
+
+  private void addErrorCollection(PCollectionTuple sourceTuple) {
+    getBadRecordErrorHandler()
+        .addErrorCollection(
+            sourceTuple
+                .get(BAD_RECORD_TAG)
+                .setCoder(BadRecord.getCoder(sourceTuple.getPipeline())));

Review Comment:
   This also looks like something that could be a generic utility method.

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java:
##########
@@ -176,6 +178,10 @@
  *
  * <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
  * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
+ *
+ * <p>Error handling for records that are malformed can be handled by using {@link
+ * TypedWrite#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. See documentation in
+ * {@link FileIO} for details on usage

Review Comment:
   We point to `FileIO` for details on usage and examples, but for the `withBadRecordErrorHandler` method definition below we point to `WriteFiles` (same pattern in AvroIO). I think it makes sense to consolidate all error-handling documentation in one place, making it easier for users to find and for devs to update in the future.

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
  * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The

Review Comment:
   Doesn't the method definition below take only one argument? `withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)` (line 1321)

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -236,6 +238,28 @@
  * destination-dependent: every window/pane for every destination will use the same number of shards
  * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
  *
+ * <h3>Handling Errors</h3>
+ *
+ * <p>When using dynamic destinations, or when using a formatting function to format a record for
+ * writing, it's possible for an individual record to be malformed, causing an exception. By
+ * default, these exceptions are propagated to the runner, and are usually retried, though this
+ * depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
+ * using {@link Write#withBadRecordErrorHandler(ErrorHandler, SerializableFunction)}. The
+ * ErrorHandler is registered with the pipeline (see below), and the SerializableFunction lets you
+ * filter which exceptions should be sent to the error handler, and which should be handled by the
+ * runner. See {@link ErrorHandler} for more documentation. Of note, this error handling only
+ * handles errors related to specific records. It does not handle errors related to connectivity,
+ * authorization, etc. as those should be retried by the runner.
+ *
+ * <pre>{@code
+ * PCollection<> records = ...;
+ * PTransform<PCollection<BadRecord>,?> alternateSink = ...;
+ * try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
+ *   records.apply("Write", FileIO.writeDynamic().otherConfigs()
+ *     .withBadRecordErrorHandler(handler, (exception) -> true));

Review Comment:
   Similar to above, it looks like this should take only one argument.

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -495,28 +528,43 @@ private WriteUnshardedBundlesToTempFiles(
     @Override
     public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
-      if (getMaxNumWritersPerBundle() < 0) {
-        return input
-            .apply(
-                "WritedUnshardedBundles",
-                ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder))
-                    .withSideInputs(getSideInputs()))
-            .setCoder(fileResultCoder);
-      }
       TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
       TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
           new TupleTag<>("unwrittenRecords");
+      Coder<UserT> inputCoder = input.getCoder();
+      if (getMaxNumWritersPerBundle() < 0) {
+        PCollectionTuple writeTuple =
+            input.apply(
+                "WritedUnshardedBundles",
+                ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder, inputCoder))

Review Comment:
   Should we be passing `unwrittenRecordsTag` to `WriteUnshardedTempFilesFn` here? It's referenced later, on lines 543 and 546.

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -701,6 +775,56 @@ private static <DestinationT> int hashDestination(
         .asInt();
   }

+  private static class MaybeDestination<DestinationT> {
+    final DestinationT destination;
+    final boolean isValid;
+
+    MaybeDestination(DestinationT destination, boolean isValid) {
+      this.destination = destination;
+      this.isValid = isValid;
+    }
+  }
+  // Utility method to get the dynamic destination based on a record. Returns a MaybeDestination
+  // because some implementations of dynamic destinations return null, despite this being prohibited
+  // by the interface
+  private MaybeDestination<DestinationT> getDestinationWithErrorHandling(
+      UserT input, MultiOutputReceiver outputReceiver, Coder<UserT> inputCoder) throws Exception {
+    try {
+      return new MaybeDestination<>(getDynamicDestinations().getDestination(input), true);
+    } catch (Exception e) {
+      getBadRecordRouter()
+          .route(
+              outputReceiver, input, inputCoder, e, "Unable to get dynamic destination for record");
+      return new MaybeDestination<>(null, false);
+    }
+  }
+
+  // Utility method to format a record based on the dynamic destination. If the operation fails, and
+  // is output to the bad record router, this returns null
+  private @Nullable OutputT formatRecordWithErrorHandling(

Review Comment:
   `formatRecordWithErrorHandling` and `getDestinationWithErrorHandling` follow the same pattern: each performs an operation on an element. If the operation succeeds, its output is returned; otherwise the element is routed with a custom error message and a default value is returned instead. If it makes sense, this could be implemented as a generic utility method shared across IOs (probably out of scope for this PR, though).
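The last comment suggests folding the shared shape of `formatRecordWithErrorHandling` and `getDestinationWithErrorHandling` (try the operation; on failure, route the element with a custom message and return a default) into one helper. A minimal stdlib-only sketch of such a helper follows. `ErrorRouting`, `applyOrRoute`, `ThrowingFunction`, and `ErrorSink` are hypothetical names, and `ErrorSink` only stands in for something like `getBadRecordRouter().route(...)`, which in the PR also takes the output receiver and input coder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper sketching the reviewer's suggestion; not a Beam API.
public final class ErrorRouting {

  /** An operation that may throw, such as computing a destination or formatting a record. */
  @FunctionalInterface
  public interface ThrowingFunction<InputT, OutputT> {
    OutputT apply(InputT input) throws Exception;
  }

  /** Receives failed inputs; a stand-in for a bad-record router. */
  @FunctionalInterface
  public interface ErrorSink<InputT> {
    void route(InputT input, Exception error, String description) throws Exception;
  }

  private ErrorRouting() {}

  /**
   * Returns {@code operation.apply(input)} on success. On failure, routes the input, the
   * exception, and {@code description} to {@code sink}, then returns {@code fallback}.
   */
  public static <InputT, OutputT> OutputT applyOrRoute(
      InputT input,
      ThrowingFunction<InputT, OutputT> operation,
      ErrorSink<InputT> sink,
      String description,
      OutputT fallback)
      throws Exception {
    try {
      return operation.apply(input);
    } catch (Exception e) {
      sink.route(input, e, description);
      return fallback;
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> routed = new ArrayList<>();
    ErrorSink<String> sink =
        (input, error, description) -> routed.add(description + ": " + input);

    Integer ok = applyOrRoute("42", Integer::parseInt, sink, "Unable to parse record", null);
    Integer bad = applyOrRoute("oops", Integer::parseInt, sink, "Unable to parse record", null);

    System.out.println(ok); // 42
    System.out.println(bad); // null
    System.out.println(routed); // [Unable to parse record: oops]
  }
}
```

One design caveat: a plain fallback value cannot distinguish "the operation failed" from "the operation legitimately returned the fallback". That is why the PR wraps destinations in `MaybeDestination` with an `isValid` flag (some implementations return `null`), so a shared helper would likely need a similar wrapper for that case.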