zstrathe opened a new issue, #38713:
URL: https://github.com/apache/airflow/issues/38713

   ### Apache Airflow Provider(s)
   
   apache-beam
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-beam==5.6.2
   
   ### Apache Airflow version
   
   2.8.4
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm) (official Airflow docker image)
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Using the Docker Compose file provided for Airflow 2.8.4 (https://airflow.apache.org/docs/apache-airflow/2.8.4/docker-compose.yaml), with the base Airflow image (apache/airflow:2.8.4) and apache-airflow-providers-apache-beam==5.6.2 installed via pip.
   
   ### What happened
   
   When attempting to use the BeamRunPythonPipelineOperator with a local file and DirectRunner (not using Google Cloud services at all), the task fails with an exception: **AirflowNotFoundException: The conn_id `google_cloud_default` isn't defined**
   
   Airflow task log:
   ```
   796f08231d0a
   *** Found local files:
   ***   * 
/opt/airflow/logs/dag_id=beam_local_test_dag/run_id=manual__2024-04-03T14:42:24.780689+00:00/task_id=start_python_pipeline_local_direct_runner/attempt=1.log
   [2024-04-03, 09:42:25 CDT] {taskinstance.py:1979} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
beam_local_test_dag.start_python_pipeline_local_direct_runner 
manual__2024-04-03T14:42:24.780689+00:00 [queued]>
   [2024-04-03, 09:42:25 CDT] {taskinstance.py:1979} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
beam_local_test_dag.start_python_pipeline_local_direct_runner 
manual__2024-04-03T14:42:24.780689+00:00 [queued]>
   [2024-04-03, 09:42:25 CDT] {taskinstance.py:2193} INFO - Starting attempt 1 
of 1
   [2024-04-03, 09:42:25 CDT] {taskinstance.py:2217} INFO - Executing 
<Task(BeamRunPythonPipelineOperator): 
start_python_pipeline_local_direct_runner> on 2024-04-03 14:42:24.780689+00:00
   [2024-04-03, 09:42:25 CDT] {standard_task_runner.py:60} INFO - Started 
process 391 to run task
   [2024-04-03, 09:42:25 CDT] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'beam_local_test_dag', 
'start_python_pipeline_local_direct_runner', 
'manual__2024-04-03T14:42:24.780689+00:00', '--job-id', '5009', '--raw', 
'--subdir', 'DAGS_FOLDER/test_bugfix.py', '--cfg-path', '/tmp/tmpsctri0q0']
   [2024-04-03, 09:42:25 CDT] {standard_task_runner.py:88} INFO - Job 5009: 
Subtask start_python_pipeline_local_direct_runner
   [2024-04-03, 09:42:25 CDT] {task_command.py:423} INFO - Running 
<TaskInstance: beam_local_test_dag.start_python_pipeline_local_direct_runner 
manual__2024-04-03T14:42:24.780689+00:00 [running]> on host 796f08231d0a
   [2024-04-03, 09:42:25 CDT] {taskinstance.py:2513} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='beam_local_test_dag' 
AIRFLOW_CTX_TASK_ID='start_python_pipeline_local_direct_runner' 
AIRFLOW_CTX_EXECUTION_DATE='2024-04-03T14:42:24.780689+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-03T14:42:24.780689+00:00'
   [2024-04-03, 09:42:25 CDT] {taskinstance.py:2731} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 444, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 414, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/operators/beam.py",
 line 357, in execute
       return self.execute_sync(context)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/operators/beam.py",
 line 361, in execute_sync
       gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/gcs.py",
 line 159, in __init__
       super().__init__(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py",
 line 251, in __init__
       self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/hooks/base.py", line 
82, in get_connection
       conn = Connection.get_connection_from_secrets(conn_id)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py",
 line 514, in get_connection_from_secrets
       raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
   airflow.exceptions.AirflowNotFoundException: The conn_id 
`google_cloud_default` isn't defined
   [2024-04-03, 09:42:25 CDT] {taskinstance.py:1149} INFO - Marking task as 
FAILED. dag_id=beam_local_test_dag, 
task_id=start_python_pipeline_local_direct_runner, 
execution_date=20240403T144224, start_date=20240403T144225, 
end_date=20240403T144225
   [2024-04-03, 09:42:25 CDT] {standard_task_runner.py:107} ERROR - Failed to 
execute job 5009 for task start_python_pipeline_local_direct_runner (The 
conn_id `google_cloud_default` isn't defined; 391)
   [2024-04-03, 09:42:26 CDT] {local_task_job_runner.py:234} INFO - Task exited 
with return code 1
   [2024-04-03, 09:42:26 CDT] {taskinstance.py:3312} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   
   ### What you think should happen instead
   
   I believe the issue is that in **airflow.providers.apache.beam.operators.beam**, the **execute_sync()** method of **BeamRunPythonPipelineOperator** constructs the Google Cloud hook (GCSHook) unconditionally, even when only local file paths are provided:
   
   
   ```
   def execute_sync(self, context: Context):
       with ExitStack() as exit_stack:
           gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
           if self.py_file.lower().startswith("gs://"):
               tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
               self.py_file = tmp_gcs_file.name
           if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
               tmp_req_file = exit_stack.enter_context(
                   gcs_hook.provide_file(object_url=self.snake_case_pipeline_options["requirements_file"])
               )
               self.snake_case_pipeline_options["requirements_file"] = tmp_req_file.name
   ```
   
   
   
   I believe this method should be modified to instantiate GCSHook only when necessary, i.e., when the specified py_file or requirements_file starts with "gs://":
   
   ```
   def execute_sync(self, context: Context):
       with ExitStack() as exit_stack:
           if self.py_file.lower().startswith("gs://"):
               gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
               tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
               self.py_file = tmp_gcs_file.name
           if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
               if "gcs_hook" not in locals():
                   gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
               tmp_req_file = exit_stack.enter_context(
                   gcs_hook.provide_file(object_url=self.snake_case_pipeline_options["requirements_file"])
               )
               self.snake_case_pipeline_options["requirements_file"] = tmp_req_file.name
   ```
   
   In my own testing this works. However, I do not use Google Cloud and have not tested any of the Google Cloud code paths.
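   
   A slightly more explicit variant of the same idea, included here only as an untested sketch, would be to initialize the hook lazily instead of checking locals():
   
   ```
   def execute_sync(self, context: Context):
       with ExitStack() as exit_stack:
           # Sketch: create the GCS hook only if a gs:// path is actually involved,
           # so purely local runs never need a Google Cloud connection.
           gcs_hook = None
           if self.py_file.lower().startswith("gs://"):
               gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
               tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
               self.py_file = tmp_gcs_file.name
           if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
               if gcs_hook is None:
                   gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
               tmp_req_file = exit_stack.enter_context(
                   gcs_hook.provide_file(object_url=self.snake_case_pipeline_options["requirements_file"])
               )
               self.snake_case_pipeline_options["requirements_file"] = tmp_req_file.name
   ```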
   
   
   ### How to reproduce
   
   Use the following example DAG, with a local text file as test input located in the '/opt/airflow/dags/' folder of the Airflow Docker container (a sketch for creating the test file is shown after the DAG):
   
   ```
   import logging
   from datetime import datetime
   from airflow import DAG
   from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
   
   logger = logging.getLogger("airflow.task")
   
   with DAG(
       dag_id='beam_local_test_dag',
       start_date=datetime(2024, 4, 3),
       schedule=None,
   ) as dag:
   
       test_example_beam = BeamRunPythonPipelineOperator(
           task_id="start_python_pipeline_local_direct_runner",
           py_file="apache_beam.examples.wordcount",
           py_options=['-m'],
           pipeline_options={"input": "/opt/airflow/dags/test.txt", "output": "/opt/airflow/dags/output-test.txt"},
           runner='DirectRunner',
           py_requirements=["apache-beam[gcp]==2.46.0"],
           py_interpreter="python3",
           py_system_site_packages=False,
       )
   
       test_example_beam
   ```
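   
   The contents of the input file don't matter for wordcount; as a minimal sketch, the test file referenced above can be created with something like:
   
   ```
   # Sketch only: create the local input file the DAG above points at.
   with open("/opt/airflow/dags/test.txt", "w") as f:
       f.write("the quick brown fox jumps over the lazy dog\n")
   ```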
   
   ### Anything else
   
   In addition to modifying the execute_sync() method, I tested whether simply installing the Google Cloud SDK onto the Airflow worker might somehow fix the error, but in my case it did not.
   
   If it's determined that a PR for the 
BeamRunPythonPipelineOperator.execute_sync() method is needed, I'd be happy to 
take it on!
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

