Locustv2 opened a new issue, #39914:
URL: https://github.com/apache/airflow/issues/39914
### Description
Hey guys,
I have been dealing with this issue for a while now without making any progress.
I have a DAG that queries data from BigQuery and, depending on the results,
creates dynamically mapped tasks that each insert an entry into another
BigQuery table using `BigQueryInsertJobOperator`.
For example:
```
import logging

from airflow import DAG
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

log = logging.getLogger(__name__)

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

with DAG(
    dag_id='bigquery_data_transfer_mapped_correct',
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    tags=['example'],
) as dag:

    @task
    def get_data(sql):
        bq_hook = BigQueryHook(...)  # connection details elided
        log.info('Fetching data from BigQuery')
        log.info('Query: %s', sql)
        bq_client = bq_hook.get_client()
        query_job = bq_client.query(sql)
        client_results = query_job.result()  # waits for the query to finish
        results = [dict(result) for result in client_results]
        log.info('Retrieved %d rows from BigQuery', len(results))
        log.info('Response: %s', results)
        return results

    query_data = get_data("SELECT * FROM some_table WHERE some_conditions;")

    # get_data() already returns an XComArg, so it can be passed to
    # expand() directly; expand() also creates the task dependency.
    insert = BigQueryInsertJobOperator.partial(
        task_id="insert_data",
        configuration={
            'query': {
                'query': (
                    "INSERT INTO `project.dataset.table` (field1, field2) "
                    "VALUES ('{{ params.field1 }}', '{{ params.field2 }}')"
                ),
                'useLegacySql': False,
            }
        },
    ).expand(
        params=query_data,
    )
```
Please note that this code is just a basic example written for this report; in
my actual use case I have a task group that expands and takes a parameter,
which is then passed as `params` to one of the `BigQueryInsertJobOperator`
tasks inside it (see the sketch below).
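For reference, here is a minimal sketch of that task-group pattern,
continuing the example DAG above (names are placeholders, and it assumes the
`@task_group` expansion API available since Airflow 2.5); the mapped argument
ends up in `params`, which is where the error surfaces:
```
from airflow.decorators import task_group

# Hypothetical sketch of the task-group pattern described above: each
# expanded group instance receives one result row and passes it to the
# operator as `params`.
@task_group(group_id="insert_group")
def insert_group(row):
    BigQueryInsertJobOperator(
        task_id="insert_data",
        configuration={
            "query": {
                "query": (
                    "INSERT INTO `project.dataset.table` (field1, field2) "
                    "VALUES ('{{ params.field1 }}', '{{ params.field2 }}')"
                ),
                "useLegacySql": False,
            }
        },
        params=row,  # mapped argument passed as params
    )

insert_group.expand(row=query_data)
```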
When the scheduler parses the DAG, I get the following error:
```
Broken DAG: [/opt/airflow/dags/src/dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 407, in apply_defaults
    default_args, merged_params = get_merged_defaults(
                                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 167, in get_merged_defaults
    raise TypeError("params must be a mapping")
TypeError: params must be a mapping
```
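Reading the traceback, the failure seems to happen while the DAG file is
parsed, i.e. while the operator is being constructed and before any mapped
value is resolved: at that point `params` is still an unresolved `XComArg`
rather than a dict. A minimal sketch of what I believe the check amounts to
(simplified, not the actual Airflow source):
```
from collections.abc import Mapping

# Simplified sketch of the validation implied by the traceback: at
# parse time the expanded `params` is still an XComArg, not a dict,
# so the isinstance check raises before any expansion can happen.
def check_params(params):
    if params is not None and not isinstance(params, Mapping):
        raise TypeError("params must be a mapping")
```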
The Airflow version is:
```
Version: v2.8.1 (https://pypi.python.org/pypi/apache-airflow/2.8.1)
Git Version: .release:c0ffa9c5d96625c68ded9562632674ed366b5eb3
```
### Use case/motivation
I would like to be able to use dynamic task mapping to create multiple
instances of `BigQueryInsertJobOperator`, each with its own dynamic `params`
attribute. A possible workaround is sketched below.
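One workaround that might fit here (a sketch, not a confirmed fix, assuming
only `params` is validated at parse time while `configuration` is an ordinary
templated field of `BigQueryInsertJobOperator`): build one complete job
`configuration` per result row with `XComArg.map` (available since Airflow
2.4) and expand over `configuration` instead of `params`. The helper name and
table/field names below are placeholders:
```
# Sketch of a possible workaround: expand over `configuration` instead of
# `params`, building one BigQuery job configuration per row from get_data.
def build_insert_config(row):
    # NOTE: real code should use BigQuery query parameters instead of
    # interpolating values into SQL, to avoid injection issues.
    return {
        "query": {
            "query": (
                "INSERT INTO `project.dataset.table` (field1, field2) "
                f"VALUES ('{row['field1']}', '{row['field2']}')"
            ),
            "useLegacySql": False,
        }
    }

insert = BigQueryInsertJobOperator.partial(
    task_id="insert_data",
).expand(
    configuration=query_data.map(build_insert_config),
)
```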
### Related issues
_No response_
### Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)