[ https://issues.apache.org/jira/browse/BEAM-10201?focusedWorklogId=443517&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-443517 ]
ASF GitHub Bot logged work on BEAM-10201:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jun/20 01:18
Start Date: 10/Jun/20 01:18
Worklog Time Spent: 10m
Work Description: rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437806981
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
return this.objectMapper;
}
}
+
+ static class JsonToRowWithFailureCaptureFn
+ extends PTransform<PCollection<String>, PCollectionTuple> {
+ private transient volatile @Nullable ObjectMapper objectMapper;
+ private Schema schema;
+ private static final String METRIC_NAMESPACE = "JsonToRowFn";
+ private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+ private Distribution jsonConversionErrors =
+ Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+ public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+ public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+ PCollection<Row> deadLetterCollection;
+
+ static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+ // Throw exception if this schema is not supported by RowJson
+ RowJson.verifySchemaSupported(rowSchema);
+ return new JsonToRowWithFailureCaptureFn(rowSchema);
+ }
+
+ private JsonToRowWithFailureCaptureFn(Schema schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+ return jsonStrings.apply(
+ ParDo.of(
+ new DoFn<String, Row>() {
+ @ProcessElement
+ public void processElement(ProcessContext context) {
Review comment:
Done.
Issue Time Tracking
-------------------
Worklog Id: (was: 443517)
Time Spent: 2h (was: 1h 50m)
> Enhance JsonToRow to add Deadletter Support
> -------------------------------------------
>
> Key: BEAM-10201
> URL: https://issues.apache.org/jira/browse/BEAM-10201
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Reza ardeshir rokni
> Priority: P2
> Time Spent: 2h
> Remaining Estimate: 0h
>
> The current JsonToRow transform does not support the dead-letter pattern on
> parse failures.
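
For readers unfamiliar with the dead-letter pattern named in the issue, here is a minimal plain-Java sketch of the idea: elements that fail to parse are routed to a separate output instead of failing the whole batch. This is not Beam code and not the implementation in the pull request. The names `DeadLetterSketch`, `Failure`, `Result`, and `parseAll` are hypothetical, and the parser is a caller-supplied stand-in for JSON-to-Row conversion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java illustration of the dead-letter pattern (hypothetical names, not Beam API). */
final class DeadLetterSketch {

  /** A rejected input paired with the reason it was rejected. */
  static final class Failure {
    final String input;
    final String error;

    Failure(String input, String error) {
      this.input = input;
      this.error = error;
    }
  }

  /** Holds the two outputs: successfully parsed values and dead letters. */
  static final class Result<T> {
    final List<T> parsed = new ArrayList<>();
    final List<Failure> deadLetters = new ArrayList<>();
  }

  /**
   * Applies the parser to every input. Inputs on which the parser throws a
   * RuntimeException are collected as dead letters rather than aborting the loop.
   */
  static <T> Result<T> parseAll(List<String> inputs, Function<String, T> parser) {
    Result<T> result = new Result<>();
    for (String input : inputs) {
      try {
        result.parsed.add(parser.apply(input));
      } catch (RuntimeException e) {
        // Route the bad element to the side output and keep processing.
        result.deadLetters.add(new Failure(input, String.valueOf(e.getMessage())));
      }
    }
    return result;
  }
}
```

In the diff quoted above, the two lists would correspond to the two tagged outputs of the returned PCollectionTuple (MAIN_TUPLE_TAG and DEAD_LETTER_TUPLE_TAG). Calling `DeadLetterSketch.parseAll(inputs, Integer::parseInt)` with a non-numeric string in `inputs` shows the routing with a stand-in parser.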