hussein-awala commented on code in PR #31645:
URL: https://github.com/apache/airflow/pull/31645#discussion_r1211877996
##########
airflow/providers/apache/beam/operators/beam.py:
##########
@@ -294,10 +294,15 @@ def execute(self, context: Context):
             raise AirflowException("Beam hook is not defined.")
         with ExitStack() as exit_stack:
-            if self.py_file.lower().startswith("gs://"):
+
+            if self.py_file.lower().startswith("gs://") or snake_case_pipeline_options.get('requirements_file', '').startswith("gs://"):
                 gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
                 tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
                 self.py_file = tmp_gcs_file.name
+                if snake_case_pipeline_options.get('requirements_file', '').startswith("gs://"):
+                    tmp_req_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=snake_case_pipeline_options['requirements_file']))
+                    snake_case_pipeline_options['requirements_file'] = tmp_req_file.name
+
Review Comment:
Can we have a `py_file` that doesn't start with `gs://` together with a
requirements file stored in GCS in the same job? If yes (and I think we can),
your code will try to download both files from GCS, even though `py_file` is
not a GCS object in that case.
Instead, you can create a method that downloads an object from GCS, and use it
for `py_file` and/or the requirements file:
```python
def _get_file_from_gcs(self, exit_stack: ExitStack, object_url: str):
    # The temporary file stays available until exit_stack is closed.
    gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
    return exit_stack.enter_context(gcs_hook.provide_file(object_url=object_url))
```
and in `execute`:
```python
if self.py_file.lower().startswith("gs://"):
    self.py_file = self._get_file_from_gcs(exit_stack, self.py_file).name
requirements_file = snake_case_pipeline_options.get("requirements_file", "")
if requirements_file.startswith("gs://"):
    snake_case_pipeline_options["requirements_file"] = self._get_file_from_gcs(
        exit_stack, requirements_file
    ).name
```
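To make the intended behaviour concrete, here is a minimal, self-contained sketch of the same idea that runs without Airflow or GCS. `FakeGCSHook`, `localize`, and `resolve_inputs` are hypothetical names used only for illustration; the fake hook just imitates the context-manager shape of `provide_file` and records which URLs were requested.

```python
import os
import tempfile
from contextlib import ExitStack, contextmanager


class FakeGCSHook:
    """Hypothetical stand-in for GCSHook; records which URLs were requested."""

    def __init__(self):
        self.requested = []

    @contextmanager
    def provide_file(self, object_url):
        # Imitate a download by yielding a temporary file that is
        # deleted when the context exits.
        self.requested.append(object_url)
        suffix = os.path.basename(object_url)
        with tempfile.NamedTemporaryFile(suffix=suffix) as tmp:
            yield tmp


def localize(path, hook, exit_stack):
    """Return a local path, fetching only when `path` is a gs:// URL."""
    if path.lower().startswith("gs://"):
        tmp = exit_stack.enter_context(hook.provide_file(object_url=path))
        return tmp.name
    return path


def resolve_inputs(py_file, options, hook, exit_stack):
    """Localize py_file and requirements_file independently of each other."""
    py_file = localize(py_file, hook, exit_stack)
    options = dict(options)  # avoid mutating the caller's dict
    if "requirements_file" in options:
        options["requirements_file"] = localize(
            options["requirements_file"], hook, exit_stack
        )
    return py_file, options


hook = FakeGCSHook()
with ExitStack() as stack:
    py_file, opts = resolve_inputs(
        "/local/pipeline.py",
        {"requirements_file": "gs://bucket/requirements.txt"},
        hook,
        stack,
    )
    # The temporary requirements file exists while the stack is open.
    req_exists_inside = os.path.exists(opts["requirements_file"])
# Once the stack closes, the temporary file has been cleaned up.
req_exists_after = os.path.exists(opts["requirements_file"])
```

With a local `py_file` and a requirements file in GCS, only the requirements URL reaches the hook, and the temporary file lives exactly as long as the `ExitStack`.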
WDYT?