pabloem commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436214157



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());

Review comment:
       This may not make sense, but would it help to include the Row schema
that we tried (and failed) to use for this JSON string? Some users may not
need it, and others can add it themselves in a downstream ParDo, but it could
help. Thoughts?
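
A rough sketch of what a dead-letter record carrying the schema could look like. It uses a plain `Map` as a stand-in for a Beam `Row` so that it runs without the SDK; the `schema` field name, the `DeadLetterSketch` class, and the `errorRecord` helper are hypothetical and not part of this PR. The `line` and `err` names mirror `ERROR_ROW_SCHEMA` from the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the dead-letter Row; not Beam API. */
final class DeadLetterSketch {

  /**
   * Builds an error record with the two fields used in the PR ("line", "err")
   * plus an assumed third field describing the schema that failed to apply.
   */
  static Map<String, String> errorRecord(String line, Exception ex, String schemaDescription) {
    Map<String, String> record = new LinkedHashMap<>();
    record.put("line", line);
    // getMessage() may return null, so convert defensively.
    record.put("err", String.valueOf(ex.getMessage()));
    // Assumed extra field: a textual description of the target schema.
    record.put("schema", schemaDescription);
    return record;
  }
}
```

The trade-off is that every failed element repeats the same schema description, which is why leaving it to a downstream ParDo may be just as reasonable.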

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());
+                      }
+                    }
+                  })
+              .withOutputTags(main, TupleTagList.of(deadLetter)));

Review comment:
       You could set the schema for each output here, so that users do not
need to set it themselves.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {

Review comment:
       `PCollectionTuple` is not great (though it's what we have, after all).
Do you think it could make sense to return a custom, more elegant `Result`
type? See this PR, where something like this was added for Map-type
PTransforms:
https://github.com/apache/beam/pull/7736/files#diff-895d512e486834dfd818eec82c75b2c3R122-R181
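
To illustrate the shape such a type might take, here is a minimal plain-Java sketch with named accessors instead of tuple tags. It is not the Beam API and not the code from the linked PR: `ParseResult`, `Failure`, and `parseAll` are hypothetical names, and `List` stands in for `PCollection` so the example runs without the SDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical result type: successes and failures behind named accessors. */
final class ParseResult<T> {

  /** One failed input: the raw line plus the error message. */
  static final class Failure {
    final String line;
    final String err;

    Failure(String line, String err) {
      this.line = line;
      this.err = err;
    }
  }

  private final List<T> output;
  private final List<Failure> failures;

  private ParseResult(List<T> output, List<Failure> failures) {
    this.output = Collections.unmodifiableList(output);
    this.failures = Collections.unmodifiableList(failures);
  }

  /** Successfully parsed elements. */
  List<T> output() {
    return output;
  }

  /** Inputs that could not be parsed. */
  List<Failure> failures() {
    return failures;
  }

  /** Applies the parser to each line, routing runtime exceptions to failures. */
  static <T> ParseResult<T> parseAll(List<String> lines, Function<String, T> parser) {
    List<T> ok = new ArrayList<>();
    List<Failure> bad = new ArrayList<>();
    for (String line : lines) {
      try {
        ok.add(parser.apply(line));
      } catch (RuntimeException ex) {
        bad.add(new Failure(line, String.valueOf(ex.getMessage())));
      }
    }
    return new ParseResult<>(ok, bad);
  }
}
```

With a type like this, callers would write `result.output()` and `result.failures()` rather than looking up `MAIN_TUPLE_TAG` and `DEAD_LETTER_TUPLE_TAG`, and the transform itself could attach the right schema to each side.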

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects of form: {@link JsonToRow#ERROR_ROW_SCHEMA} line : The original json string err :
+   * The error message from the parsing function.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@link JsonToRow#MAIN_TUPLE_TAG}
+   *
+   * <p>{@Code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
       Maybe add the schema for the PCollections in `expand` so that users 
won't have to add them manually?





