[
https://issues.apache.org/jira/browse/BEAM-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Renato Martins Leite updated BEAM-10640:
----------------------------------------
Description:
In:
class BigQueryWriteFn(DoFn):
- def _flush_batch(self, destination):
Return an additional pvalue.TaggedOutput with the detailed error from each
failed insertion into BigQuery.
Today the failed-rows output contains only the row (payload) that failed, like this:
{code:python}
# Return statement
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row)))
    for row in failed_rows
]
{code}
For error analysis it is important to know what caused each failure.
This same function already captures the errors returned by BigQuery, so we only
need to return them in an additional pvalue.TaggedOutput:
{code:python}
# Function that captures the error
passed, errors = self.bigquery_wrapper.insert_rows(
    project_id=table_reference.projectId,
    dataset_id=table_reference.datasetId,
    table_id=table_reference.tableId,
    rows=rows,
    insert_ids=insert_ids,
    skip_invalid_rows=True)
{code}
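As a rough sketch of how each failed row could be matched with its error, the snippet below pairs rows with error entries by index. It assumes each entry in `errors` has the shape of an `insertErrors` element in the BigQuery `tabledata.insertAll` response (an `index` into the submitted rows plus a list of `errors`). The helper name `pair_rows_with_errors` and the sample data are made up for illustration and are not part of Beam.

```python
def pair_rows_with_errors(rows, insert_errors):
    """Match each failed row with the errors BigQuery reported for it.

    Assumption: every entry looks like {'index': int, 'errors': [...]},
    mirroring an `insertErrors` element of the insertAll response.
    """
    return [(rows[entry['index']], entry['errors']) for entry in insert_errors]


# Hypothetical sample data: the second row fails validation.
rows = [{'id': 1, 'name': 'ok'}, {'id': 'oops', 'name': 'bad'}]
insert_errors = [
    {
        'index': 1,
        'errors': [{'reason': 'invalid', 'location': 'id',
                    'message': 'sample message, not real BigQuery output'}],
    },
]

failed = pair_rows_with_errors(rows, insert_errors)
```

With pairs like these available, the return statement can emit the error next to each row instead of the row alone.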
The new return would look like this:
{code:python}
# New return statement; `error` is the BigQuery error reported for that row
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row, error)))
    for row in failed_rows
]
{code}
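To illustrate why the extra element helps with error analysis, here is a minimal sketch of a downstream step that turns a `(destination, row, error)` tuple into a record suitable for a dead-letter table or log. The function name `to_dead_letter_record`, the record fields, and the sample values are hypothetical; the only thing taken from the proposal is the three-element tuple.

```python
import json


def to_dead_letter_record(element):
    """Flatten a (destination, row, error) tuple into a JSON-friendly dict."""
    destination, row, error = element
    return {
        'destination': destination,
        # Serialize payload and error so they fit into plain string columns.
        'payload': json.dumps(row, sort_keys=True),
        'errors': json.dumps(error, sort_keys=True),
    }


# Hypothetical failed-insert element as the proposed output would carry it.
sample = (
    'my-project:my_dataset.my_table',
    {'id': 'oops'},
    [{'reason': 'invalid', 'location': 'id'}],
)
record = to_dead_letter_record(sample)
```

In a pipeline, a function like this would typically be applied with `beam.Map` to the failed-rows output, so the cause of each failure is stored next to the payload.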
Thank you!
was:
In:
class BigQueryWriteFn(DoFn):
- def _flush_batch(self, destination):
Return an additional pvalue.TaggedOutput with the detailed ERROR from failed
insertion to BigQuery.
Today the error returns only the row (payload) of the error, like this:
{code:java}
// return [
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
GlobalWindows.windowed_value((destination, row)))
for row in failed_rows
]
{code}
For error analysis it is super important to understand WHAT is causing the
error.
In this same function, we only need to return the error from BigQuery in an
additional pvalue.TaggedOutput:
{code:java}
// passed, errors = self.bigquery_wrapper.insert_rows(
project_id=table_reference.projectId,
dataset_id=table_reference.datasetId,
table_id=table_reference.tableId,
rows=rows,
insert_ids=insert_ids,
skip_invalid_rows=True)
{code}
The new return would look like this:
{code:java}
// return [
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
GlobalWindows.windowed_value((destination, row, error)))
for row in failed_rows
]{code}
Thank you!
> Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed
> inserts
> ------------------------------------------------------------------------------------
>
> Key: BEAM-10640
> URL: https://issues.apache.org/jira/browse/BEAM-10640
> Project: Beam
> Issue Type: Improvement
> Components: beam-community, io-py-gcp
> Affects Versions: 2.23.0
> Environment: LocalRunner, Beam v2.23
> Reporter: Renato Martins Leite
> Assignee: Aizhamal Nurmamat kyzy
> Priority: P1