zstrathe opened a new issue, #38713: URL: https://github.com/apache/airflow/issues/38713
### Apache Airflow Provider(s)

apache-beam

### Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam==5.6.2

### Apache Airflow version

2.8.4

### Operating System

Debian GNU/Linux 12 (bookworm) (official Airflow docker image)

### Deployment

Docker-Compose

### Deployment details

Using the docker-compose file provided for Airflow (https://airflow.apache.org/docs/apache-airflow/2.8.4/docker-compose.yaml), with the base Airflow image (apache/airflow:2.8.4) and apache-airflow-providers-apache-beam==5.6.2 installed via pip.

### What happened

When attempting to use the `BeamRunPythonPipelineOperator` with a local file and the DirectRunner (not using any Google Cloud services at all), the task fails with an exception:

**AirflowNotFoundException: The conn_id `google_cloud_default` isn't defined**

Airflow log:

```
796f08231d0a
*** Found local files:
***   * /opt/airflow/logs/dag_id=beam_local_test_dag/run_id=manual__2024-04-03T14:42:24.780689+00:00/task_id=start_python_pipeline_local_direct_runner/attempt=1.log
[2024-04-03, 09:42:25 CDT] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: beam_local_test_dag.start_python_pipeline_local_direct_runner manual__2024-04-03T14:42:24.780689+00:00 [queued]>
[2024-04-03, 09:42:25 CDT] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: beam_local_test_dag.start_python_pipeline_local_direct_runner manual__2024-04-03T14:42:24.780689+00:00 [queued]>
[2024-04-03, 09:42:25 CDT] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-03, 09:42:25 CDT] {taskinstance.py:2217} INFO - Executing <Task(BeamRunPythonPipelineOperator): start_python_pipeline_local_direct_runner> on 2024-04-03 14:42:24.780689+00:00
[2024-04-03, 09:42:25 CDT] {standard_task_runner.py:60} INFO - Started process 391 to run task
[2024-04-03, 09:42:25 CDT] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'beam_local_test_dag', 'start_python_pipeline_local_direct_runner', 'manual__2024-04-03T14:42:24.780689+00:00', '--job-id', '5009', '--raw', '--subdir', 'DAGS_FOLDER/test_bugfix.py', '--cfg-path', '/tmp/tmpsctri0q0']
[2024-04-03, 09:42:25 CDT] {standard_task_runner.py:88} INFO - Job 5009: Subtask start_python_pipeline_local_direct_runner
[2024-04-03, 09:42:25 CDT] {task_command.py:423} INFO - Running <TaskInstance: beam_local_test_dag.start_python_pipeline_local_direct_runner manual__2024-04-03T14:42:24.780689+00:00 [running]> on host 796f08231d0a
[2024-04-03, 09:42:25 CDT] {taskinstance.py:2513} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='beam_local_test_dag' AIRFLOW_CTX_TASK_ID='start_python_pipeline_local_direct_runner' AIRFLOW_CTX_EXECUTION_DATE='2024-04-03T14:42:24.780689+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-03T14:42:24.780689+00:00'
[2024-04-03, 09:42:25 CDT] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/operators/beam.py", line 357, in execute
    return self.execute_sync(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/operators/beam.py", line 361, in execute_sync
    gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/gcs.py", line 159, in __init__
    super().__init__(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 251, in __init__
    self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/hooks/base.py", line 82, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py", line 514, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `google_cloud_default` isn't defined
[2024-04-03, 09:42:25 CDT] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=beam_local_test_dag, task_id=start_python_pipeline_local_direct_runner, execution_date=20240403T144224, start_date=20240403T144225, end_date=20240403T144225
[2024-04-03, 09:42:25 CDT] {standard_task_runner.py:107} ERROR - Failed to execute job 5009 for task start_python_pipeline_local_direct_runner (The conn_id `google_cloud_default` isn't defined; 391)
[2024-04-03, 09:42:26 CDT] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-04-03, 09:42:26 CDT] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

### What you think should happen instead

I believe the issue is that in `airflow/providers/apache/beam/operators/beam.py`, the `execute_sync()` method of `BeamRunPythonPipelineOperator` instantiates the Google Cloud Storage hook (`GCSHook`) unconditionally, even when only local file paths are provided:

```
def execute_sync(self, context: Context):
    with ExitStack() as exit_stack:
        gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
        if self.py_file.lower().startswith("gs://"):
            tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
            self.py_file = tmp_gcs_file.name
        if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
            tmp_req_file = exit_stack.enter_context(
                gcs_hook.provide_file(object_url=self.snake_case_pipeline_options["requirements_file"])
            )
            self.snake_case_pipeline_options["requirements_file"] = tmp_req_file.name
```

I believe this method should be modified to only create the `GCSHook` when it is actually needed, i.e. when the specified `py_file` or `requirements_file` starts with `gs://`:

```
def execute_sync(self, context: Context):
    with ExitStack() as exit_stack:
        if self.py_file.lower().startswith("gs://"):
            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
            tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
            self.py_file = tmp_gcs_file.name
        if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
            if 'gcs_hook' not in locals():
                gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
            tmp_req_file = exit_stack.enter_context(
                gcs_hook.provide_file(object_url=self.snake_case_pipeline_options["requirements_file"])
            )
            self.snake_case_pipeline_options["requirements_file"] = tmp_req_file.name
```

In my own testing this works. However, I do not use and have not tested any Google Cloud features.
### How to reproduce

Use the following example DAG, with a local text file as input for testing, located in the '/opt/airflow/dags/' folder in the Airflow docker container:

```
import logging
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

logger = logging.getLogger("airflow.task")

with DAG(
    dag_id='beam_local_test_dag',
    start_date=datetime(2024, 4, 3),
    schedule=None
) as dag:

    test_example_beam = BeamRunPythonPipelineOperator(
        task_id="start_python_pipeline_local_direct_runner",
        py_file="apache_beam.examples.wordcount",
        py_options=['-m'],
        pipeline_options={"input": "/opt/airflow/dags/test.txt",
                          "output": "/opt/airflow/dags/output-test.txt"},
        runner='DirectRunner',
        py_requirements=["apache-beam[gcp]==2.46.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
    )

    test_example_beam
```

### Anything else

In addition to modifying the execute_sync() method, I also tested whether simply installing the Google Cloud SDK on the Airflow worker might somehow fix the error; in my case it did not. A possible stop-gap is sketched after the checklist below. If it's determined that a PR for the BeamRunPythonPipelineOperator.execute_sync() method is needed, I'd be happy to take it on!

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
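As a possible stop-gap until the operator itself is changed (an assumption on my part; I have not verified it in my deployment), registering a placeholder `google_cloud_default` connection should let the `GCSHook` connection lookup succeed, since the connection is never actually used when only local paths are given:

```
# Untested idea: register an empty GCP connection so the unconditional GCSHook
# lookup in execute_sync() no longer raises AirflowNotFoundException.
airflow connections add google_cloud_default --conn-type google_cloud_platform
```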
