[
https://issues.apache.org/jira/browse/BEAM-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17526141#comment-17526141
]
Stefano Romano commented on BEAM-13487:
---------------------------------------
{*}Update{*}: I now understand that this issue only happens in batch load mode.
This is why the same code worked for me in other pipelines, which were
running in streaming mode.
What happens is that in batch mode, Beam first writes the data destined for
each destination table to a temporary table (named after the load job ID).
However, this fails unless a schema for the destination table is explicitly
provided, because BigQuery doesn't know the schema of this temporary table.
Using `schema='SCHEMA_AUTODETECT'` *might* work, but only if the autodetected
schema actually happens to coincide with the schema of the eventual destination
table (it didn't in my case).
Setting `method='STREAMING_INSERTS'` solved the issue for me. I also believe
that providing dynamic schemas should solve the issue, though I haven't tried.
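A minimal sketch of the dynamic-schema idea follows. Like the suggestion above, it is untested against the batch-load problem. It assumes that a callable passed as `schema` receives each destination (as a string or a `TableReference`) and returns that table's schema, which is how the `WriteToBigQuery` docstring describes it. `SCHEMAS_BY_TABLE`, `table_id_of` and `schema_for_destination` are hypothetical names, and the field definitions are placeholders:

```python
# Hypothetical per-table schemas; replace with your real table IDs and fields.
SCHEMAS_BY_TABLE = {
    "table_id": {
        "fields": [
            {"name": "name", "type": "STRING", "mode": "NULLABLE"},
            {"name": "count", "type": "INTEGER", "mode": "NULLABLE"},
        ]
    },
}


def table_id_of(destination):
    """Extract the bare table ID from a destination.

    Accepts a TableReference-like object (anything with a ``tableId``
    attribute) or a string such as "project:dataset.table" or "dataset.table".
    """
    table_id = getattr(destination, "tableId", None)
    if table_id is not None:
        return table_id
    # In the string form, the table ID is everything after the last '.'.
    return str(destination).rsplit(".", 1)[-1]


def schema_for_destination(destination):
    """Return the schema for one destination table.

    Raises KeyError for an unknown table instead of returning None.
    """
    return SCHEMAS_BY_TABLE[table_id_of(destination)]


# Hypothetical wiring (not executed here; requires apache_beam[gcp]):
#
#   bigquery_rows | WriteToBigQuery(
#       compute_table_name,
#       schema=schema_for_destination,  # receives a destination, returns its schema
#       create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
#       write_disposition=BigQueryDisposition.WRITE_APPEND,
#       # The workaround that did work for me:
#       # method="STREAMING_INSERTS",
#   )
```

Raising `KeyError` for an unmapped table makes a missing schema fail immediately, rather than leaving BigQuery to guess a schema it doesn't have.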
This feels to me like an intrinsic limitation of the batch load mechanism,
which, as far as I can tell, is currently not documented.
> WriteToBigQuery Dynamic table destinations returns wrong tableId
> ----------------------------------------------------------------
>
> Key: BEAM-13487
> URL: https://issues.apache.org/jira/browse/BEAM-13487
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Affects Versions: 2.34.0
> Reporter: Augusto
> Priority: P1
>
> I am trying to write to different BigQuery table destinations, and I would
> like the tables to be created dynamically if they don't already exist.
> {code:python}
> from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
>
> bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(
>     lambda e: compute_table_name(e),
>     schema=compute_table_schema,
>     additional_bq_parameters=additional_bq_parameters,
>     write_disposition=BigQueryDisposition.WRITE_APPEND,
>     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
> )
> {code}
> The compute_table_name function is actually quite simple; I am just trying to
> get it to work.
> {code:python}
> def compute_table_name(element):
>     if element['table'] == 'table_id':
>         del element['table']
>         return "project_id:dataset.table_id"
> {code}
> The schema is detected correctly, and the table IS created and populated with
> records. The problem is that the table ID I get back looks something like:
> {noformat}
> datasetId: 'dataset'
> projectId: 'project_id'
> tableId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP...
> {noformat}
> I have also tried returning a bigquery.TableReference object in my
> compute_table_name function to no avail.
> {code:python}
> from apache_beam.io.gcp.internal.clients.bigquery import TableReference
>
> def compute_table_name(element):
>     if element['table'] == 'Radio':
>         del element['table']
>         return TableReference(
>             datasetId="dataset_id",
>             projectId="project_id",
>             tableId="table_id",
>         )
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)