mariayanovskaya opened a new issue, #40235:
URL: https://github.com/apache/airflow/issues/40235
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.6.3
### What happened?
I am working in Cloud Composer on GCP, using
composer-2.4.6-airflow-2.6.3.
I am building a script that uses the BigQueryUpdateTableSchemaOperator to
change the schema of a BigQuery table. The company I work for has a very strict
rule that all resources and jobs used in GCP must be located in Europe, to
comply with GDPR. According to the documentation, the
BigQueryUpdateTableSchemaOperator accepts "location" as one of its parameters.
When I pass it as in the code below, Airflow raises a DAG import error:
> Broken DAG: [/home/airflow/gcs/dags/dags/ingestion_pipelines/ingestion_pipelines.py] Traceback (most recent call last):
>   File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
>     result = func(self, **kwargs, default_args=default_args)
>   File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 788, in __init__
>     raise AirflowException(
> airflow.exceptions.AirflowException: Invalid arguments were passed to BigQueryUpdateTableSchemaOperator (task_id: update_schema). Invalid arguments were:
> **kwargs: {'location': 'EU'}
Code:
```
table_schema = {
    "name": "column",
    "type": "TIMESTAMP",
    "description": "column",
}
update_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_schema",
    dataset_id="dataset",
    table_id="table_name",
    project_id="PROJECT_ID",
    schema_fields_updates=table_schema,
    include_policy_tags=True,
    location="EU",
    impersonation_chain="SERVICE_ACCOUNT",
    retries=0,
)
```
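The error above is raised when keyword arguments are left over after the operator's own `__init__` has consumed the ones it declares, so one possible explanation (an assumption, not confirmed here) is that the installed provider's `BigQueryUpdateTableSchemaOperator` does not declare `location`. A minimal sketch of how that could be checked with `inspect.signature` follows. The helper `accepts_kwarg` and the two stand-in classes are hypothetical and exist only so the snippet runs without Airflow installed.

```python
import inspect


def accepts_kwarg(cls, name):
    """Return True if cls.__init__ explicitly declares a parameter called `name`."""
    params = inspect.signature(cls.__init__).parameters
    return name in params


# Hypothetical stand-ins for two operator versions; not real Airflow classes.
class OperatorWithoutLocation:
    def __init__(self, *, task_id, dataset_id, table_id, **kwargs):
        self.task_id = task_id


class OperatorWithLocation:
    def __init__(self, *, task_id, dataset_id, table_id, location=None, **kwargs):
        self.task_id = task_id
        self.location = location


print(accepts_kwarg(OperatorWithoutLocation, "location"))
print(accepts_kwarg(OperatorWithLocation, "location"))
```

In the Composer environment, the same helper could presumably be called as `accepts_kwarg(BigQueryUpdateTableSchemaOperator, "location")` to see whether the installed provider version declares the parameter.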
### What you think should happen instead?
The DAG should load without errors.
### How to reproduce
Upload this code to the bucket connected to your Cloud Composer environment.
```
from datetime import datetime

from airflow.decorators import dag
from airflow.decorators import task
from airflow.decorators import task_group
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryUpdateTableSchemaOperator,
)


@dag(
    dag_id="update_table",
    max_active_runs=1,
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def task_flow():
    @task_group(group_id="database")
    def task_groups():
        # this is the bigquery table
        dataset = "covid19_ecdc_eu"
        table_name = "covid_19_geographic_distribution_worldwide"
        project_id = "bigquery-public-data"
        table_schema = {
            "name": "date",
            "description": "date column",
        }
        update_schema = BigQueryUpdateTableSchemaOperator(
            task_id="update_bronze_schema_for_incr",
            dataset_id=dataset,
            table_id=table_name,
            project_id=project_id,
            schema_fields_updates=table_schema,
            include_policy_tags=True,
            location="EU",
            retries=0,
        )
        chain(
            update_schema
        )

    task_groups()


task_flow()
```
### Operating System
macOS Sonoma 14.5 23F79
### Versions of Apache Airflow Providers
https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#:~:text=composer%2D2.4.6%2Dairflow%2D2.6.3
### Deployment
Google Cloud Composer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)