ihji commented on a change in pull request #14491:
URL: https://github.com/apache/beam/pull/14491#discussion_r621709973



##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -588,11 +588,25 @@ def _stage_resources(self, pipeline, options):
               'Found duplicated artifact: %s (%s)',
               type_payload.path,
               type_payload.sha256)
+          staged_name = hashs[type_payload.sha256]
           dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
-              staged_name=hashs[type_payload.sha256]).SerializeToString()
+              staged_name=staged_name).SerializeToString()
         else:
-          resources.append((type_payload.path, role_payload.staged_name))
-          hashs[type_payload.sha256] = role_payload.staged_name
+          staged_name = role_payload.staged_name
+          resources.append((type_payload.path, staged_name))
+          hashs[type_payload.sha256] = staged_name
+
+        if google_cloud_options.staging_location.startswith('gs://'):
+          dep.type_urn = common_urns.artifact_types.URL.urn
+          dep.type_payload = beam_runner_api_pb2.ArtifactUrlPayload(
+              url=FileSystems.join(
+                  google_cloud_options.staging_location,
+                  staged_name)).SerializeToString()
+        else:
+          dep.type_payload = beam_runner_api_pb2.ArtifactFilePayload(

Review comment:
       AFAIK, ArtifactFilePayload is used for an artifact on the local disk, 
meaning that no network connection is needed to access it. 
ArtifactUrlPayload is used when the artifact has to be downloaded before it 
can be accessed.

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -588,11 +588,25 @@ def _stage_resources(self, pipeline, options):
               'Found duplicated artifact: %s (%s)',
               type_payload.path,
               type_payload.sha256)
+          staged_name = hashs[type_payload.sha256]
           dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
-              staged_name=hashs[type_payload.sha256]).SerializeToString()
+              staged_name=staged_name).SerializeToString()
         else:
-          resources.append((type_payload.path, role_payload.staged_name))
-          hashs[type_payload.sha256] = role_payload.staged_name
+          staged_name = role_payload.staged_name
+          resources.append((type_payload.path, staged_name))
+          hashs[type_payload.sha256] = staged_name
+
+        if google_cloud_options.staging_location.startswith('gs://'):

Review comment:
       Yes, but we still can't blindly change the artifact type to a URL 
artifact (for example, the Dataflow local runner uses a local directory as its 
staging location).
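
To make the distinction concrete, here is a minimal, self-contained sketch of the check being discussed. It does not use the real Beam protos: `FilePayload`, `UrlPayload`, and `choose_payload` are hypothetical stand-ins, and the shape of the local (non-`gs://`) branch is an assumption, since the diff above is cut off before showing it.

```python
import posixpath
from dataclasses import dataclass


# Hypothetical stand-ins for the Beam artifact payload messages.
# They are NOT the real beam_runner_api_pb2 classes.
@dataclass
class FilePayload:
    path: str  # artifact readable from local disk, no network needed


@dataclass
class UrlPayload:
    url: str  # artifact that has to be downloaded before use


def choose_payload(staging_location, staged_name):
    """Pick a payload type from the staging location (illustrative only)."""
    if staging_location.startswith('gs://'):
        # Remote staging: workers must fetch the artifact, so describe it by URL.
        return UrlPayload(url=posixpath.join(staging_location, staged_name))
    # Local staging directory (e.g. a local runner): keep a file payload.
    # Assumption: the staged file lives directly under the staging directory.
    return FilePayload(path=posixpath.join(staging_location, staged_name))


remote = choose_payload('gs://my-bucket/staging', 'dataflow_python_sdk.tar')
local = choose_payload('/tmp/staging', 'dataflow_python_sdk.tar')
```

With these inputs, `remote` is a `UrlPayload` and `local` stays a `FilePayload`, which is the behavior the `startswith('gs://')` guard is meant to preserve.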




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

