quantumlicht opened a new issue, #31481:
URL: https://github.com/apache/airflow/issues/31481

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   I'm running version `composer-2.1.15-airflow-2.5.1` locally via `composer-dev`.
   
   Using a `MappedArgument` with `GCSToBigQueryOperator` raises an error:
   `TypeError: Object of type MappedArgument is not JSON serializable`.
   
   ### What you think should happen instead
   
   It should work just as with a regular XCom argument.
   
   ### How to reproduce
   
   Disclaimer: I'm relatively new to Airflow 2 and TaskFlow. I'm migrating a 
codebase written with Airflow 1, so there might be some glaring problems with 
how I'm approaching this.
   
   The issue I'm having is with the `schema_fields` argument passed to 
`GCSToBigQueryOperator`: it is a `MappedArgument` instead of being resolved to 
a list of dicts as I expected.
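   The error itself can be reproduced outside Airflow with plain `json`; a minimal sketch, assuming the operator serializes its job configuration with `json.dumps` and the stand-in class below plays the role of the real `MappedArgument`:
   
   ```python
   import json
   
   # Stand-in for Airflow's MappedArgument placeholder (hypothetical class,
   # just to show that json.dumps cannot encode an arbitrary object).
   class MappedArgument:
       pass
   
   try:
       json.dumps({"schema_fields": MappedArgument()})
   except TypeError as err:
       print(err)  # Object of type MappedArgument is not JSON serializable
   ```
   
   This is why an unresolved `MappedArgument` fails the moment the operator tries to JSON-serialize its arguments.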
   
   As a first step, the DAG loads the metadata from GCS with a task named
   `get_export_metadata` (declared with `multiple_outputs=True`) that returns a 
dict with the shape `{"databases": Dict[str, List[str]], "schemas": Dict[str, 
List[Dict[str, str]]]}`.
   
   example:
   ```json
   {
       "databases": {
           "test": [
               "table_1",
               "table_2"
           ]
       },
       "schemas": {
           "table_1": [
               {
                   "mode": "NULLABLE",
                   "name": "id",
                   "type": "STRING"
               },
               {
                   "mode": "NULLABLE",
                   "name": "creation_date",
                   "type": "STRING"
                }
           ]
       }
   }
   ```
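   The lookups the tasks below perform against this metadata can be checked with plain `json` (the GCS loading step is elided; the literal below is the example above):
   
   ```python
   import json
   
   metadata = json.loads("""
   {
       "databases": {"test": ["table_1", "table_2"]},
       "schemas": {
           "table_1": [
               {"mode": "NULLABLE", "name": "id", "type": "STRING"},
               {"mode": "NULLABLE", "name": "creation_date", "type": "STRING"}
           ]
       }
   }
   """)
   
   # get_table_names(database_map, db_name) effectively does:
   table_names = metadata["databases"]["test"]
   # and build_args resolves schema_map[table_name] to a list of dicts:
   schema_fields = metadata["schemas"]["table_1"]
   print(table_names)  # ['table_1', 'table_2']
   ```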
   
   Here are the tasks defined for my DAG:
   ```python
   @task
   def get_table_names(database_map, db_name):
       return database_map[db_name]

   @task
   def build_args(schema_map, table_name, instance_name, instance_pool, db_name):
       return {
           "instance_name": instance_name,
           "instance_pool": instance_pool,
           "schema_fields": schema_map[table_name],
           "db_name": db_name,
           "table_name": table_name,
       }

   @task_group
   def extract_data_csv_dump(table_name, instance_name, db_name, instance_pool, schema_fields):

       @task
       def get_export_query(table_name: str) -> str:
           return f"SELECT * FROM {table_name};"

       @task
       def get_bq_dump_table(db_name, table_name):
           return f"{settings.GCP_PROJECT_ID}.{db_name}.{table_name}"

       @task
       def get_dump_object_path(db_name, table_name):
           dump_object_name = f"{table_name}__airflow_dump*"
           return f"{settings.REPORTING_PATH}/{GCS_SYNC_FOLDER}/{db_name}/{dump_object_name}"

       export_query = get_export_query(table_name)

       bq_dump_table_name = get_bq_dump_table(db_name, table_name)
       dump_object_path = get_dump_object_path(db_name, table_name)
       load_dump_task = GCSToBigQueryOperator(
           bucket=settings.AIRFLOW_GCS_BUCKET,
           source_objects=[dump_object_path],
           schema_fields=schema_fields,  # <<< This is where it fails
           destination_project_dataset_table=bq_dump_table_name,
           write_disposition="WRITE_TRUNCATE",
           source_format="CSV",
           allow_quoted_newlines=True,
           task_id="load_csv_dump",
           pool=settings.BIG_QUERY_TASK_POOL,
       )

   db_data = {
       "conn_id": "cloudsql_mws",
       "connection_pool": "my_connection",
       "instance_pool": "my_pool",
   }
   instance_name = "my_instance_name"
   db_name = "my_db_name"

   export_metadata = get_export_metadata()
   table_names = get_table_names(export_metadata["databases"], db_name)
   instance_pool = db_data["instance_pool"]
   kw_args_list = build_args.partial(
       schema_map=export_metadata["schemas"],
       instance_name=instance_name,
       instance_pool=instance_pool,
       db_name=db_name,
   ).expand(table_name=table_names)

   extract_data_csv_dump.expand_kwargs(kw_args_list)
   ```
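   Outside Airflow, the `partial(...).expand(...)` fan-out amounts to building one kwargs dict per table name; a plain-Python sketch of the resolution I expected (no Airflow involved, sample values are placeholders):
   
   ```python
   # Hypothetical sample data standing in for the resolved XComs.
   schemas = {
       "table_1": [{"mode": "NULLABLE", "name": "id", "type": "STRING"}],
       "table_2": [{"mode": "NULLABLE", "name": "id", "type": "STRING"}],
   }
   table_names = ["table_1", "table_2"]
   
   def build_args(schema_map, table_name, instance_name, instance_pool, db_name):
       # Mirrors the build_args task: schema_fields should come back as a
       # concrete list of dicts, not an unresolved MappedArgument.
       return {
           "instance_name": instance_name,
           "instance_pool": instance_pool,
           "schema_fields": schema_map[table_name],
           "db_name": db_name,
           "table_name": table_name,
       }
   
   # Equivalent of build_args.partial(...).expand(table_name=table_names):
   kw_args_list = [
       build_args(schemas, t, "my_instance_name", "my_pool", "my_db_name")
       for t in table_names
   ]
   print(kw_args_list[0]["schema_fields"])
   ```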
   
   ### Operating System
   
   Composer dev image (Linux, I assume)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-beam @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_apache_beam-4.3.0-py3-none-any.whl
   apache-airflow-providers-cncf-kubernetes @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_cncf_kubernetes-6.0.0-py3-none-any.whl
   apache-airflow-providers-common-sql @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_common_sql-1.4.0-py3-none-any.whl
   apache-airflow-providers-dbt-cloud @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_dbt_cloud-3.1.0-py3-none-any.whl
   apache-airflow-providers-ftp @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_ftp-3.3.1-py3-none-any.whl
   apache-airflow-providers-google @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_google-2023.4.13%2Bcomposer-py3-none-any.whl
   apache-airflow-providers-hashicorp @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_hashicorp-3.3.1-py3-none-any.whl
   apache-airflow-providers-http @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_http-4.3.0-py3-none-any.whl
   apache-airflow-providers-imap @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_imap-3.1.1-py3-none-any.whl
   apache-airflow-providers-mysql @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_mysql-4.0.2-py3-none-any.whl
   apache-airflow-providers-postgres @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_postgres-5.4.0-py3-none-any.whl
   apache-airflow-providers-sendgrid @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_sendgrid-3.1.0-py3-none-any.whl
   apache-airflow-providers-sqlite @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_sqlite-3.3.1-py3-none-any.whl
   apache-airflow-providers-ssh @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_ssh-3.6.0-py3-none-any.whl
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   This runs locally; I use the `dag.test()` command to execute it.
   
   ### Anything else
   
   N/A
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   
