sarinasij commented on issue #31354:
URL: https://github.com/apache/beam/issues/31354#issuecomment-2174762096

   @Amar3tto 
   
   Below is our current usage description:
   
   To process failed records, we need the original AvroGenericRecordMessage for dead-letter handling. The AvroGenericRecordMessage carries more information than the TableRow written to BigQuery. In particular, it includes the eventType, which is not one of the table columns but is required for our dead-letter metrics.
   
   To work around this, we use withFormatRecordOnFailureFunction() to build a dummy TableRow that can be decoded back into the original AvroGenericRecordMessage.
   
    ```
   return BigQueryIO.<AvroGenericRecordMessage>write()
               .to(input -> getTableDestination(input, outputTableProject, outputTableDataset, outputTableMap))
               .withMethod(STORAGE_WRITE_API)
               .withTriggeringFrequency(Duration.standardSeconds(bqWindowInSec))
               .withNumStorageWriteApiStreams(numStreams)
               .withFormatFunction(AvroGenericRecordToBigQuery::formatAvroToTableRow)
               .withFormatRecordOnFailureFunction(AvroGenericRecordToBigQuery::formatAvroToFailedTableRow)
               .withCreateDisposition(CREATE_NEVER)
               .withWriteDisposition(WRITE_APPEND)
               .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
   
       private static TableRow formatAvroToFailedTableRow(final AvroGenericRecordMessage message) {
           // Build a dummy table row containing the original Avro payload so it can
           // later be used to dead letter the event. Unfortunately, we need to encode
           // this into a TableRow since the BigQueryIO module doesn't provide any other
           // way to get back to the original AvroGenericRecordMessage object that
           // failed insertion.
           return FormatBigQueryDeadLetters.encodeAvroRecordToTableRow(message);
       }
   
       public static TableRow encodeAvroRecordToTableRow(AvroGenericRecordMessage record) {
           var row = new TableRow();
           row.set(BQ_TABLEROW_COLUMN_EVENT_NAME, record.getEventType());
           row.set(BQ_TABLEROW_COLUMN_PAYLOAD, Base64.getEncoder().encodeToString(record.getBinaryEncoding()));
   
           return row;
       }
   ```
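
For context, the decoding side of this round trip looks roughly like the sketch below. This is a simplified illustration, not our production code: a plain `Map<String, Object>` stands in for TableRow, the class name `DeadLetterRoundTrip` and the two column names are placeholders, and the final step of rebuilding an AvroGenericRecordMessage from the decoded bytes is left out.

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

// Minimal round-trip sketch. A plain Map stands in for TableRow, and the
// column names below are placeholders (assumptions), not our real constants.
class DeadLetterRoundTrip {
    static final String COLUMN_EVENT_NAME = "event_name"; // hypothetical name
    static final String COLUMN_PAYLOAD = "payload";       // hypothetical name

    // Mirrors encodeAvroRecordToTableRow: event type plus Base64-encoded payload.
    static Map<String, Object> encode(String eventType, byte[] binaryEncoding) {
        Map<String, Object> row = new HashMap<>();
        row.put(COLUMN_EVENT_NAME, eventType);
        row.put(COLUMN_PAYLOAD, Base64.getEncoder().encodeToString(binaryEncoding));
        return row;
    }

    // Recovers the event type needed for dead-letter metrics.
    static String decodeEventType(Map<String, Object> row) {
        return (String) row.get(COLUMN_EVENT_NAME);
    }

    // Recovers the original binary Avro payload from the dummy row.
    static byte[] decodePayload(Map<String, Object> row) {
        return Base64.getDecoder().decode((String) row.get(COLUMN_PAYLOAD));
    }

    public static void main(String[] args) {
        byte[] original = {1, 2, 3, 4};
        Map<String, Object> failedRow = encode("user_signup", original);
        System.out.println(decodeEventType(failedRow));
        System.out.println(Arrays.equals(original, decodePayload(failedRow)));
    }
}
```

The point is that the dummy row only needs to carry the event type and the raw payload. Everything else can be reconstructed downstream, which is why the failure function has to receive the original element rather than an already-formatted TableRow.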
   
   Given this, I think the proposal to "add withFormatRecordOnFailureFunction() from TableRow to TableRow" might not work for us, since the failed rows need additional information that is only available in the original AvroGenericRecordMessage.
   Could you please advise how we can achieve the same with the STORAGE_WRITE_API method?
   


