tvalentyn commented on a change in pull request #14189:
URL: https://github.com/apache/beam/pull/14189#discussion_r600933018
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -282,45 +283,26 @@ def __init__(
# Dataflow workers.
environments_to_use = self._get_environments_from_tranforms()
if _use_unified_worker(options):
- # Adding a SDK container image for the pipeline SDKs
- container_image = dataflow.SdkHarnessContainerImage()
- pipeline_sdk_container_image = get_container_image_from_options(options)
- container_image.containerImage = pipeline_sdk_container_image
- container_image.useSingleCorePerContainer = True # True for Python SDK.
- pool.sdkHarnessContainerImages.append(container_image)
-
- already_added_containers = [pipeline_sdk_container_image]
+ python_sdk_container_image = get_container_image_from_options(options)
# Adding container images for other SDKs that may be needed for
# cross-language pipelines.
- for environment in environments_to_use:
+ for id, environment in environments_to_use:
if environment.urn != common_urns.environments.DOCKER.urn:
raise Exception(
            'Dataflow can only execute pipeline steps in Docker environments.'
' Received %r.' % environment)
environment_payload = proto_utils.parse_Bytes(
environment.payload, beam_runner_api_pb2.DockerPayload)
container_image_url = environment_payload.container_image
- if container_image_url in already_added_containers:
- # Do not add the pipeline environment again.
-
- # Currently, Dataflow uses Docker container images to uniquely
- # identify execution environments. Hence Dataflow executes all
- # transforms that specify the the same Docker container image in a
- # single container instance. Dependencies of all environments that
- # specify a given container image will be staged in the container
- # instance for that particular container image.
- # TODO(BEAM-9455): loosen this restriction to support multiple
- # environments with the same container image when Dataflow supports
- # environment specific artifact provisioning.
- continue
- already_added_containers.append(container_image_url)
container_image = dataflow.SdkHarnessContainerImage()
container_image.containerImage = container_image_url
# Currently we only set following to True for Python SDK.
          # TODO: set this correctly for remote environments that might be Python.
- container_image.useSingleCorePerContainer = False
+ container_image.useSingleCorePerContainer = (
+ container_image_url == python_sdk_container_image)
Review comment:
Can you please file a bug to follow up on this? It sounds like it may be
a blocker for the container prebuilding workflow with portable job submission.
cc: @y1chi - do you know if we verified this scenario? We need to make sure
that the prebuilding flow does not interfere with launching the
1-container-per-core Python behavior.
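To make the concern concrete, here is a minimal sketch of how the new equality
check could silently disable single-core-per-container for the Python SDK if
prebuilding rewrites only the environment's container image. The image URLs
below are hypothetical placeholders, not actual Dataflow defaults:

```python
# Minimal sketch of the comparison added in this change; the image URLs are
# hypothetical placeholders, not real Dataflow values.

# Image derived from pipeline options (what get_container_image_from_options
# would return if prebuilding does not also update the options).
python_sdk_container_image = 'gcr.io/example/beam_python3.8_sdk:2.29.0'

# Image recorded in the environment proto after a hypothetical prebuilding step.
container_image_url = 'gcr.io/example/prebuilt_python_sdk:abc123'

# Mirrors the new logic: single-core-per-container only when the URLs match.
use_single_core_per_container = (
    container_image_url == python_sdk_container_image)

print(use_single_core_per_container)  # False -> Python harness would lose
                                      # the 1-container-per-core behavior.
```

If the prebuilding workflow keeps the pipeline options and the environment
proto in sync, the check is fine; the bug would track verifying that.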
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]