jroachgolf84 opened a new issue, #62373:
URL: https://github.com/apache/airflow/issues/62373
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
_No response_
### Apache Airflow version
3.1.5
### Operating System
Linux
### Deployment
Astronomer
### Deployment details
_No response_
### What happened
When running the DAG below (shown under "How to reproduce"), a
`DagBag import timeout` error was thrown. This occurred on Astronomer A5
workers (1 vCPU, 2 GiB). Bumping up the Worker size resolved the error, but
importing these two Operators still took well over 30 seconds.
### What you think should happen instead
Imports do not cause a `DagBag timeout error`, even on small Workers.
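As a stopgap on small Workers (a workaround, not a fix), the parse deadline can be raised via Airflow's `[core] dagbag_import_timeout` setting, for example as an environment variable on the deployment; the 120-second value below is only illustrative:

```shell
# Raise the DagBag import timeout from its 30s default (workaround, not a fix)
export AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120
```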
### How to reproduce
Run the DAG below and check the logging statements in the Task Logs.
```python
import logging
import time
start_time = time.perf_counter()
import pendulum
from airflow.decorators import dag
from airflow.decorators import task
logger = logging.getLogger(__name__)
logger.info(f"DAG_PERF_CHECK 1: {time.perf_counter()-start_time}")
# executed on Astro Worker Type = A5. 0-10 Workers, Concurrency: 5
# -------------------------------------------
# when following 2 imports are commented out:
# INFO - DAG_PERF_CHECK 1: 0.00933
# INFO - DAG_PERF_CHECK 2: 0.00963
# INFO - DAG_PERF_CHECK 3: 0.01018
# -------------------------------------------
# same dag with following 2 imports included:
# some tasks fail with:
#   AirflowTaskTimeout: DagBag import timeout for [.../test_dag.py] after 30.0s
# and the ones that succeed log values like these:
# INFO - DAG_PERF_CHECK 1: 0.08950
# INFO - DAG_PERF_CHECK 2: 26.09278
# INFO - DAG_PERF_CHECK 3: 26.09359
# -------------------------------------------
# same as above, but Worker Type = A10. 0-10 Workers, Concurrency: 5
# all tasks succeed.
# INFO - DAG_PERF_CHECK 1: 0.00706
# INFO - DAG_PERF_CHECK 2: 15.74727
# INFO - DAG_PERF_CHECK 3: 15.74815
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
logger.info(f"DAG_PERF_CHECK 2: {time.perf_counter()-start_time}")
from airflow.operators.empty import EmptyOperator
logger.info(f"DAG_PERF_CHECK 3: {time.perf_counter()-start_time}")
@dag(
    schedule="*/10 * * * *",
    start_date=pendulum.datetime(2026, 2, 1),
    catchup=False,
    max_active_tasks=16,
)
def dag_performance_check_gcp():
    @task
    def dummy_task(num: int) -> int:
        return num

    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    start >> dummy_task(1) >> end


dag_performance_check_gcp()
```
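To confirm the cost comes from the provider imports themselves rather than anything DAG-specific, the same measurement can be reproduced outside Airflow. This is a minimal sketch; the stdlib module used here is only a stand-in, and on the worker image you would pass the provider module paths (e.g. `airflow.providers.google.cloud.operators.dataproc`) instead:

```python
import importlib
import time


def time_import(module_name: str) -> float:
    """Import a module and return the elapsed wall-clock time in seconds."""
    start = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - start


# Stdlib stand-in; substitute the slow provider module paths on a worker image.
elapsed = time_import("json")
print(f"import took {elapsed:.5f}s")
```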
### Anything else
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)