johannaojeling opened a new issue, #28662:
URL: https://github.com/apache/airflow/issues/28662

   ### Apache Airflow Provider(s)
   
   apache-beam
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-beam==4.1.0
   apache-airflow-providers-google==8.6.0
   
   ### Apache Airflow version
   
   2.5.0
   
   ### Operating System
   
   macOS 13.1
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When using the `BeamRunGoPipelineOperator` with a `go_file` on GCS, the object is downloaded to a temporary directory. However, that directory has already been removed by the time the file is needed, i.e. when `go mod init` is executed and the pipeline is started.
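The lifetime problem can be illustrated without Airflow or GCS. This is a minimal sketch using only the standard library; `download_to_temp_dir` is a hypothetical stand-in for the GCS download and is not part of the provider. A path created inside a `tempfile.TemporaryDirectory` block points at nothing once the block has exited:

```python
import os
import tempfile


def download_to_temp_dir(contents: str) -> str:
    """Hypothetical stand-in for the GCS download: writes main.go into a
    TemporaryDirectory and returns its path after the `with` block ends."""
    with tempfile.TemporaryDirectory(prefix="apache-beam-go") as tmp_dir:
        path = os.path.join(tmp_dir, "main.go")
        with open(path, "w") as f:
            f.write(contents)
        # Inside the block, both the directory and the file exist.
        assert os.path.isfile(path)
    # Leaving the block deletes tmp_dir and everything in it.
    return path


go_file = download_to_temp_dir("package main\n")
print(os.path.exists(go_file))                  # False
print(os.path.isdir(os.path.dirname(go_file)))  # False
```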
   
   ### What you think should happen instead
   
   The `BeamRunGoPipelineOperator.execute` method enters a `tempfile.TemporaryDirectory` context manager using a [with](https://github.com/apache/airflow/blob/2.5.0/airflow/providers/apache/beam/operators/beam.py#L588) statement when downloading the `go_file` from GCS to the local filesystem. When that `with` block exits, the temporary directory is removed. `BeamHook.start_go_pipeline`, which uses the file, is however called outside of that block, so the file no longer exists when `go mod init` is called.
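The failure in the logs can be reproduced with the standard library alone. This sketch mirrors the 2.5.0 structure (a plain `with tempfile.TemporaryDirectory(...)` nested inside the `ExitStack`). Assumptions: `provide_file` is a hypothetical stand-in for `GCSHook.provide_file` that only creates a `NamedTemporaryFile` in `dir` (no GCS access), a no-op Python subprocess stands in for `go mod init`, and the system is POSIX, where a directory can be deleted while a file in it is still open.

```python
import os
import subprocess
import sys
import tempfile
from contextlib import ExitStack, contextmanager


@contextmanager
def provide_file(dir: str):
    """Hypothetical stand-in for GCSHook.provide_file: yields a named temp
    file created inside `dir` instead of downloading an object from GCS."""
    with tempfile.NamedTemporaryFile(suffix="main.go", dir=dir) as tmp_file:
        tmp_file.write(b"package main\n")
        tmp_file.flush()
        yield tmp_file


def run_like_2_5_0() -> list:
    """Mimics the 2.5.0 control flow and returns the errors seen, in order."""
    errors = []
    try:
        with ExitStack() as exit_stack:
            # Plain `with`: the directory is deleted as soon as this block ends...
            with tempfile.TemporaryDirectory(prefix="apache-beam-go") as tmp_dir:
                tmp_gcs_file = exit_stack.enter_context(provide_file(dir=tmp_dir))
                go_file = tmp_gcs_file.name
            # ...while the temp file is still registered on the ExitStack.
            try:
                # Stand-in for `go mod init main`, run in the file's directory.
                subprocess.run(
                    [sys.executable, "-c", "pass"],
                    cwd=os.path.dirname(go_file),
                    check=True,
                )
            except FileNotFoundError as err:
                errors.append(("subprocess", err.filename))
    except FileNotFoundError as err:
        # Raised when the ExitStack closes the NamedTemporaryFile whose file is
        # already gone (seen on Python 3.10; newer versions may ignore it).
        errors.append(("cleanup", err.filename))
    return errors


print(run_like_2_5_0())
```

The first entry should name the deleted `apache-beam-go...` directory, like the first `FileNotFoundError` in the logs. Whether a second `cleanup` entry appears depends on the Python version, so it should not be relied on.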
   
   A suggested solution is to use the `enter_context` method of the existing `ExitStack` to enter the `TemporaryDirectory` context manager as well. The directory is then only removed when the `ExitStack` exits, so the `go_file` still exists when the Go module is initialized and the pipeline is started:
   
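   ```python
   import tempfile
   from contextlib import ExitStack

   from airflow.providers.google.cloud.hooks.gcs import GCSHook

   # Excerpt from BeamRunGoPipelineOperator.execute()
   with ExitStack() as exit_stack:
       if self.go_file.lower().startswith("gs://"):
           gcs_hook = GCSHook(self.gcp_conn_id, self.delegate_to)

           # Registered on the ExitStack, so the directory outlives this `if` block
           # and is only removed when the ExitStack itself exits.
           tmp_dir = exit_stack.enter_context(tempfile.TemporaryDirectory(prefix="apache-beam-go"))
           tmp_gcs_file = exit_stack.enter_context(
               gcs_hook.provide_file(object_url=self.go_file, dir=tmp_dir)
           )

           self.go_file = tmp_gcs_file.name
           self.should_init_go_module = True
   ```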
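The suggested fix can be checked the same way. This sketch reuses a hypothetical `provide_file` stand-in and a no-op subprocess in place of `go mod init` (assumptions, not provider code) and registers both context managers on one `ExitStack`. Because `ExitStack` unwinds its callbacks in last-in, first-out order, the temporary file is closed and unlinked before the directory is removed, so neither cleanup step fails.

```python
import os
import subprocess
import sys
import tempfile
from contextlib import ExitStack, contextmanager


@contextmanager
def provide_file(dir: str):
    """Hypothetical stand-in for GCSHook.provide_file (no GCS access)."""
    with tempfile.NamedTemporaryFile(suffix="main.go", dir=dir) as tmp_file:
        tmp_file.write(b"package main\n")
        tmp_file.flush()
        yield tmp_file


def run_with_fix() -> dict:
    with ExitStack() as exit_stack:
        # Entered first, so it is cleaned up last (ExitStack unwinds LIFO).
        tmp_dir = exit_stack.enter_context(
            tempfile.TemporaryDirectory(prefix="apache-beam-go")
        )
        tmp_gcs_file = exit_stack.enter_context(provide_file(dir=tmp_dir))
        go_file = tmp_gcs_file.name
        # Stand-in for `go mod init` and pipeline start: cwd still exists here.
        subprocess.run(
            [sys.executable, "-c", "pass"],
            cwd=os.path.dirname(go_file),
            check=True,
        )
        existed_during_run = os.path.isfile(go_file)
    # Stack unwound: file closed and unlinked first, then the directory removed.
    return {
        "file_existed_during_run": existed_during_run,
        "dir_removed_after": not os.path.isdir(tmp_dir),
    }


print(run_with_fix())
```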
   
   ### How to reproduce
   
   The problem can be reproduced by creating a DAG which uses the 
`BeamRunGoPipelineOperator` and passing a `go_file` with a GS URI:
   
   ```python
   import pendulum
   from airflow import DAG
   from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator
   
   
   with DAG(
       dag_id="beam_go_dag",
       start_date=pendulum.today("UTC"),
   ) as dag:
       BeamRunGoPipelineOperator(
           task_id="beam_go_pipeline",
           go_file="gs://my-bucket/main.go"
       )
   ```
   
   ### Anything else
   
   Relevant logs:
   
   ```
   [2023-01-01T12:41:06.155+0100] {taskinstance.py:1303} INFO - Executing <Task(BeamRunGoPipelineOperator): beam_go_pipeline> on 2023-01-01 00:00:00+00:00
   [2023-01-01T12:41:06.411+0100] {taskinstance.py:1510} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=beam_go_dag
   AIRFLOW_CTX_TASK_ID=beam_go_pipeline
   AIRFLOW_CTX_EXECUTION_DATE=2023-01-01T00:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=backfill__2023-01-01T00:00:00+00:00
   [2023-01-01T12:41:06.430+0100] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
   [2023-01-01T12:41:06.441+0100] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
   [2023-01-01T12:41:08.701+0100] {gcs.py:323} INFO - File downloaded to /var/folders/1_/7h5npt456j5f063tq7ngyxdw0000gn/T/apache-beam-gosmk3lv_4/tmp6j9g5090main.go
   [2023-01-01T12:41:08.704+0100] {process_utils.py:179} INFO - Executing cmd: go mod init main
   [2023-01-01T12:41:08.712+0100] {taskinstance.py:1782} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/google/cloud/hooks/gcs.py", line 402, in provide_file
       yield tmp_file
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/apache/beam/operators/beam.py", line 621, in execute
       self.beam_hook.start_go_pipeline(
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/apache/beam/hooks/beam.py", line 339, in start_go_pipeline
       init_module("main", working_directory)
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/google/go_module_utils.py", line 37, in init_module
       execute_in_subprocess(go_mod_init_cmd, cwd=go_module_path)
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/utils/process_utils.py", line 168, in execute_in_subprocess
       execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/utils/process_utils.py", line 180, in execute_in_subprocess_with_kwargs
       with subprocess.Popen(
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/subprocess.py", line 969, in __init__
       self._execute_child(args, executable, preexec_fn, close_fds,
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/subprocess.py", line 1845, in _execute_child
       raise child_exception_type(errno_num, err_msg, err_filename)
   FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/1_/7h5npt456j5f063tq7ngyxdw0000gn/T/apache-beam-gosmk3lv_4'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/apache/beam/operators/beam.py", line 584, in execute
       with ExitStack() as exit_stack:
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/contextlib.py", line 576, in __exit__
       raise exc_details[1]
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/contextlib.py", line 561, in __exit__
       if cb(*exc_details):
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/contextlib.py", line 153, in __exit__
       self.gen.throw(typ, value, traceback)
     File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/google/cloud/hooks/gcs.py", line 399, in provide_file
       with NamedTemporaryFile(suffix=file_name, dir=dir) as tmp_file:
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/tempfile.py", line 502, in __exit__
       self.close()
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/tempfile.py", line 509, in close
       self._closer.close()
     File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/tempfile.py", line 446, in close
       unlink(self.name)
   FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/1_/7h5npt456j5f063tq7ngyxdw0000gn/T/apache-beam-gosmk3lv_4/tmp6j9g5090main.go'
   [2023-01-01T12:41:08.829+0100] {taskinstance.py:1321} INFO - Marking task as FAILED. dag_id=beam_go_dag, task_id=beam_go_pipeline, execution_date=20230101T000000, start_date=20230101T114106, end_date=20230101T114108
   [...]
   ```
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   