quantumlicht opened a new issue, #31481:
URL: https://github.com/apache/airflow/issues/31481
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
I'm using version `composer-2.1.15-airflow-2.5.1`, run locally with `composer-dev`.
Passing a `MappedArgument` to `GCSToBigQueryOperator` raises: `TypeError: Object of type MappedArgument is not JSON serializable`
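For context on the message itself: the stdlib `json` encoder raises this `TypeError` for any object it has no encoding for, so a dummy stand-in class (not Airflow's real one) reproduces it exactly:

```python
import json

class MappedArgument:  # dummy stand-in, NOT the real airflow class
    pass

try:
    json.dumps({"schema_fields": MappedArgument()})
except TypeError as exc:
    print(exc)  # Object of type MappedArgument is not JSON serializable
```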
### What you think should happen instead
It should work just like a regular XComArg.
### How to reproduce
Disclaimer: I'm relatively new to Airflow 2 and TaskFlow. I'm migrating a codebase written for Airflow 1, so there may be some glaring problems with my approach.
The issue I'm having is with the `schema_fields` argument passed to `GCSToBigQueryOperator`: it arrives as a `MappedArgument` instead of being resolved to a list of dicts as I expected.
As a first step, the DAG loads the metadata from GCS with a task named `get_export_metadata` (declared with `multiple_outputs=True`) that returns a dict of the shape `{"databases": Dict[str, List[str]], "schemas": Dict[str, List[Dict[str, str]]]}`, for example:
```json
{
  "databases": {
    "test": [
      "table_1",
      "table_2"
    ]
  },
  "schemas": {
    "table_1": [
      {
        "mode": "NULLABLE",
        "name": "id",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "creation_date",
        "type": "STRING"
      }
    ]
  }
}
```
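For reference, the logic inside `get_export_metadata` (not shown above) is roughly the following; this is an illustrative sketch only, since the GCS download itself (e.g. via `GCSHook`) is omitted:

```python
import json

# Illustrative sketch: the real get_export_metadata task first downloads the
# JSON document from GCS, then parses it. Only the parsing step is shown.
def parse_export_metadata(raw: str) -> dict:
    metadata = json.loads(raw)
    # The downstream tasks rely on these two top-level keys being present.
    missing = {"databases", "schemas"} - metadata.keys()
    if missing:
        raise ValueError(f"metadata is missing keys: {missing}")
    return metadata
```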
Here are the tasks defined for my DAG:
```python
@task
def get_table_names(database_map, db_name):
    return database_map[db_name]


@task
def build_args(schema_map, table_name, instance_name, instance_pool, db_name):
    return {
        "instance_name": instance_name,
        "instance_pool": instance_pool,
        "schema_fields": schema_map[table_name],
        "db_name": db_name,
        "table_name": table_name,
    }


@task_group
def extract_data_csv_dump(table_name, instance_name, db_name, instance_pool, schema_fields):
    @task
    def get_export_query(table_name: str) -> str:
        return f"SELECT * FROM {table_name};"

    @task
    def get_bq_dump_table(db_name, table_name):
        return f"{settings.GCP_PROJECT_ID}.{db_name}.{table_name}"

    @task
    def get_dump_object_path(db_name, table_name):
        dump_object_name = f"{table_name}__airflow_dump*"
        return (
            f"{settings.REPORTING_PATH}/{GCS_SYNC_FOLDER}/{db_name}/{dump_object_name}"
        )

    export_query = get_export_query(table_name)
    bq_dump_table_name = get_bq_dump_table(db_name, table_name)
    dump_object_path = get_dump_object_path(db_name, table_name)

    load_dump_task = GCSToBigQueryOperator(
        bucket=settings.AIRFLOW_GCS_BUCKET,
        source_objects=[dump_object_path],
        schema_fields=schema_fields,  # <<< This is where it fails
        destination_project_dataset_table=bq_dump_table_name,
        write_disposition="WRITE_TRUNCATE",
        source_format="CSV",
        allow_quoted_newlines=True,
        task_id="load_csv_dump",
        pool=settings.BIG_QUERY_TASK_POOL,
    )


db_data = {
    "conn_id": "cloudsql_mws",
    "connection_pool": "my_connection",
    "instance_pool": "my_pool",
}
instance_name = "my_instance_name"
db_name = "my_db_name"

export_metadata = get_export_metadata()
table_names = get_table_names(export_metadata["databases"], db_name)
instance_pool = db_data["instance_pool"]

kw_args_list = build_args.partial(
    schema_map=export_metadata["schemas"],
    instance_name=instance_name,
    instance_pool=instance_pool,
    db_name=db_name,
).expand(table_name=table_names)

extract_data_csv_dump.expand_kwargs(kw_args_list)
```
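A workaround that might sidestep the serialization problem (untested sketch): route the mapped value through an identity `@task` inside the task group, so the classic operator receives a plain XComArg instead of a `MappedArgument`.

```python
# Untested sketch of a possible workaround. Returning the mapped value from a
# TaskFlow task materializes it as a regular XCom at runtime, so a downstream
# classic operator should see a plain list of dicts.
@task
def resolve_schema_fields(schema_fields):
    return schema_fields

# Inside extract_data_csv_dump, pass the resolved value to the operator:
#     schema_fields=resolve_schema_fields(schema_fields)
```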
### Operating System
Composer dev image (Linux, I assume)
### Versions of Apache Airflow Providers
```
apache-airflow-providers-apache-beam @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_apache_beam-4.3.0-py3-none-any.whl
apache-airflow-providers-cncf-kubernetes @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_cncf_kubernetes-6.0.0-py3-none-any.whl
apache-airflow-providers-common-sql @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_common_sql-1.4.0-py3-none-any.whl
apache-airflow-providers-dbt-cloud @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_dbt_cloud-3.1.0-py3-none-any.whl
apache-airflow-providers-ftp @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_ftp-3.3.1-py3-none-any.whl
apache-airflow-providers-google @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_google-2023.4.13%2Bcomposer-py3-none-any.whl
apache-airflow-providers-hashicorp @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_hashicorp-3.3.1-py3-none-any.whl
apache-airflow-providers-http @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_http-4.3.0-py3-none-any.whl
apache-airflow-providers-imap @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_imap-3.1.1-py3-none-any.whl
apache-airflow-providers-mysql @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_mysql-4.0.2-py3-none-any.whl
apache-airflow-providers-postgres @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_postgres-5.4.0-py3-none-any.whl
apache-airflow-providers-sendgrid @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_sendgrid-3.1.0-py3-none-any.whl
apache-airflow-providers-sqlite @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_sqlite-3.3.1-py3-none-any.whl
apache-airflow-providers-ssh @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_ssh-3.6.0-py3-none-any.whl
```
### Deployment
Google Cloud Composer
### Deployment details
This is running locally; I execute the DAG with `dag.test()`.
### Anything else
N/A
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)