ihji commented on a change in pull request #14491:
URL: https://github.com/apache/beam/pull/14491#discussion_r621709973
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -588,11 +588,25 @@ def _stage_resources(self, pipeline, options):
'Found duplicated artifact: %s (%s)',
type_payload.path,
type_payload.sha256)
+ staged_name = hashs[type_payload.sha256]
dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
- staged_name=hashs[type_payload.sha256]).SerializeToString()
+ staged_name=staged_name).SerializeToString()
else:
- resources.append((type_payload.path, role_payload.staged_name))
- hashs[type_payload.sha256] = role_payload.staged_name
+ staged_name = role_payload.staged_name
+ resources.append((type_payload.path, staged_name))
+ hashs[type_payload.sha256] = staged_name
+
+ if google_cloud_options.staging_location.startswith('gs://'):
+ dep.type_urn = common_urns.artifact_types.URL.urn
+ dep.type_payload = beam_runner_api_pb2.ArtifactUrlPayload(
+ url=FileSystems.join(
+ google_cloud_options.staging_location,
+ staged_name)).SerializeToString()
+ else:
+ dep.type_payload = beam_runner_api_pb2.ArtifactFilePayload(
Review comment:
AFAIK, ArtifactFilePayload is used for artifacts on the local disk, meaning
that no network connection is needed to access them. ArtifactUrlPayload is
used when the artifact has to be downloaded before it can be accessed.
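For illustration, here is a minimal self-contained sketch of the two cases. It does not use Beam's protos. `FilePayload`, `UrlPayload`, and `resolve_artifact` are hypothetical stand-ins for ArtifactFilePayload, ArtifactUrlPayload, and the code that consumes them. The download step uses `urllib.request`, which handles `http(s)://` and `file://` URLs but not `gs://`, so a real consumer would need a GCS-aware client.

```python
import os
import shutil
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Union


@dataclass
class FilePayload:
    """Hypothetical stand-in for ArtifactFilePayload: a path on local disk."""
    path: str


@dataclass
class UrlPayload:
    """Hypothetical stand-in for ArtifactUrlPayload: a location to download from."""
    url: str


def resolve_artifact(payload: Union[FilePayload, UrlPayload], work_dir: str) -> str:
    """Return a local path from which the artifact can be read."""
    if isinstance(payload, FilePayload):
        # Already on local disk: no network access is needed.
        return payload.path
    # URL artifact: it must be downloaded into work_dir before it can be read.
    name = os.path.basename(urllib.parse.urlparse(payload.url).path)
    local_path = os.path.join(work_dir, name)
    with urllib.request.urlopen(payload.url) as src, open(local_path, 'wb') as dst:
        shutil.copyfileobj(src, dst)
    return local_path
```

The file case is a no-op lookup, while the URL case always costs a fetch, which is why the payload type matters for whoever consumes the artifact.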
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -588,11 +588,25 @@ def _stage_resources(self, pipeline, options):
'Found duplicated artifact: %s (%s)',
type_payload.path,
type_payload.sha256)
+ staged_name = hashs[type_payload.sha256]
dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
- staged_name=hashs[type_payload.sha256]).SerializeToString()
+ staged_name=staged_name).SerializeToString()
else:
- resources.append((type_payload.path, role_payload.staged_name))
- hashs[type_payload.sha256] = role_payload.staged_name
+ staged_name = role_payload.staged_name
+ resources.append((type_payload.path, staged_name))
+ hashs[type_payload.sha256] = staged_name
+
+ if google_cloud_options.staging_location.startswith('gs://'):
Review comment:
Yes, but we still can't blindly change the artifact type to a URL artifact
(for example, the Dataflow local runner uses a local directory as its staging
location).
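A minimal sketch of that kind of check, assuming we switch to a URL artifact only for known remote schemes and keep a file artifact otherwise. `choose_payload` and `REMOTE_SCHEMES` are hypothetical names, the function returns plain tuples instead of Beam protos, and `posixpath.join` stands in for `FileSystems.join`.

```python
import os
import posixpath
from typing import Tuple
from urllib.parse import urlparse

# Hypothetical: schemes this sketch treats as remote (artifact must be downloaded).
REMOTE_SCHEMES = frozenset({'gs', 'http', 'https'})


def choose_payload(staging_location: str, staged_name: str) -> Tuple[str, str]:
    """Return ('url', url) for remote staging locations, ('file', path) otherwise."""
    scheme = urlparse(staging_location).scheme
    if scheme in REMOTE_SCHEMES:
        # Remote staging location: consumers must fetch the artifact by URL.
        return 'url', posixpath.join(staging_location, staged_name)
    # Local directory (e.g. a local runner's staging dir): keep a file artifact.
    return 'file', os.path.join(staging_location, staged_name)
```

With a local staging directory such as `/tmp/staging`, this keeps returning a file artifact, so a local runner is unaffected.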