Locustv2 opened a new issue, #39914:
URL: https://github.com/apache/airflow/issues/39914

   ### Description
   
   Hey guys,

   I have been dealing with this issue for a while now without finding a way forward.
   I have a DAG that queries data from BigQuery and, depending on the results, uses
   dynamic task mapping to create `BigQueryInsertJobOperator` tasks that each insert
   an entry into another BigQuery table.

   For example:
   
   ```
   import logging

   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   from airflow.utils.dates import days_ago

   log = logging.getLogger(__name__)

   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'retries': 1,
   }

   with DAG(
       dag_id='bigquery_data_transfer_mapped_correct',
       default_args=default_args,
       schedule_interval="@daily",
       catchup=False,
       tags=['example'],
   ) as dag:

       @task
       def get_data(sql):
           bq_hook = BigQueryHook(...)

           log.info('Fetching data with query: %s', sql)

           bq_client = bq_hook.get_client()
           query_job = bq_client.query(sql)
           client_results = query_job.result()  # Waits for the query to finish

           results = [dict(row) for row in client_results]

           log.info('Retrieved %d rows from BigQuery', len(results))
           log.info('Response: %s', results)

           return results

       query_data = get_data("SELECT * FROM some_table WHERE some_conditions;")

       # query_data is already an XComArg, so it can be passed to expand() directly.
       insert = BigQueryInsertJobOperator.partial(
           task_id="insert_data",
           configuration={
               'query': {
                   'query': "INSERT INTO `project.dataset.table` (field1, field2) "
                            "VALUES ('{{ params.field1 }}', '{{ params.field2 }}')",
                   'useLegacySql': False,
               }
           },
       ).expand(
           params=query_data,
       )
   ```
   
   Please note that this code is just a basic example I wrote for this report. In my
   actual use case I have a task group that is expanded, and the mapped argument is
   passed to `params` of one of the `BigQueryInsertJobOperator` tasks inside it,
   roughly like the sketch below.
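
   For reference, the real DAG is laid out roughly like this (a minimal, untested
   sketch with made-up names; `insert_group` and `row` are placeholders, and
   `query_data` is the same upstream task as in the example above):

   ```
   from airflow.decorators import task_group
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


   @task_group
   def insert_group(row):
       # Passing the mapped argument into `params` here is what triggers the error below.
       BigQueryInsertJobOperator(
           task_id="insert_data",
           configuration={
               'query': {
                   'query': "INSERT INTO `project.dataset.table` (field1, field2) "
                            "VALUES ('{{ params.field1 }}', '{{ params.field2 }}')",
                   'useLegacySql': False,
               }
           },
           params=row,
       )


   insert_group.expand(row=query_data)
   ```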
   
   
   After adding this DAG, I get a DAG import error, so the failure happens at parse
   time, before any mapped task instances are created:
   
   ```
   Broken DAG: [/opt/airflow/dags/src/dag.py] Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 407, in apply_defaults
       default_args, merged_params = get_merged_defaults(
                                     ^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 167, in get_merged_defaults
       raise TypeError("params must be a mapping")
   TypeError: params must be a mapping
   ```
   
   The Airflow version is:
   
   ```
   Version: [v2.8.1](https://pypi.python.org/pypi/apache-airflow/2.8.1)
   Git Version: .release:c0ffa9c5d96625c68ded9562632674ed366b5eb3
   ```
   
   ### Use case/motivation
   
   I would like to be able to use dynamic task mapping to create multiple instances
   of `BigQueryInsertJobOperator`, each with a different `params` value.
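
   A workaround along these lines would probably work by rendering the query upstream
   and mapping over `configuration` instead (rough, untested sketch; `build_insert_configs`
   is just a made-up helper and `query_data` is the upstream task from the example above),
   but I would prefer to keep the templated query in the operator and only map `params`:

   ```
   from airflow.decorators import task
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


   @task
   def build_insert_configs(rows):
       # Build one full `configuration` dict per row instead of relying on `params`.
       return [
           {
               'query': {
                   'query': "INSERT INTO `project.dataset.table` (field1, field2) "
                            f"VALUES ('{row['field1']}', '{row['field2']}')",
                   'useLegacySql': False,
               }
           }
           for row in rows
       ]


   insert = BigQueryInsertJobOperator.partial(
       task_id="insert_data",
   ).expand(
       configuration=build_insert_configs(query_data),
   )
   ```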
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

