rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r438508022



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing 
layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>ParseResult results = 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure you will need to first enable 
extended error reporting.
+   * {@Code ParseResult results =
+   * 
jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());
 }
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@Code PCollection<Row> errorsLinesWithErrMsg = 
results.getFailedToParseLines()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends 
PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() 
{};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    /** Builder for {@link JsonToRowWithErrFn}; the concrete implementation is generated by AutoValue. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      /** Sets the schema applied to successfully parsed rows. */
+      public abstract Builder setSchema(Schema value);
+
+      /** Enables or disables including the error message in failed-parse rows. */
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      /** Sets the field name used for the input line in the returned error Row. */
+      public abstract Builder setLineFieldName(String value);
+
+      /** Sets the field name used for the error message in the returned error Row. */
+      public abstract Builder setErrorFieldName(String value);
+
+      /** Creates the configured {@link JsonToRowWithErrFn}. */
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), 
getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      
TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());

Review comment:
       Changed to be passed in via resultBuilder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to