sarinasij commented on issue #31354:
URL: https://github.com/apache/beam/issues/31354#issuecomment-2174762096
@Amar3tto
Below is a description of our current usage.
To handle failed records, we need the original AvroGenericRecordMessage for further dead-letter handling. Note that the AvroGenericRecordMessage contains more information than the corresponding BigQuery TableRow. Specifically, it includes the eventType, which is not one of the table columns but is crucial for our dead-letter metrics.
To address this, we use withFormatRecordOnFailureFunction() to construct a dummy TableRow that can be decoded back into the original AvroGenericRecordMessage.
```
return BigQueryIO.<AvroGenericRecordMessage>write()
    .to(input -> getTableDestination(input, outputTableProject,
        outputTableDataset, outputTableMap))
    .withMethod(STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(bqWindowInSec))
    .withNumStorageWriteApiStreams(numStreams)
    .withFormatFunction(AvroGenericRecordToBigQuery::formatAvroToTableRow)
    .withFormatRecordOnFailureFunction(AvroGenericRecordToBigQuery::formatAvroToFailedTableRow)
    .withCreateDisposition(CREATE_NEVER)
    .withWriteDisposition(WRITE_APPEND)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());

private static TableRow formatAvroToFailedTableRow(final AvroGenericRecordMessage message) {
    // Build a dummy table row containing the original Avro payload so it can be
    // used later to dead-letter the event. Unfortunately, we need to encode this
    // into a TableRow since the BigQueryIO module doesn't provide any other way
    // to get back to the original AvroGenericRecordMessage object that failed insertion.
    return FormatBigQueryDeadLetters.encodeAvroRecordToTableRow(message);
}

public static TableRow encodeAvroRecordToTableRow(AvroGenericRecordMessage record) {
    var row = new TableRow();
    row.set(BQ_TABLEROW_COLUMN_EVENT_NAME, record.getEventType());
    row.set(BQ_TABLEROW_COLUMN_PAYLOAD,
        Base64.getEncoder().encodeToString(record.getBinaryEncoding()));
    return row;
}
```
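The code above shows only the encoding side. Below is a minimal, dependency-free sketch of the full round trip, offered as an illustration rather than the actual implementation. Everything in it is a hypothetical stand-in:

- A plain `Map<String, Object>` replaces `TableRow`, which is itself a `Map<String, Object>` subtype.
- `AvroMessage` replaces `AvroGenericRecordMessage`.
- The column-name values `"event_name"` and `"payload"` are made up, because the real constants are not shown.

The sketch also tallies failures per eventType, the dead-letter metric mentioned above. It stops at the raw bytes. Turning them back into an Avro `GenericRecord` still requires the writer schema, because Avro's binary encoding does not embed it.

```java
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the dummy-row round trip. Map<String, Object> stands in for
 * TableRow, and AvroMessage stands in for AvroGenericRecordMessage.
 */
public class FailedRowCodec {

    // Assumed column names; the real constant values are not shown in the issue.
    static final String BQ_TABLEROW_COLUMN_EVENT_NAME = "event_name";
    static final String BQ_TABLEROW_COLUMN_PAYLOAD = "payload";

    /** Hypothetical stand-in: an event type plus the record's Avro binary encoding. */
    static final class AvroMessage {
        private final String eventType;
        private final byte[] binaryEncoding;

        AvroMessage(String eventType, byte[] binaryEncoding) {
            this.eventType = eventType;
            this.binaryEncoding = binaryEncoding.clone();
        }

        String getEventType() { return eventType; }
        byte[] getBinaryEncoding() { return binaryEncoding.clone(); }
    }

    /** Mirrors encodeAvroRecordToTableRow: event type plus Base64-encoded payload. */
    static Map<String, Object> encode(AvroMessage record) {
        Map<String, Object> row = new HashMap<>();
        row.put(BQ_TABLEROW_COLUMN_EVENT_NAME, record.getEventType());
        row.put(BQ_TABLEROW_COLUMN_PAYLOAD,
            Base64.getEncoder().encodeToString(record.getBinaryEncoding()));
        return row;
    }

    /** Inverse of encode: recovers the event type and the original Avro bytes. */
    static AvroMessage decode(Map<String, Object> row) {
        String eventType = (String) row.get(BQ_TABLEROW_COLUMN_EVENT_NAME);
        byte[] payload = Base64.getDecoder().decode((String) row.get(BQ_TABLEROW_COLUMN_PAYLOAD));
        return new AvroMessage(eventType, payload);
    }

    /** Counts failed rows per event type, sorted by event type name. */
    static Map<String, Long> countFailuresByEventType(List<Map<String, Object>> failedRows) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map<String, Object> row : failedRows) {
            counts.merge(decode(row).getEventType(), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> failed = List.of(
            encode(new AvroMessage("order_created", new byte[] {1, 2, 3})),
            encode(new AvroMessage("order_created", new byte[] {4})),
            encode(new AvroMessage("order_cancelled", new byte[] {5, 6})));
        // prints {order_cancelled=1, order_created=2}
        System.out.println(countFailuresByEventType(failed));
    }
}
```

Base64 keeps the payload a plain string, so it survives the row being handled as JSON-like data. The price is a roughly one-third size increase over the raw bytes.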
In this case, I think the suggestion to "add withFormatRecordOnFailureFunction() from TableRow to TableRow" might not work, since we need additional information for the failed rows that only the original AvroGenericRecordMessage carries.
Please advise how we can implement the same behavior with the STORAGE_WRITE_API method.
--
This is an automated message from the Apache Git Service.