[ https://issues.apache.org/jira/browse/BEAM-8100?focusedWorklogId=316840&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316840 ]
ASF GitHub Bot logged work on BEAM-8100: ---------------------------------------- Author: ASF GitHub Bot Created on: 23/Sep/19 17:19 Start Date: 23/Sep/19 17:19 Worklog Time Spent: 10m Work Description: jklukas commented on pull request #9499: [BEAM-8100] Exception handling for AsJsons and ParseJsons URL: https://github.com/apache/beam/pull/9499#discussion_r327231280 ########## File path: sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java ########## @@ -56,6 +70,60 @@ private AsJsons(Class<? extends InputT> outputClass) { return newTransform; } + /** + * Returns a new {@link AsJsonsWithFailures} transform that catches exceptions raised while + * writing JSON elements, with the given type descriptor used for the failure collection but the + * exception handler yet to be specified using {@link + * AsJsonsWithFailures#exceptionsVia(ProcessFunction)}. + * + * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link + * WithFailures.Result}. + */ + @Experimental(Experimental.Kind.WITH_EXCEPTIONS) + public <NewFailureT> AsJsonsWithFailures<NewFailureT> exceptionsInto( + TypeDescriptor<NewFailureT> failureTypeDescriptor) { + return new AsJsonsWithFailures<>(null, failureTypeDescriptor); + } + + /** + * Returns a new {@link AsJsonsWithFailures} transform that catches exceptions raised while + * writing JSON elements, passing the raised exception instance and the input element being + * processed through the given {@code exceptionHandler} and emitting the result to a failure + * collection. + * + * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link + * WithFailures.Result}. + * + * <p>Example usage: + * + * <pre>{@code + * WithFailures.Result<PCollection<String>, KV<MyPojo, Map<String, String>>> result = + * pojos.apply( + * AsJsons.of(MyPojo.class) + * .exceptionsVia(new WithFailures.ExceptionAsMapHandler<MyPojo>() {})); + * + * PCollection<String> output = result.output(); // valid json elements + * PCollection<KV<MyPojo, Map<String, String>>> failures = result.failures(); + * }</pre> + */ + @Experimental(Experimental.Kind.WITH_EXCEPTIONS) + public <FailureT> AsJsonsWithFailures<FailureT> exceptionsVia( + InferableFunction<WithFailures.ExceptionElement<InputT>, FailureT> exceptionHandler) { + return new AsJsonsWithFailures<>(exceptionHandler, exceptionHandler.getOutputTypeDescriptor()); + } + + @Experimental(Experimental.Kind.WITH_EXCEPTIONS) Review comment: Need Javadoc with example usage here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 316840) Time Spent: 3h (was: 2h 50m) > Add exception handling to Json transforms in Java SDK > ----------------------------------------------------- > > Key: BEAM-8100 > URL: https://issues.apache.org/jira/browse/BEAM-8100 > Project: Beam > Issue Type: Improvement > Components: extensions-java-json, sdk-java-core > Reporter: Jeff Klukas > Assignee: Alexey Romanenko > Priority: Minor > Time Spent: 3h > Remaining Estimate: 0h > > Follow-up to https://issues.apache.org/jira/browse/BEAM-5638. We have > exception handling for MapElements and FlatMapElements. It should be > straightforward to add parallel signatures to AsJsons and ParseJsons for > catching parsing errors. -- This message was sent by Atlassian Jira (v8.3.4#803005)