I think you hit this issue: https://stackoverflow.com/questions/64526500/using-writetobigquery-file-loads-in-a-streaming-pipeline-just-creates-a-lot-of-t
I created https://github.com/apache/beam/issues/31329 to track this bug.
Some workarounds:

1. https://cloud.google.com/dataflow/docs/guides/write-to-bigquery has more documentation about the different write methods. I would recommend STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE (a minimal sketch is at the end of this message).
2. If you have another project, you could use `load_job_project_id` to put these temp tables there and clean them up regularly.
3. As the comments on the Stack Overflow question suggest, write to GCS first and then load the files into BigQuery yourself.

On Thu, May 16, 2024 at 10:10 AM Pawel Jarkowski via dev <dev@beam.apache.org> wrote:

> Hi,
> I'm working on a pipeline that reads data from Pub/Sub and splits it into
> a few BigQuery tables. I have a problem with temporary tables: the
> pipeline creates them (I have to compute the table id and schema based on
> the data) but does not delete all of them. Is there any way to set a
> different dataset for the temporary tables, or to batch the data into a
> list of dicts and push more rows at once? I tried windowing + GroupByKey
> where the key is the table name; this produced a list of dictionaries,
> but I got this error:
> """
> File "fastavro/_write.pyx", line 732, in fastavro._write.Writer.write
> File "fastavro/_write.pyx", line 469, in fastavro._write.write_data
> File "fastavro/_write.pyx", line 459, in fastavro._write.write_data
> File "fastavro/_write.pyx", line 357, in fastavro._write.write_record
> TypeError: Error writing row to Avro: unhashable type: 'dict'
> """
> Without the grouping and windowing everything works fine, but the
> pipeline creates a lot of temporary tables that make a mess in our
> datasets.
>
> These are my BigQuery options:
> method="FILE_LOADS"
> triggering_frequency=60
> max_files_per_bundle=1
> temp_file_format="AVRO"
> table="" << this is computed
> dataset="" << this is computed
> project="my_project"
> custom_gcs_temp_location="gs://my_gcp_tmp_location/temp/"
>
> These are my pipeline options:
> runner="DataflowRunner"
> region="my_region"
> project="my_project"
> service_account_email="my_df_SA"
> temp_location="gs://my_gcp_location/temp/"
> max_num_workers=10
> no_use_public_ips=true
> use_public_ips=false
> network="my_network_url"
> subnetwork="my_subnetwork_url"
> machine_type="my_machine_type"
> label="pubsub-qa-label"
> job_name="job-name-pubsub-qa"
> save_main_session=false
> streaming=true
> setup_file="./setup.py"
>
> Thank you in advance for your reply and best regards,
> Paweł Jarkowski
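For completeness, here is a minimal sketch of workaround 1 using the Python SDK. The subscription name, routing field, schema, and project ids are placeholders (not your pipeline); the point is only the method switch to STORAGE_WRITE_API, which streams rows directly and so creates no load-job temp tables.

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def route_to_table(row):
    # Hypothetical routing: compute the destination table from a field in the row.
    return "my_project:my_dataset.%s" % row["table_suffix"]


def run():
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromPubSub(
                subscription="projects/my_project/subscriptions/my_subscription")
            | "Parse" >> beam.Map(lambda msg: json.loads(msg.decode("utf-8")))
            | "Write" >> beam.io.WriteToBigQuery(
                table=route_to_table,
                schema="id:STRING,payload:STRING",  # placeholder schema
                # Storage Write API streams rows directly, so no load-job
                # temporary tables appear in the destination dataset.
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                # If you stay on FILE_LOADS, workaround 2 is to redirect the
                # temp tables to a separate project instead:
                # load_job_project_id="my_scratch_project",
            )
        )


if __name__ == "__main__":
    run()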