emilymye commented on a change in pull request #13730:
URL: https://github.com/apache/beam/pull/13730#discussion_r556019987
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -683,18 +683,41 @@ def create_job(self, job):
'A template was just created at location %s', template_location)
return None
+ @staticmethod
+ def _update_container_image_for_dataflow(beam_container_image_url):
+ image_suffix = beam_container_image_url.rsplit('/', 1)[1]
Review comment:
Can we add a log line here to indicate we are doing this override? In
general I'm a big fan of logs for the environment setup.
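
For illustration, a minimal sketch of what that log line could look like. The module-level `_LOGGER`, the message wording, and the placeholder repository value are assumptions made for this example; in the PR the repository comes from `names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY`.

```python
import logging

_LOGGER = logging.getLogger(__name__)

# Placeholder for names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY (value assumed).
DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'example.invalid/dataflow-repo'


def update_container_image_for_dataflow(beam_container_image_url):
  """Swaps the repository prefix of an image URL and logs the override."""
  # Keep only the final path segment, e.g. 'beam_python3.7_sdk:2.27.0'.
  image_suffix = beam_container_image_url.rsplit('/', 1)[1]
  new_url = DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + image_suffix
  # Hypothetical log message; exact wording is up to the author.
  _LOGGER.info(
      'Overriding container image %s with %s for Dataflow',
      beam_container_image_url,
      new_url)
  return new_url
```

Logging both the original and the rewritten URL should make it easier to trace which image a worker actually pulled.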
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -560,8 +560,8 @@ def __init__(self, options):
def _get_sdk_image_overrides(self, pipeline_options):
worker_options = pipeline_options.view_as(WorkerOptions)
sdk_overrides = worker_options.sdk_harness_container_image_overrides
- if sdk_overrides:
- return dict(override_str.split(',', 1) for override_str in sdk_overrides)
+ return dict(override_str.split(',', 1)
Review comment:
nit - style guide-wise I'd probably do one of the following instead:
```suggestion
    if sdk_overrides:
      return dict(s.split(',', 1) for s in sdk_overrides)
    return dict()
```
```suggestion
    return (dict(s.split(',', 1) for s in sdk_overrides)
            if sdk_overrides
            else dict())
```
I also don't think we need to be explicit about the override_str var name
(a shorter name would make this shorter), but that might be the Go style jumping out.
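
As a quick sanity check, the two suggestions above can be written as standalone functions to confirm they behave the same. The function names and the sample override string are hypothetical and only serve this sketch.

```python
def overrides_with_guard(sdk_overrides):
  """First suggestion: explicit if-check with an early return."""
  if sdk_overrides:
    return dict(s.split(',', 1) for s in sdk_overrides)
  return dict()


def overrides_with_conditional(sdk_overrides):
  """Second suggestion: a single conditional expression."""
  return (dict(s.split(',', 1) for s in sdk_overrides)
          if sdk_overrides
          else dict())


# Each entry is '<pattern>,<override>'; only the first comma splits.
sample = ['.*python.*,gcr.io/example/custom:tag']
print(overrides_with_guard(sample))
print(overrides_with_conditional(None))
```

Both variants return an empty dict when the option is unset, so callers never have to handle `None`.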
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -687,18 +687,41 @@ def create_job(self, job):
'A template was just created at location %s', template_location)
return None
+ @staticmethod
+ def _update_container_image_for_dataflow(beam_container_image_url):
+ image_suffix = beam_container_image_url.rsplit('/', 1)[1]
+ return names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + image_suffix
+
@staticmethod
def _apply_sdk_environment_overrides(proto_pipeline, sdk_overrides):
- # Update environments based on user provided overrides
- if sdk_overrides:
- for environment in proto_pipeline.components.environments.values():
- docker_payload = proto_utils.parse_Bytes(
- environment.payload, beam_runner_api_pb2.DockerPayload)
- for pattern, override in sdk_overrides.items():
- new_payload = copy(docker_payload)
- new_payload.container_image = re.sub(
- pattern, override, docker_payload.container_image)
- environment.payload = new_payload.SerializeToString()
+ # Updates container image URLs for Dataflow.
Review comment:
I'd slightly prefer keeping the sdk_overrides check here. Even if we sanitize
the output in the helper method, that doesn't guarantee that every method that
calls this one will do the same.
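
A rough sketch of the defensive check, under simplifying assumptions: environments are modeled as a plain dict of environment id to image URL rather than `DockerPayload` protos, the function name is hypothetical, and overrides are applied one after another.

```python
import re


def apply_sdk_environment_overrides(container_images, sdk_overrides):
  """Returns a copy of container_images with regex overrides applied.

  container_images: dict of environment id -> image URL (simplified model).
  sdk_overrides: dict of regex pattern -> replacement, or None.
  """
  # Guard here too, so the function is safe even if a caller passes None.
  if not sdk_overrides:
    return dict(container_images)
  updated = {}
  for env_id, image in container_images.items():
    for pattern, override in sdk_overrides.items():
      image = re.sub(pattern, override, image)
    updated[env_id] = image
  return updated
```

With the guard in place, passing `None` or an empty dict leaves every image unchanged instead of raising on `.items()`.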
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]