rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437764105



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enables dead-letter support. With this transform, errors in the parsing layer are returned as
+   * Row objects within a {@link ParseResult}.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure you will need to first enable extended error
+   * reporting:
+   *
+   * <p>{@code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());}
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr()}
+   *
+   * @param rowSchema the schema handed to {@link JsonToRowWithErrFn#forSchema(Schema)}
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends 
PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() 
{};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    /**
+     * AutoValue builder for {@link JsonToRowWithErrFn}. It has one setter per abstract property
+     * of the enclosing class: schema, line field name, error field name and the extended error
+     * info flag.
+     */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    /**
+     * Creates a {@link JsonToRowWithErrFn} for the given schema with the default settings:
+     * extended error info disabled, line field name {@code "line"} and error field name
+     * {@code "err"}.
+     *
+     * @param rowSchema the schema to use; it is first checked with
+     *     {@code RowJson.verifySchemaSupported}
+     * @return the configured {@link JsonToRowWithErrFn}
+     */
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return a {@link JsonToRowWithErrFn} rebuilt from this one with extended error info set to
+     *     {@code true}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * <p>NOTE(review): the visible part of {@code expand} applies the static error-row schemas and
+     * does not pass this name to the parsing DoFn; confirm that the configured name is actually
+     * honoured.
+     *
+     * @param lineField the name to store as the line field name
+     * @return a {@link JsonToRowWithErrFn} rebuilt from this one with the given line field name
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Sets the field name for the error field in the returned Row.
+     *
+     * <p>Extended error info must already be enabled on this transform, so call
+     * {@link #withExtendedErrorInfo()} before this method.
+     *
+     * @param errorField the name to store as the error field name
+     * @return a {@link JsonToRowWithErrFn} rebuilt from this one with the given error field name
+     * @throws IllegalArgumentException if extended error info is not enabled
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    /**
+     * Applies a multi-output {@code ParDo} running {@code ParseWithError} to the input strings,
+     * sets a row schema on each of the three tagged outputs, and wraps this transform in a
+     * {@link ParseResult}.
+     *
+     * <p>NOTE(review): the three output collections are stored in mutable instance fields of this
+     * transform; confirm this is safe if the same transform instance is applied more than once.
+     *
+     * @param jsonStrings the input collection of strings to parse
+     * @return a {@link ParseResult} created from this transform
+     */
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      // PARSED_LINE is the main output; the two error tags are additional outputs.
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), 
getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      
TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      // Parsed rows use the user-supplied schema; error rows use the static error schemas.
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          
result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;

Review comment:
       This was used in the original JsonToRow, but I didn't see any notes as 
to why. Also, this ObjectMapper was at the Transform class level before and 
was used in an anonymous DoFn. 
   In theory this is something that I would instantiate in @Setup, but again we 
might want to make those changes in a separate PR, which can also have some 
larger tests to make sure it works OK when multiple DoFns are active. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to