dabhicusp commented on issue #21476:
URL: https://github.com/apache/beam/issues/21476#issuecomment-1625165954

   Hello @kennknowles @waltage, while inserting data into BQ the data isn't inserted, and the `ProjectId` is also changed (the 404 below reports the numeric project ID instead of `ee-aniket`).
   The `WriteToBigQuery` properties I use while inserting data into BQ are below:

   | table | method | schema | create_disposition | write_disposition |
   | --- | --- | --- | --- | --- |
   | string | STREAMING_INSERTS | None | CREATE_NEVER | WRITE_APPEND |
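
   Spelled out as a `WriteToBigQuery` call, that configuration is roughly the following (just a sketch; the table string is the one from the error below, and the full pipeline code is further down):

    ```
    import apache_beam as beam

    beam.io.WriteToBigQuery(
        table='ee-aniket:mydataset.sahildabhi1',  # destination passed as a string
        method='STREAMING_INSERTS',
        schema=None,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
    ```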
   
   I also see the same behavior when passing a `TableSchema` instead of a `string`; a sketch of that variant follows.
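
   For reference, a minimal sketch of that schema variant (assuming Beam's `TableSchema` type from `apache_beam.io.gcp.internal.clients.bigquery`; the two fields mirror the columns used in the code below):

    ```
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery

    # Build Beam's TableSchema (not google.cloud.bigquery.SchemaField).
    table_schema = beam_bigquery.TableSchema()
    for name in ('column1', 'column2'):
        field = beam_bigquery.TableFieldSchema()
        field.name = name
        field.type = 'STRING'
        field.mode = 'NULLABLE'
        table_schema.fields.append(field)
    ```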
   
   The exact error I get is:
    ```
    google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects/ee-aniket/datasets/mydataset/tables/sahildabhi1/insertAll?prettyPrint=false: Table 933583868273:mydataset.sahildabhi1 not found. [while running 'Write to BigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
    ```
   
   The code I use for this is:
    ```
    import apache_beam as beam
    from google.cloud import bigquery


    def create_schema():
        # Schema used by the BigQuery client when creating the destination table.
        return [
            bigquery.SchemaField('column1', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('column2', 'STRING', mode='NULLABLE'),
        ]


    def get_table_name(element):
        # Called by WriteToBigQuery for every element to pick the destination table.
        print(element)  # debug output
        table_name = element['column1']
        table_name = f'ee-aniket.mydataset.{table_name}'
        try:
            table = bigquery.Table(table_name, schema=create_schema())
            # exists_ok=True keeps this from failing when the table already exists.
            bigquery.Client().create_table(table, exists_ok=True)
        except Exception as e:
            raise RuntimeError(f"Can't create the table: {e}") from e
        project, dataset, table_id = table_name.split('.')
        # WriteToBigQuery expects the "project:dataset.table" form.
        return f'{project}:{dataset}.{table_id}'


    def run_pipeline(pcollection):
        pcollection | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table=get_table_name,
            method='STREAMING_INSERTS',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )


    with beam.Pipeline() as p:
        pcollection = p | beam.Create([{'column1': 'sahildabhi1', 'column2': 'value2'}])
        run_pipeline(pcollection)
    ```
   
   I kindly request your assistance in resolving this issue.

