[ https://issues.apache.org/jira/browse/BEAM-14364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529761#comment-17529761 ]

Darren Norton commented on BEAM-14364:
--------------------------------------

I wanted to note that this proposed change is very attractive for our Dataflow 
job, and to give some insight into why we would like a graceful response when 
using createDisposition(CREATE_NEVER) rather than 
createDisposition(CREATE_IF_NEEDED):

I've been prototyping some optimizations for our existing Dataflow job, 
specifically around reducing the number of bytes shuffled in Dataflow. A major 
contributor to our shuffled bytes was a series of PCollections that carried a 
TableSchema along to supply to DynamicDestinations. Those PCollections looked 
like this:
{code:java}
PCollection<KV<TableId, POJOContaining(TableSchema, TableRow)>> {code}
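For context, a rough sketch of what a DynamicDestinations that pulls the schema 
off each element can look like. This is illustrative only; the SchemaAndRow POJO 
and its accessors are hypothetical stand-ins for our actual classes:
{code:java}
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.TableId;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// Hypothetical POJO carrying both the schema and the row (stand-in for our real class).
class SchemaAndRow implements java.io.Serializable {
  TableSchema schema;
  TableRow row;
  TableSchema getSchema() { return schema; }
  TableRow getRow() { return row; }
}

class PerElementSchemaDestinations
    extends DynamicDestinations<KV<TableId, SchemaAndRow>, KV<TableId, TableSchema>> {

  @Override
  public KV<TableId, TableSchema> getDestination(
      ValueInSingleWindow<KV<TableId, SchemaAndRow>> element) {
    KV<TableId, SchemaAndRow> kv = element.getValue();
    // The schema becomes part of the destination key, so it is serialized and
    // shuffled along with every element -- the cost described above.
    return KV.of(kv.getKey(), kv.getValue().getSchema());
  }

  @Override
  public TableDestination getTable(KV<TableId, TableSchema> destination) {
    TableId id = destination.getKey();
    return new TableDestination(
        String.format("%s:%s.%s", id.getProject(), id.getDataset(), id.getTable()), null);
  }

  @Override
  public TableSchema getSchema(KV<TableId, TableSchema> destination) {
    return destination.getValue();
  }

  // In practice getDestinationCoder() would also be overridden, since a coder
  // for KV<TableId, TableSchema> cannot be inferred automatically.
}
{code}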
We measured the relative size of the TableSchema against the TableRow, and it 
turned out the TableSchema contributes 5 times as many bytes as the TableRow. 
We wanted to remove the TableSchema from the PCollection, and found that we 
could do so by not passing in a schema at all, since we are using the 
STREAMING_INSERTS write method. Our PCollections then looked like this:
{code:java}
PCollection<KV<TableId, TableRow>> {code}
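Concretely, the slimmed-down write looks roughly like the following. This is a 
sketch rather than our exact code, and it assumes a TableId from the 
google-cloud-bigquery library as the key:
{code:java}
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.TableId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

static WriteResult writeRows(PCollection<KV<TableId, TableRow>> rows) {
  return rows.apply(
      "WriteToBQ",
      BigQueryIO.<KV<TableId, TableRow>>write()
          // Resolve the destination table from the element's TableId key.
          .to(elem -> {
            TableId id = elem.getValue().getKey();
            return new TableDestination(
                String.format("%s:%s.%s", id.getProject(), id.getDataset(), id.getTable()),
                null);
          })
          .withFormatFunction(KV::getValue)
          .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
          // CREATE_NEVER means no schema needs to be supplied -- or shuffled -- anywhere.
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
          // Needed so failed inserts come back with error details attached.
          .withExtendedErrorInfo()
          .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
}
{code}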
Dropping the schema reduced our pipeline's cost significantly, but we had to 
work around the fact that in some cases the table may not have been created 
yet. We have built an external retry system for messages that had schema issues 
in the past: otherwise-healthy data is reprocessed once our external schema 
service has been updated with the proper schemas. This sometimes happens when a 
user forgets to register a schema for their data, which means we couldn't 
create the BQ table ahead of time. If we got the TableRow back in a result from 
BigQuery, we could retry it later once the table has been created; with the 
current behavior, however, our pipeline crashes instead.
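With the behavior this ticket proposes, consuming those failures would be 
straightforward, along these lines (sendToRetrySystem is a hypothetical 
stand-in for our external retry integration):
{code:java}
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// result is the WriteResult from the write sketched above.
result.getFailedInsertsWithErr()
    .apply("RouteToRetrySystem", ParDo.of(new DoFn<BigQueryInsertError, Void>() {
      @ProcessElement
      public void processElement(@Element BigQueryInsertError err) {
        // err.getTable() names the table that 404'd; err.getRow() is the
        // otherwise-healthy TableRow to re-submit once the table exists.
        // sendToRetrySystem is a hypothetical hook into our retry system.
        sendToRetrySystem(err.getTable(), err.getRow());
      }
    }));
{code}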

> 404s in BigQueryIO don't get output to Failed Inserts PCollection
> -----------------------------------------------------------------
>
>                 Key: BEAM-14364
>                 URL: https://issues.apache.org/jira/browse/BEAM-14364
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Svetak Vihaan Sundhar
>            Assignee: Svetak Vihaan Sundhar
>            Priority: P1
>         Attachments: ErrorsInPrototypeJob.PNG
>
>
> Given that BigQueryIO is configured to use createDisposition(CREATE_NEVER),
> and the DynamicDestinations class returns "null" for a schema,
> and the table for that destination does not exist in BigQuery,
> when I stream records to BigQuery for that table, then the write should fail,
> and the failed rows should appear on the output PCollection for Failed 
> Inserts (via getFailedInserts()).
>  
> Almost all of the time the table exists beforehand, but given that new 
> tables can be created, we want this case to be non-fatal to the job. What we 
> are seeing instead is that processing completely stops in those pipelines, 
> and eventually the jobs run out of memory. I feel the appropriate action when 
> BigQuery returns a 404 for the table would be to emit those failed TableRows 
> to the output PCollection and continue processing as normal.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
