ibzib commented on a change in pull request #15105:
URL: https://github.com/apache/beam/pull/15105#discussion_r661884341



##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -526,6 +534,14 @@ def _create_jar_packages(jar_packages, temp_dir):
 
     return resources
 
+  @staticmethod
+  def _should_download_remote_extra_packages(package):

Review comment:
       I think the best policy may be to never download remote packages here.
If users want a package downloaded, they can do it themselves and pass it in as
a local file. WDYT?
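Here is a minimal sketch of that policy. It assumes the stager only has to split extra packages into those it stages locally and those it passes through for workers to fetch. `is_remote_path` and `split_extra_packages` are hypothetical stand-ins, not the real `Stager` API. The scheme check is also a simplification of whatever `Stager._is_remote_path` actually does.

```python
from typing import Iterable, List, Tuple
from urllib.parse import urlparse


def is_remote_path(path: str) -> bool:
    """Hypothetical stand-in for Stager._is_remote_path (simplified)."""
    scheme = urlparse(path).scheme
    # A one-letter "scheme" is most likely a Windows drive letter (C:\...),
    # and file:// URLs point at the local machine, so neither is remote.
    return len(scheme) > 1 and scheme != 'file'


def split_extra_packages(
        packages: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Splits packages into (stage_locally, defer_to_workers).

    Proposed policy: the stager never downloads remote packages. Users who
    want a local copy download it themselves and pass the local path.
    """
    local, remote = [], []
    for package in packages:
        (remote if is_remote_path(package) else local).append(package)
    return local, remote
```

Under this policy, `_should_download_remote_extra_packages` could be dropped entirely.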

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -569,12 +586,17 @@ def _create_extra_packages(extra_packages, temp_dir):
 
       if not os.path.isfile(package):
         if Stager._is_remote_path(package):
-          # Download remote package.
-          _LOGGER.info(
-              'Downloading extra package: %s locally before staging', package)
-          _, last_component = FileSystems.split(package)
-          local_file_path = FileSystems.join(staging_temp_dir, last_component)
-          Stager._download_file(package, local_file_path)
+          if Stager._should_download_remote_extra_packages(package):
+            # Download remote package.
+            _LOGGER.info(
+                'Downloading extra package: %s locally before staging', package)
+            _, last_component = FileSystems.split(package)
+            local_file_path = FileSystems.join(staging_temp_dir, last_component)
+            Stager._download_file(package, local_file_path)
+          else:
+            remote_packages.append(package)
+            _LOGGER.info(
+                'Deferring download of extra package: %s to workers', package)

Review comment:
       This is potentially a breaking change for some folks (if their retrieval
service lacks the networking or permissions that their stager has), so it's
worth documenting in the release notes
(https://github.com/apache/beam/blob/master/CHANGES.md).

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service_test.py
##########
@@ -122,6 +124,24 @@ def test_url_retrieval(self):
     with open(__file__, 'rb') as fin:
       self.assertEqual(content, fin.read())
 
+  def test_gcs_retrieval(self):
+    retrieval_service = artifact_service.ArtifactRetrievalService(None)
+    url_dep = beam_runner_api_pb2.ArtifactInformation(
+        type_urn=common_urns.artifact_types.URL.urn,
+        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
+            url='gs://test_gcs_retrieval').SerializeToString())
+    with mock.patch('apache_beam.runners.portability.artifact_service.'
+                    'GCSFileSystem.open') as mock_open:
+      mock_read_handle = mock.Mock()
+      mock_read_handle.read.return_value = b''
+      mock_open.return_value = mock_read_handle
+      content = b''.join([

Review comment:
       Nit: `content` variable is unused.
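There is a second problem beyond the unused variable. A mock whose `read` always returns `b''` cannot distinguish a correct read from one that returns nothing. The sketch below shows a more meaningful check. It assumes the service drains the handle with a chunked read loop. `read_all` is a hypothetical stand-in for that loop, not the actual `GetArtifact` code.

```python
import unittest
from unittest import mock


def read_all(handle, chunk_size=1 << 20):
    """Drains a file-like handle by reading fixed-size chunks until b''.

    Hypothetical stand-in for the chunked read loop in GetArtifact.
    """
    chunks = []
    while True:
        chunk = handle.read(chunk_size)
        if not chunk:
            break
        chunks.append(chunk)
    return b''.join(chunks)


class ReadAllTest(unittest.TestCase):

    def test_content_matches_mocked_bytes(self):
        handle = mock.Mock()
        # One non-empty chunk followed by b'' (end of file), so an empty
        # result fails the assertion instead of passing vacuously.
        handle.read.side_effect = [b'payload', b'']
        content = read_all(handle)
        self.assertEqual(content, b'payload')
        self.assertEqual(handle.read.call_count, 2)
```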

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service.py
##########
@@ -77,8 +79,10 @@ def GetArtifact(self, request, context=None):
     elif request.artifact.type_urn == common_urns.artifact_types.URL.urn:
       payload = proto_utils.parse_Bytes(
           request.artifact.type_payload, beam_runner_api_pb2.ArtifactUrlPayload)
-      # TODO(Py3): Remove the unneeded contextlib wrapper.
-      read_handle = contextlib.closing(urlopen(payload.url))
+      if FileSystems.get_scheme(payload.url) == GCSFileSystem.scheme():

Review comment:
       Can we generalize this to any Beam filesystem (perhaps using 
`FileSystems.open`)? 
https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.io.filesystems.html#apache_beam.io.filesystems.FileSystems.open
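Here is a minimal sketch of a scheme-agnostic version. It assumes `FileSystems.open(path)` returns a readable binary handle for any registered Beam filesystem. The opener is injected through `open_fn`, a hypothetical parameter, so the sketch runs without Beam installed. In the service, `open_fn` would be `FileSystems.open`. URL schemes with no registered Beam filesystem (for example, plain `https://`) would presumably still need the existing `urlopen` path, which is not shown.

```python
import contextlib
from typing import BinaryIO, Callable, Iterator


def iter_artifact_chunks(
        url: str,
        open_fn: Callable[[str], BinaryIO],
        chunk_size: int = 1 << 20) -> Iterator[bytes]:
    """Yields the bytes at `url` in chunks, with no per-scheme branching.

    `open_fn` (hypothetical) would be FileSystems.open in Beam, which picks
    the filesystem from the path's scheme, so no GCS-specific branch is
    needed. contextlib.closing only requires the handle to have close().
    """
    with contextlib.closing(open_fn(url)) as handle:
        while True:
            chunk = handle.read(chunk_size)
            if not chunk:
                break
            yield chunk
```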

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service_test.py
##########
@@ -122,6 +124,24 @@ def test_url_retrieval(self):
     with open(__file__, 'rb') as fin:
       self.assertEqual(content, fin.read())
 
+  def test_gcs_retrieval(self):
+    retrieval_service = artifact_service.ArtifactRetrievalService(None)
+    url_dep = beam_runner_api_pb2.ArtifactInformation(
+        type_urn=common_urns.artifact_types.URL.urn,
+        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
+            url='gs://test_gcs_retrieval').SerializeToString())

Review comment:
       Nit: though it probably doesn't matter here, `gs://test_gcs_retrieval`
names a bucket, not a valid GCS object path.
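A well-formed object path names both a bucket and an object, e.g. `gs://test-bucket/test_gcs_retrieval` (an illustrative name). The sketch below is a minimal stand-in check, assuming the usual `gs://<bucket>/<object>` form. `split_gcs_object_path` is hypothetical, and Beam's own GCS helpers may validate paths differently.

```python
import re
from typing import Tuple

# gs://<bucket>/<object>, where the object name must be non-empty.
_GCS_OBJECT_RE = re.compile(r'^gs://([^/]+)/(.+)$')


def split_gcs_object_path(path: str) -> Tuple[str, str]:
    """Returns (bucket, object_name); raises ValueError for bucket-only paths."""
    match = _GCS_OBJECT_RE.match(path)
    if match is None:
        raise ValueError('expected gs://<bucket>/<object>, got %r' % path)
    return match.group(1), match.group(2)
```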




-- 
This is an automated message from the Apache Git Service.