damccorm opened a new issue, #21694:
URL: https://github.com/apache/beam/issues/21694
I’m currently using the legacy BigQuery insert method in a streaming pipeline (without Streaming Engine), configured like this:
```
bqWriter = bqWriter
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withExtendedErrorInfo(); // required for getFailedInsertsWithErr()

// Send failed inserts, together with their error details, to our error handler
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
    writeResult.getFailedInsertsWithErr()
        .apply("BQ-insert-error-write", HandleInsertError.of());
```
In HandleInsertError, we process each BigQueryInsertError, add some metadata, and write the result to the desired BigQuery error table:
```
@Override
public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
    return input
        .apply("transform-err-table-row", ParDo.of(
            new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    BigQueryInsertError bigQueryInsertError = c.element();

                    // Build the error row: error details, a timestamp, and the original row's UUID
                    TableRow convertedRow = new TableRow();
                    convertedRow.set("error", bigQueryInsertError.getError().toString());
                    convertedRow.set("t", CommonConverter.convertDate(new Date()));
                    convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));

                    // Route it to the error table in the same project/dataset as the failed insert
                    TableDestination tableDestination = BqUtil.getTableDestination(
                        bigQueryInsertError.getTable().getProjectId(),
                        bigQueryInsertError.getTable().getDatasetId(),
                        errorTable);

                    c.output(KV.of(convertedRow, tableDestination));
                }
            }))
        .apply(new BqInsertError());
}
```
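For anyone trying to reproduce this without our internal helpers, the per-element logic of HandleInsertError can be modelled in plain Java roughly as below. This is a simplified sketch, not our actual code: a `Map` stands in for `TableRow` (which is itself a `Map<String, Object>`), `errorTableSpec` is a hypothetical stand-in for `BqUtil.getTableDestination` that assumes the `project:dataset.table` spec format, `uuid` is an assumed column name, and an ISO-8601 timestamp replaces `CommonConverter.convertDate`.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

public class ErrorRowSketch {
    // Assumed name of the row-id column (the real code uses a UUID constant).
    static final String UUID_FIELD = "uuid";

    // Hypothetical stand-in for BqUtil.getTableDestination: builds a
    // "project:dataset.table" spec for the tenant's error table.
    static String errorTableSpec(String projectId, String datasetId, String errorTable) {
        return projectId + ":" + datasetId + "." + errorTable;
    }

    // Mirrors processElement: copy the error text, a timestamp and the
    // original row id into a new row (a Map stands in for TableRow).
    static Map<String, Object> errorRow(String error, Map<String, Object> failedRow, Instant now) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("error", error);
        row.put("t", now.toString()); // ISO-8601, e.g. 1970-01-01T00:00:00Z
        row.put(UUID_FIELD, failedRow.get(UUID_FIELD));
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> failed = new LinkedHashMap<>();
        failed.put(UUID_FIELD, "abc-123");
        System.out.println(errorRow("{reason=invalid}", failed, Instant.now()));
        System.out.println(errorTableSpec("tenant-a-project", "tenant_a", "insert_errors"));
    }
}
```

The key point is that both routing inputs (project and dataset) come from `BigQueryInsertError.getTable()`, not from the row.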
I’m trying to switch the write method to the new Storage Write API:
```
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
```
but I get this error:
```
When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified
```
even though the documentation indicates that triggering frequency is relevant to the FILE_LOADS method:
[https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-](https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-)
After I added the triggering frequency and the number of Storage Write API streams (withNumStorageWriteApiStreams), I get this error:
```
Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead
```
The difference between these methods is that getFailedInsertsWithErr returns a `PCollection<BigQueryInsertError>`, while getFailedInserts returns a `PCollection<TableRow>`. The former gives us two things the latter does not:
1. The insert error itself, via `bigQueryInsertError.getError()`.
2. The project ID and dataset ID of the target table, via `bigQueryInsertError.getTable().getProjectId()` and `bigQueryInsertError.getTable().getDatasetId()`.

We need these because our pipeline serves a multi-tenant use case, and getting those parameters any other way would add a lot of overhead.
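To give a sense of that overhead: with only getFailedInserts, one possible workaround is to stamp each row with its tenant’s project and dataset before the write and read them back from the failed `TableRow`. A plain-Java sketch, assuming hypothetical `_tenant_project` / `_tenant_dataset` fields (which every destination table would then have to accept) and again using a `Map` in place of `TableRow`:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class TenantTaggingSketch {
    // Hypothetical field names carrying tenant routing info inside each row.
    static final String PROJECT_FIELD = "_tenant_project";
    static final String DATASET_FIELD = "_tenant_dataset";

    // Before the write: return a copy of the row tagged with its tenant's project and dataset.
    static Map<String, Object> tag(Map<String, Object> row, String projectId, String datasetId) {
        Map<String, Object> tagged = new LinkedHashMap<>(row);
        tagged.put(PROJECT_FIELD, projectId);
        tagged.put(DATASET_FIELD, datasetId);
        return tagged;
    }

    // In the failed-insert handler: rebuild "project:dataset.errorTable" from the
    // failed row alone, or return empty if the row was never tagged.
    static Optional<String> errorTableFor(Map<String, Object> failedRow, String errorTable) {
        Object project = failedRow.get(PROJECT_FIELD);
        Object dataset = failedRow.get(DATASET_FIELD);
        if (project == null || dataset == null) {
            return Optional.empty();
        }
        return Optional.of(project + ":" + dataset + "." + errorTable);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("uuid", "abc-123");
        Map<String, Object> tagged = tag(row, "tenant-a-project", "tenant_a");
        System.out.println(errorTableFor(tagged, "insert_errors").orElse("<untagged>"));
        System.out.println(errorTableFor(row, "insert_errors").orElse("<untagged>"));
    }
}
```

Even then, the insert error itself (what `getError()` provides) is still not recoverable from a plain `TableRow`.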
Also, when I try to run it with getFailedInserts instead, like this:
```
bqWriter = bqWriter
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(5))
    .withNumStorageWriteApiStreams(12);

// getFailedInserts() returns PCollection<TableRow>, without error details
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
    writeResult.getFailedInserts()
        .apply("BQ-insert-error-write", HandleStorageWriteApiInsertError.of());
```
I get the following error:
```
Record-insert retry policies are not supported when using BigQuery load jobs.
```
but I’m using STORAGE_WRITE_API, which I would expect to support retryTransientErrors.
So, first, I think something is missing in the implementation of this write method that leaves the retry policy unsupported. Second, as a feature request: please support getFailedInsertsWithErr on the WriteResult when using BigQueryIO.Write.Method.STORAGE_WRITE_API.
If there is an existing workaround, it would be great to know about it, because switching the write method significantly cuts our costs.
Thanks!
Imported from Jira
[BEAM-14135](https://issues.apache.org/jira/browse/BEAM-14135). Original Jira
may contain additional context.
Reported by: yoni.be.