jroachgolf84 opened a new issue, #62373:
URL: https://github.com/apache/airflow/issues/62373

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Apache Airflow version
   
   3.1.5
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When running the DAG below (shown in the "How to reproduce" section), a `DagBag timeout error` was thrown. This occurred when running on Astronomer A5 workers (1 vCPU and 2 GiB). Bumping up the Worker Size resolved the error, but the imports of these two operators still took well over 30 seconds.
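
   To narrow down where the time goes, one way is to time an import in isolation. A minimal sketch of the idea (using a stdlib module as a stand-in, since the provider module is only available in an Airflow deployment):

    ```python
    import importlib
    import sys
    import time


    def time_import(module_name: str) -> float:
        """Return wall-clock seconds spent importing module_name."""
        start = time.perf_counter()
        importlib.import_module(module_name)
        return time.perf_counter() - start


    # Stand-in for a heavy provider module such as
    # airflow.providers.google.cloud.operators.dataproc.
    elapsed = time_import("json")
    print(f"import took {elapsed:.5f}s")
    ```

   For a per-submodule breakdown, the interpreter's built-in `-X importtime` flag can be used, e.g. `python -X importtime -c "import airflow.providers.google.cloud.operators.dataproc"`.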
   
   ### What you think should happen instead
   
   Imports do not cause a `DagBag timeout error`, even on small Workers.
   
   ### How to reproduce
   
   Run this DAG and check the logging statements in the Task Logs.
   
   ```python
   import logging
   import time
   
   start_time = time.perf_counter()
   
   import pendulum
   
   from airflow.decorators import dag
   from airflow.decorators import task
   
   
   logger = logging.getLogger(__name__)
   
   logger.info(f"DAG_PERF_CHECK 1: {time.perf_counter()-start_time}")
   
   # executed on Astro  Worker Type = A5.  0-10 Workers, Concurrency: 5
   # -------------------------------------------
   # when following 2 imports are commented out:
   # INFO - DAG_PERF_CHECK 1: 0.00933
   # INFO - DAG_PERF_CHECK 2: 0.00963
   # INFO - DAG_PERF_CHECK 3: 0.01018
   # -------------------------------------------
   # same dag with following 2 imports included:
    # some tasks fail with AirflowTaskTimeout: DagBag import timeout for [.../test_dag.py] after 30.0s
    # and the ones that succeed have values like below:
   # INFO - DAG_PERF_CHECK 1:  0.08950
   # INFO - DAG_PERF_CHECK 2: 26.09278
   # INFO - DAG_PERF_CHECK 3: 26.09359
   # -------------------------------------------
   # same as above, but  Worker Type = A10.  0-10 Workers, Concurrency: 5
   # all tasks succeed.
   # INFO - DAG_PERF_CHECK 1: 0.00706
   # INFO - DAG_PERF_CHECK 2: 15.74727
   # INFO - DAG_PERF_CHECK 3: 15.74815
   
    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   
   logger.info(f"DAG_PERF_CHECK 2: {time.perf_counter()-start_time}")
   
   from airflow.operators.empty import EmptyOperator
   
   logger.info(f"DAG_PERF_CHECK 3: {time.perf_counter()-start_time}")
   
   
   @dag(
       schedule="*/10 * * * *",
       start_date=pendulum.datetime(2026, 2, 1),
       catchup=False,
       max_active_tasks=16,
   )
   def dag_performance_check_gcp():
   
       @task
       def dummy_task(num: int) -> int:
           return num
   
   
       start = EmptyOperator(task_id="start")
       end = EmptyOperator(task_id="end")
   
       start >> dummy_task() >> end
   
   
   
   dag_performance_check_gcp()
   ```
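
   If the import cost cannot be reduced upstream, one possible workaround (a sketch of the general deferred-import pattern, not Airflow-specific advice) is to move execution-only imports into the task body, so DAG parsing never pays for them. Note this only helps for imports needed at run time; operators instantiated at parse time still require their module. With `csv` standing in for the heavy provider module:

    ```python
    import sys


    def run_task() -> str:
        # Deferred import: the module is loaded when the task executes on the
        # worker, not when the DagBag parses the DAG file.
        import csv  # stand-in for e.g. the Dataproc operator module
        return csv.writer.__name__


    result = run_task()
    print(result)
    ```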
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

