This is an automated email from the ASF dual-hosted git repository. bhulette pushed a commit to branch release-2.22.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.22.0 by this push:
     new ed016e3  [BEAM-10052] check hash and avoid duplicated artifacts (#11843)
ed016e3 is described below

commit ed016e3ebf4abb1a26367de1507501a5bf07c902
Author: Chamikara Jayalath <chamik...@apache.org>
AuthorDate: Thu May 28 11:05:09 2020 -0700

    [BEAM-10052] check hash and avoid duplicated artifacts (#11843)

    Co-authored-by: Heejong Lee <heej...@gmail.com>
---
 .../apache_beam/runners/dataflow/internal/apiclient.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 06bb260..55649a7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -583,6 +583,7 @@ class DataflowApplicationClient(object):
       raise RuntimeError('The --temp_location option must be specified.')
 
     resources = []
+    hashs = {}
     for _, env in sorted(pipeline.components.environments.items(),
                          key=lambda kv: kv[0]):
       for dep in env.dependencies:
@@ -595,7 +596,16 @@ class DataflowApplicationClient(object):
         role_payload = (
             beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
                 dep.role_payload))
-        resources.append((type_payload.path, role_payload.staged_name))
+        if type_payload.sha256 and type_payload.sha256 in hashs:
+          _LOGGER.info(
+              'Found duplicated artifact: %s (%s)',
+              type_payload.path,
+              type_payload.sha256)
+          dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
+              staged_name=hashs[type_payload.sha256]).SerializeToString()
+        else:
+          resources.append((type_payload.path, role_payload.staged_name))
+          hashs[type_payload.sha256] = role_payload.staged_name
 
     resource_stager = _LegacyDataflowStager(self)
     staged_resources = resource_stager.stage_job_resources(