[ 
https://issues.apache.org/jira/browse/BEAM-8100?focusedWorklogId=316842&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316842
 ]

ASF GitHub Bot logged work on BEAM-8100:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Sep/19 17:19
            Start Date: 23/Sep/19 17:19
    Worklog Time Spent: 10m 
      Work Description: jklukas commented on pull request #9499: [BEAM-8100] 
Exception handling for AsJsons and ParseJsons
URL: https://github.com/apache/beam/pull/9499#discussion_r327232550
 
 

 ##########
 File path: 
sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java
 ##########
 @@ -64,13 +132,75 @@ private AsJsons(Class<? extends InputT> outputClass) {
               @Override
               public String apply(InputT input) {
                 try {
-                  ObjectMapper mapper = 
Optional.fromNullable(customMapper).or(DEFAULT_MAPPER);
-                  return mapper.writeValueAsString(input);
+                  return writeValue(input);
                 } catch (IOException e) {
                   throw new RuntimeException(
                       "Failed to serialize " + inputClass.getName() + " value: 
" + input, e);
                 }
               }
             }));
   }
+
+  /** A {@code PTransform} that adds exception handling to {@link AsJsons}. */
+  public class AsJsonsWithFailures<FailureT>
+      extends PTransform<PCollection<InputT>, 
WithFailures.Result<PCollection<String>, FailureT>> {
+
+    @Nullable
+    private InferableFunction<WithFailures.ExceptionElement<InputT>, FailureT> 
exceptionHandler;
+
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+
+    AsJsonsWithFailures(
+        InferableFunction<WithFailures.ExceptionElement<InputT>, FailureT> 
exceptionHandler,
+        TypeDescriptor<FailureT> failureType) {
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    public AsJsonsWithFailures<FailureT> exceptionsVia(
+        ProcessFunction<WithFailures.ExceptionElement<InputT>, FailureT> 
exceptionHandler) {
+      return new AsJsonsWithFailures<>(
+          new InferableFunction<WithFailures.ExceptionElement<InputT>, 
FailureT>(
+              exceptionHandler) {},
+          failureType);
+    }
+
+    @Override
+    public WithFailures.Result<PCollection<String>, FailureT> 
expand(PCollection<InputT> input) {
+      return input.apply(
+          MapElements.into(TypeDescriptors.strings())
+              .via(
+                  Contextful.fn(
+                      (Contextful.Fn<InputT, String>) (input1, c) -> 
writeValue(input1),
+                      Requirements.empty()))
+              .exceptionsInto(failureType)
+              .exceptionsVia(exceptionHandler));
+    }
+  }
+
+  /**
+   * A default handler that extracts information from an exception to a {@code 
Map<String, String>}
 
 Review comment:
   The info from this class doc would be perfect for filling out Javadoc for 
`exceptionsVia` that instantiates this.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 316842)
    Time Spent: 3h  (was: 2h 50m)

> Add exception handling to Json transforms in Java SDK
> -----------------------------------------------------
>
>                 Key: BEAM-8100
>                 URL: https://issues.apache.org/jira/browse/BEAM-8100
>             Project: Beam
>          Issue Type: Improvement
>          Components: extensions-java-json, sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Alexey Romanenko
>            Priority: Minor
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Follow-up to https://issues.apache.org/jira/browse/BEAM-5638. We have 
> exception handling for MapElements and FlatMapElements. It should be 
> straightforward to add parallel signatures to AsJsons and ParseJsons for 
> catching parsing errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to