damccorm opened a new issue, #21694:
URL: https://github.com/apache/beam/issues/21694

   I’m currently using the legacy BigQuery insert on a streaming pipeline (not using the streaming engine) like this:
   ```
   bqWriter = bqWriter
       .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
       .withExtendedErrorInfo();
   bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
       writeResult.getFailedInsertsWithErr().apply("BQ-insert-error-write",
           HandleInsertError.of());
   ```
   
   and in HandleInsertError we process the BigQueryInsertError, add some metadata, and write to the desired BigQuery error table:

   ```
   @Override
   public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
     return input
         .apply("transform-err-table-row",
             ParDo.of(new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
               @ProcessElement
               public void processElement(ProcessContext c) {
                 BigQueryInsertError bigQueryInsertError = c.element();
                 TableRow convertedRow = new TableRow();
                 convertedRow.set("error", bigQueryInsertError.getError().toString());
                 convertedRow.set("t", CommonConverter.convertDate(new Date()));
                 convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));
                 TableDestination tableDestination = BqUtil.getTableDestination(
                     bigQueryInsertError.getTable().getProjectId(),
                     bigQueryInsertError.getTable().getDatasetId(),
                     errorTable);
                 c.output(KV.of(convertedRow, tableDestination));
               }
             }))
         .apply(new BqInsertError());
   }
   ```
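
   (BqUtil.getTableDestination is our own helper; a minimal hypothetical sketch of what it boils down to, assuming it just assembles a fully qualified table spec, would be:)

   ```
   // Hypothetical helper, not part of Beam: builds the per-tenant error table
   // destination from the failing row's project and dataset ids.
   static TableDestination getTableDestination(String projectId, String datasetId, String tableId) {
     return new TableDestination(
         String.format("%s:%s.%s", projectId, datasetId, tableId),
         "BigQuery insert errors with added metadata");
   }
   ```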
   
    
   
   I’m trying to change the write method to use the new one:
   ```
   .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
   ```
   
    
   
   but I get this error:
   ```
   When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified
   ```
   
   even though the documentation indicates that the triggering frequency is only relevant to the FILE_LOADS method:
   
[https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-](https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-)
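
   For reference, this is roughly the change the error forces (a sketch only; the values are just ours, and the full snippet appears further below):

   ```
   // Required for an unbounded PCollection with STORAGE_WRITE_API, even though
   // the Javadoc ties triggering frequency to FILE_LOADS.
   .withTriggeringFrequency(Duration.standardSeconds(5))
   .withNumStorageWriteApiStreams(12)
   ```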
   
    
   
   after I’ve added the triggering frequency and NumStorageWriteApiStreams, I’m getting this error:
   ```
   Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead
   ```
   
    
   
   but the difference between these functions is that getFailedInsertsWithErr expands PCollection<BigQueryInsertError\>, and there we have 2 features that are not available from the getFailedInserts function, because it expands PCollection<TableRow\> (see the sketch after this list):

   1. we can get the insert error via bigQueryInsertError.getError()
   2. we can determine the project id and dataset id by using bigQueryInsertError.getTable().getProjectId() and bigQueryInsertError.getTable().getDatasetId(); we need them because our pipeline is a multi-tenant use case, and getting those parameters otherwise would require a lot of overhead.
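
   To make the difference concrete, here is a minimal sketch of the two accessors' result types (types per the Beam Javadoc; variable names are ours):

   ```
   // With extended errors: each element carries the failed row, the insert
   // error details, and the destination table reference.
   PCollection<BigQueryInsertError> withErr = writeResult.getFailedInsertsWithErr();
   // each element exposes getError() -> the insert failure details,
   //                      getTable() -> TableReference (project/dataset ids),
   //                      getRow()   -> the original TableRow

   // Without extended errors: only the failed rows themselves, so neither the
   // error reason nor the destination project/dataset can be recovered.
   PCollection<TableRow> plain = writeResult.getFailedInserts();
   ```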
   
   and also, when I’m trying to run it with getFailedInserts like this:
   ```
   bqWriter = bqWriter
       .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
       .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
       .withTriggeringFrequency(Duration.standardSeconds(5))
       .withNumStorageWriteApiStreams(12);
   bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
       writeResult.getFailedInserts().apply("BQ-insert-error-write",
           HandleStorageWriteApiInsertError.of());
   ```
   
   I get the following error:
   ```
   Record-insert retry policies are not supported when using BigQuery load jobs.
   ```
   
    
   
   but I’m using STORAGE_WRITE_API, which should normally support retryTransientErrors.

   So, first, I think there is something missing in the implementation of that write method that makes the retry feature unsupported,
   
   and, as a feature request, I’d like getFailedInsertsWithErr to be supported in the WriteResult when using BigQueryIO.Write.Method.STORAGE_WRITE_API.
   
   If there is an existing workaround for that now, it would be great to know, because switching the write method significantly cuts our costs.
   
   Thanks!
   
   Imported from Jira 
[BEAM-14135](https://issues.apache.org/jira/browse/BEAM-14135). Original Jira 
may contain additional context.
   Reported by: yoni.be.

