[ 
https://issues.apache.org/jira/browse/BEAM-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Augusto updated BEAM-13487:
---------------------------
    Description: 
I am trying to write to different BigQuery table destinations, and I would 
like the tables to be created dynamically if they do not already exist.
{code:java}
bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(
    lambda e: compute_table_name(e),
    schema=compute_table_schema,
    additional_bq_parameters=additional_bq_parameters,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
){code}
The function compute_table_name is deliberately simple; for now I am just 
trying to get it to work.
{code:java}
def compute_table_name(element):
    if element['table'] == 'table_id':
        del element['table']
        return "project_id:dataset.table_id" {code}
The schema is detected correctly, and the table IS created and populated with 
records. The problem is that the table ID I get looks something like this:
{code:java}
datasetId: 'dataset'
projectId: 'project_id'
tableId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP... {code}
I have also tried returning a bigquery.TableReference object from my 
compute_table_name function, to no avail.
{code:java}
def compute_table_name(element):
    if element['table'] == 'Radio':
        del element['table']
        return TableReference(
            datasetId = "dataset_id",
            projectId = "project_id",
            tableId = "table_id"
        ) {code}
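
While narrowing this down, two properties of the callables above may be worth ruling out: they return None for any element that does not match the condition, and they delete a key from the element they are given. The following is only a minimal sketch of a variant that avoids both. The ROUTES mapping and the DEFAULT_TABLE fallback are hypothetical names introduced for illustration, and nothing here is claimed to be the cause of the wrong tableId.

```python
# Hypothetical sketch: a table callable that returns a destination for
# every element and leaves its input untouched.

# Assumed mapping from the row's 'table' field to a table spec string.
ROUTES = {
    "table_id": "project_id:dataset.table_id",
}

# Assumed fallback destination for rows that match no route.
DEFAULT_TABLE = "project_id:dataset.unrouted"


def compute_table_name(element):
    """Return a 'project:dataset.table' string without mutating element."""
    return ROUTES.get(element.get("table"), DEFAULT_TABLE)
```

Because this version no longer deletes the 'table' field, that field stays in the row that is written, so the schema would have to account for it or the field would have to be handled separately.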


> WriteToBigQuery Dynamic table destinations returns wrong tableId
> ----------------------------------------------------------------
>
>                 Key: BEAM-13487
>                 URL: https://issues.apache.org/jira/browse/BEAM-13487
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>    Affects Versions: 2.34.0
>            Reporter: Augusto
>            Priority: P1



--
This message was sent by Atlassian Jira
(v8.20.1#820001)