devinmnorris opened a new issue, #44892: URL: https://github.com/apache/airflow/issues/44892
### Apache Airflow Provider(s)

apache-beam

### Versions of Apache Airflow Providers

`apache-airflow-providers-apache-beam==5.7.1`

Others specified in the [Cloud Composer version list](https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#:~:text=composer%2D2.9.1%2Dairflow%2D2.9.1)

### Apache Airflow version

2.9.1

### Operating System

composer-2.9.1-airflow-2.9.1

### Deployment

Google Cloud Composer

### Deployment details

Triggerer config: `1 triggerer with 1 vCPU, 1 GB memory, 1 GB storage`

### What happened

When triggering a Dataflow job using the `BeamRunPythonPipelineOperator` with `deferrable=True`, the task fails because the triggerer does not have access to the DAG folder and therefore cannot read the Python file containing the Beam pipeline.

Triggerer logs:

```
2024-12-12 11:02:18.997 MST airflow-triggerer trigger deferrable_dag/manual__2024-12-12T18:02:13.335160+00:00/deferrable_task/-1/1 (ID 16) starting
2024-12-12 11:04:18.382 MST airflow-triggerer 1 triggers currently running
2024-12-12 11:04:21.491 MST airflow-triggerer Beam version: 2.57.0
2024-12-12 11:04:21.492 MST airflow-triggerer Running command: /tmp/apache-beam-venv_ucd7ai0/bin/python /home/airflow/gcs/dags/include/beam/deferrable.py --runner=DataflowRunner
2024-12-12 11:04:21.495 MST airflow-triggerer Start waiting for Apache Beam process to complete.
2024-12-12 11:04:21.542 MST airflow-triggerer /tmp/apache-beam-venv_ucd7ai0/bin/python: can't open file '/home/airflow/gcs/dags/include/beam/deferrable.py': [Errno 2] No such file or directory
2024-12-12 11:04:21.547 MST airflow-triggerer Process exited with return code: 2
2024-12-12 11:04:22.384 MST airflow-triggerer trigger deferrable_dag/manual__2024-12-12T18:02:13.335160+00:00/deferrable_task/-1/1 (ID 16) complete
```

### What you think should happen instead

Apache Beam needs to run a command like `python local_pipeline_file.py` to start the job, so my understanding is that this step needs to happen on the worker before the task is deferred to a trigger.

### How to reproduce

1. Create a Cloud Composer environment with at least 1 triggerer
2. Run a DAG containing an operator like:

```python
deferrable_test = BeamRunPythonPipelineOperator(
    py_file=f"{{{{ conf.get('core', 'dags_folder') }}}}/include/beam/deferrable.py",
    runner="DataflowRunner",
    ...,
    deferrable=True,
)
```

### Anything else

The issue only occurs when running the DAG on Cloud Composer. With the same setup locally under Docker Compose, everything works as expected. My theory is that the DAGs folder is not synchronized to the Airflow triggerer in Cloud Composer, whereas it is mounted as a volume for the triggerer in `docker-compose.yaml`.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
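For reference, the `[Errno 2]` failure and return code 2 seen in the triggerer logs can be reproduced outside Airflow with plain `subprocess` — a minimal sketch, assuming only that the triggerer ends up shelling out to `python <py_file> --runner=DataflowRunner` against a path that exists on the worker's GCS mount but not on the triggerer (the path below is the one from the logs and is deliberately absent):

```python
import subprocess
import sys

# Path from the triggerer logs; it is absent on the triggerer (and on
# this machine), which is the heart of the bug.
missing_py_file = "/home/airflow/gcs/dags/include/beam/deferrable.py"

# Roughly what the Beam provider's trigger ends up executing. When the
# script file cannot be opened, CPython itself exits with code 2 and
# prints the "[Errno 2] No such file or directory" message to stderr,
# matching "Process exited with return code: 2" in the logs.
result = subprocess.run(
    [sys.executable, missing_py_file, "--runner=DataflowRunner"],
    capture_output=True,
    text=True,
)
print(result.returncode)
print(result.stderr.strip())
```

This only demonstrates the observed failure mode; the actual fix would need to ensure the pipeline file is materialized wherever the Beam subprocess runs (worker or triggerer) before the command is launched.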
