johannaojeling opened a new issue, #28662:
URL: https://github.com/apache/airflow/issues/28662
### Apache Airflow Provider(s)

apache-beam

### Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam==4.1.0
apache-airflow-providers-google==8.6.0

### Apache Airflow version

2.5.0

### Operating System

macOS 13.1

### Deployment

Virtualenv installation

### Deployment details

_No response_

### What happened

When using the `BeamRunGoPipelineOperator` with a `go_file` on GCS, the object is downloaded to a temporary directory. However, that directory, and the file in it, has already been removed by the time the file is needed, i.e. when `go mod init` is executed and the pipeline is started.

### What you think should happen instead

The `BeamRunGoPipelineOperator.execute` method enters a `tempfile.TemporaryDirectory` context manager using [with](https://github.com/apache/airflow/blob/2.5.0/airflow/providers/apache/beam/operators/beam.py#L588) when downloading the `go_file` from GCS to the local filesystem. When that context exits, the temporary directory is removed. `BeamHook.start_go_pipeline`, which uses the file, is however called outside of that context, so the file no longer exists when `go mod init` is called.

A suggested solution is to use the `enter_context` method of the existing `ExitStack` to also enter the `TemporaryDirectory` context manager.
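The lifetime problem can be reproduced without Airflow or GCS. The following is a minimal sketch, assuming that writing a placeholder `main.go` is an adequate stand-in for the GCS download and that starting a trivial child process with `cwd` set to the temporary directory is an adequate stand-in for `go mod init` (the traceback shows `execute_in_subprocess` being called with `cwd=go_module_path`). The helpers `write_main_go`, `go_mod_init_stand_in` and `run` are hypothetical and not part of the provider.

```python
import os
import subprocess
import sys
import tempfile
from contextlib import ExitStack


def write_main_go(directory: str) -> str:
    """Hypothetical stand-in for the GCS download: create a placeholder main.go."""
    path = os.path.join(directory, "main.go")
    with open(path, "w") as f:
        f.write("package main\n")
    return path


def go_mod_init_stand_in(directory: str) -> bool:
    """Hypothetical stand-in for `go mod init main`: start a child with cwd=directory."""
    try:
        subprocess.run([sys.executable, "-c", "pass"], cwd=directory, check=True)
        return True
    except OSError:  # FileNotFoundError on POSIX once the directory is gone
        return False


def run(use_exit_stack: bool) -> bool:
    with ExitStack() as exit_stack:
        if use_exit_stack:
            # Suggested fix: the directory is registered on the ExitStack,
            # so it is only removed when the outer `with` block exits.
            tmp_dir = exit_stack.enter_context(
                tempfile.TemporaryDirectory(prefix="apache-beam-go")
            )
            go_file = write_main_go(tmp_dir)
        else:
            # Layout in 2.5.0: the nested `with` deletes the directory as soon
            # as this inner block ends.
            with tempfile.TemporaryDirectory(prefix="apache-beam-go") as tmp_dir:
                go_file = write_main_go(tmp_dir)
        # Stand-in for the start_go_pipeline() call, which runs after the `if`.
        return go_mod_init_stand_in(os.path.dirname(go_file))


print(run(use_exit_stack=False))  # False: directory already removed
print(run(use_exit_stack=True))   # True: directory still exists
```

Entering the directory through the existing `ExitStack`, as suggested above, ties its removal to the end of the outer block instead of the nested one.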
This allows the `go_file` to still exist when it is time to initialize the Go module and start the pipeline:

```python
with ExitStack() as exit_stack:
    if self.go_file.lower().startswith("gs://"):
        gcs_hook = GCSHook(self.gcp_conn_id, self.delegate_to)
        # Registering the directory on the ExitStack defers its removal until
        # the outer `with` block exits, after start_go_pipeline has run.
        tmp_dir = exit_stack.enter_context(tempfile.TemporaryDirectory(prefix="apache-beam-go"))
        tmp_gcs_file = exit_stack.enter_context(
            gcs_hook.provide_file(object_url=self.go_file, dir=tmp_dir)
        )
        self.go_file = tmp_gcs_file.name
        self.should_init_go_module = True
```

### How to reproduce

The problem can be reproduced by creating a DAG that uses the `BeamRunGoPipelineOperator` and passing a `go_file` with a `gs://` URI:

```python
import pendulum

from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator

with DAG(
    dag_id="beam_go_dag",
    start_date=pendulum.today("UTC"),
) as dag:
    BeamRunGoPipelineOperator(
        task_id="beam_go_pipeline",
        go_file="gs://my-bucket/main.go",
    )
```

### Anything else

Relevant logs:

```
[2023-01-01T12:41:06.155+0100] {taskinstance.py:1303} INFO - Executing <Task(BeamRunGoPipelineOperator): beam_go_pipeline> on 2023-01-01 00:00:00+00:00
[2023-01-01T12:41:06.411+0100] {taskinstance.py:1510} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=beam_go_dag
AIRFLOW_CTX_TASK_ID=beam_go_pipeline
AIRFLOW_CTX_EXECUTION_DATE=2023-01-01T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=backfill__2023-01-01T00:00:00+00:00
[2023-01-01T12:41:06.430+0100] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2023-01-01T12:41:06.441+0100] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-01T12:41:08.701+0100] {gcs.py:323} INFO - File downloaded to /var/folders/1_/7h5npt456j5f063tq7ngyxdw0000gn/T/apache-beam-gosmk3lv_4/tmp6j9g5090main.go
[2023-01-01T12:41:08.704+0100] {process_utils.py:179} INFO - Executing cmd: go mod init main
[2023-01-01T12:41:08.712+0100] {taskinstance.py:1782} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/google/cloud/hooks/gcs.py", line 402, in provide_file
    yield tmp_file
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/apache/beam/operators/beam.py", line 621, in execute
    self.beam_hook.start_go_pipeline(
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/apache/beam/hooks/beam.py", line 339, in start_go_pipeline
    init_module("main", working_directory)
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/google/go_module_utils.py", line 37, in init_module
    execute_in_subprocess(go_mod_init_cmd, cwd=go_module_path)
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/utils/process_utils.py", line 168, in execute_in_subprocess
    execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/utils/process_utils.py", line 180, in execute_in_subprocess_with_kwargs
    with subprocess.Popen(
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/subprocess.py", line 969, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/subprocess.py", line 1845, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/1_/7h5npt456j5f063tq7ngyxdw0000gn/T/apache-beam-gosmk3lv_4'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/apache/beam/operators/beam.py", line 584, in execute
    with ExitStack() as exit_stack:
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/contextlib.py", line 576, in __exit__
    raise exc_details[1]
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/contextlib.py", line 561, in __exit__
    if cb(*exc_details):
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/johannaojeling/repo/johannaojeling/airflow/airflow/providers/google/cloud/hooks/gcs.py", line 399, in provide_file
    with NamedTemporaryFile(suffix=file_name, dir=dir) as tmp_file:
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/tempfile.py", line 502, in __exit__
    self.close()
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/tempfile.py", line 509, in close
    self._closer.close()
  File "/Users/johannaojeling/.pyenv/versions/3.10.6/lib/python3.10/tempfile.py", line 446, in close
    unlink(self.name)
FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/1_/7h5npt456j5f063tq7ngyxdw0000gn/T/apache-beam-gosmk3lv_4/tmp6j9g5090main.go'
[2023-01-01T12:41:08.829+0100] {taskinstance.py:1321} INFO - Marking task as FAILED. dag_id=beam_go_dag, task_id=beam_go_pipeline, execution_date=20230101T000000, start_date=20230101T114106, end_date=20230101T114108
[...]
```

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
