sarinasij commented on issue #31354: URL: https://github.com/apache/beam/issues/31354#issuecomment-2174762096
@Amar3tto Below is a description of our current usage:

To process failed records, we need the original AvroGenericRecordMessage for further dead-letter handling. Note that the AvroGenericRecordMessage contains more information than the TableRow written to BigQuery. Specifically, it includes the eventType, which is not one of the table columns but is crucial for our dead-letter metrics. To work around this, we use withFormatRecordOnFailureFunction() to construct a dummy TableRow that can be decoded back into the original AvroGenericRecordMessage.

```
return BigQueryIO.<AvroGenericRecordMessage>write()
    .to(input -> getTableDestination(input, outputTableProject, outputTableDataset, outputTableMap))
    .withMethod(STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(bqWindowInSec))
    .withNumStorageWriteApiStreams(numStreams)
    .withFormatFunction(AvroGenericRecordToBigQuery::formatAvroToTableRow)
    .withFormatRecordOnFailureFunction(AvroGenericRecordToBigQuery::formatAvroToFailedTableRow)
    .withCreateDisposition(CREATE_NEVER)
    .withWriteDisposition(WRITE_APPEND)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());

private static TableRow formatAvroToFailedTableRow(final AvroGenericRecordMessage message) {
    // Build a dummy table row containing the original Avro payload so it can be later used to
    // dead letter the event. Unfortunately, we need to encode this into a TableRow since the BigQueryIO
    // module doesn't provide any other way to get back to the original AvroGenericRecordMessage object
    // that failed insertion.
    return FormatBigQueryDeadLetters.encodeAvroRecordToTableRow(message);
}

public static TableRow encodeAvroRecordToTableRow(AvroGenericRecordMessage record) {
    var row = new TableRow();
    row.set(BQ_TABLEROW_COLUMN_EVENT_NAME, record.getEventType());
    row.set(BQ_TABLEROW_COLUMN_PAYLOAD, Base64.getEncoder().encodeToString(record.getBinaryEncoding()));
    return row;
}
```

Given this, I think the proposal to "add withFormatRecordOnFailureFunction() from TableRow to TableRow" might not work for us, since we need additional information for the failed rows, and that information comes from the original AvroGenericRecordMessage.

Please advise how we can implement the same behavior with the STORAGE_WRITE_API method.
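For reference, the encode/decode round trip that the dummy TableRow relies on can be sketched roughly as below. This is only an illustration under stated assumptions, not our production code: a plain `Map<String, Object>` stands in for `TableRow` so the sketch runs without Beam on the classpath, the column names `event_name` and `payload` are hypothetical placeholders for the `BQ_TABLEROW_COLUMN_*` constants, and the class name `DeadLetterRoundTrip` is made up for this example.

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

public class DeadLetterRoundTrip {
    // Hypothetical column names; the real BQ_TABLEROW_COLUMN_* values are not shown in this thread.
    public static final String COLUMN_EVENT_NAME = "event_name";
    public static final String COLUMN_PAYLOAD = "payload";

    // Mirrors encodeAvroRecordToTableRow, with a Map standing in for TableRow.
    public static Map<String, Object> encode(String eventType, byte[] binaryEncoding) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put(COLUMN_EVENT_NAME, eventType);
        row.put(COLUMN_PAYLOAD, Base64.getEncoder().encodeToString(binaryEncoding));
        return row;
    }

    // Recovers the event type that is not part of the destination table's columns.
    public static String decodeEventType(Map<String, Object> row) {
        return (String) row.get(COLUMN_EVENT_NAME);
    }

    // Recovers the original binary Avro payload from its Base64 form.
    public static byte[] decodePayload(Map<String, Object> row) {
        return Base64.getDecoder().decode((String) row.get(COLUMN_PAYLOAD));
    }

    public static void main(String[] args) {
        byte[] original = {0x02, 0x10, 0x7f};
        Map<String, Object> row = encode("order_created", original);
        byte[] restored = decodePayload(row);
        System.out.println(decodeEventType(row) + " " + Arrays.equals(original, restored));
    }
}
```

The point of the sketch is that both fields are derived from the original message rather than from the formatted table row, which is why a failure function that only receives a TableRow would not be enough for this use case.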