hussein-awala commented on code in PR #31645:
URL: https://github.com/apache/airflow/pull/31645#discussion_r1211877996


##########
airflow/providers/apache/beam/operators/beam.py:
##########
@@ -294,10 +294,15 @@ def execute(self, context: Context):
             raise AirflowException("Beam hook is not defined.")
 
         with ExitStack() as exit_stack:
-            if self.py_file.lower().startswith("gs://"):
+                
+            if self.py_file.lower().startswith("gs://") or snake_case_pipeline_options.get('requirements_file', '').startswith("gs://"):
                 gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
                 tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
                 self.py_file = tmp_gcs_file.name
+                if snake_case_pipeline_options.get('requirements_file', '').startswith("gs://"):
+                    tmp_req_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=snake_case_pipeline_options['requirements_file']))
+                    snake_case_pipeline_options['requirements_file'] = tmp_req_file.name
+        

Review Comment:
   Can we have a `py_file` that doesn't start with `gs://` together with a
   requirements file stored in GCS in the same job? If yes (and I think it's
   yes), your code will try to download both files from GCS, including the
   `py_file` that is not a GCS object.
   
   Instead, you can create a method that downloads an object from GCS and use
   it for `py_file` and/or the requirements file. It needs the `exit_stack`
   opened in `execute`, so pass that in:
   ```python
   def _get_file_from_gcs(self, exit_stack, object_url):
       # The temp file stays alive until the caller's ExitStack closes.
       gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
       return exit_stack.enter_context(gcs_hook.provide_file(object_url=object_url))
   ```
   and in this method:
   ```python
   if self.py_file.lower().startswith("gs://"):
       self.py_file = self._get_file_from_gcs(exit_stack, self.py_file).name
   if snake_case_pipeline_options.get('requirements_file', '').startswith("gs://"):
       snake_case_pipeline_options['requirements_file'] = self._get_file_from_gcs(
           exit_stack, snake_case_pipeline_options['requirements_file']
       ).name
   ```
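   To show the behaviour I have in mind without Airflow installed, here is a
   rough standalone sketch. `fake_provide_file`, `resolve_path` and
   `resolve_inputs` are hypothetical names made up for this example, and
   `fake_provide_file` is only a stand-in for `GCSHook.provide_file`; it is
   not the operator's actual code. The point is that each path is checked
   independently, so a local `py_file` is never fetched from GCS:

```python
import tempfile
from contextlib import ExitStack, contextmanager


@contextmanager
def fake_provide_file(object_url):
    # Hypothetical stand-in for GCSHook.provide_file: yields a temp file
    # that is deleted once the context exits.
    with tempfile.NamedTemporaryFile(suffix=".tmp") as tmp:
        tmp.write(object_url.encode())
        tmp.flush()
        yield tmp


def resolve_path(exit_stack, path):
    # Download only when the path points at GCS; otherwise return it unchanged.
    if path and path.lower().startswith("gs://"):
        return exit_stack.enter_context(fake_provide_file(path)).name
    return path


def resolve_inputs(exit_stack, py_file, options):
    # Each input is checked on its own, so one being in GCS
    # does not force a download of the other.
    resolved = dict(options)
    if "requirements_file" in resolved:
        resolved["requirements_file"] = resolve_path(
            exit_stack, resolved["requirements_file"]
        )
    return resolve_path(exit_stack, py_file), resolved


if __name__ == "__main__":
    with ExitStack() as stack:
        py_file, opts = resolve_inputs(
            stack,
            "/local/pipeline.py",
            {"requirements_file": "gs://bucket/requirements.txt"},
        )
        print(py_file)  # the local path, left untouched
        print(opts["requirements_file"])  # a temporary local copy
```

   Both temp files are registered on the same `ExitStack`, so they are cleaned
   up together when the `with` block in `execute` ends.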
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
