On Thu, Feb 4, 2021 at 3:33 PM Kyle Weaver <kcwea...@google.com> wrote:

>  This gets into the distinction of customizing what kind of environment
>> one wants to have (which could be generally applicable) vs. an absolute
>> designation of a particular environment (e.g. a docker image).
>
>
> For common environment modifications, resource hints are a great idea,
> since it's much easier to set an annotation than to build and set a custom
> container. The limitation of this approach is we can't handle every
> possible modification a user might want to make to their environment.
> Custom containers give the user ultimate control over the environment, so
> we forfeit a lot of flexibility if we don't provide enough options to use
> them.
>
> Note that what we're running into in part is that "pipeline options" are
>> the wrong level of granularity for specifying characteristics of an
>> environment, as there is not a single environment to parameterize (or,
>> possibly, even one per language).
>
>
> Yes, this is the crux of the problem. We already expose an
> environment_config as a pipeline option, so we basically have three choices:
> 1. Deprecate pipeline-level environment options altogether.
> 2. Find a way to generalize environment options.
> 3. Keep and document the status quo (i.e. users can use custom containers,
> but at most one per language).
>

I do think it can be useful to specify a custom "top-level" environment. We
should probably make it easy to use customized expansion services.
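
For example, a minimal sketch (Python SDK; assumes a pipeline object p and
a service already running at the placeholder address) of pointing a
cross-language transform at a customized expansion service instead of the
default:

    from apache_beam.io.kafka import ReadFromKafka

    kafka_records = (
        p | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['my_topic'],
            # An expansion service you started yourself, e.g. one rebuilt
            # to vend a customized environment:
            expansion_service='localhost:8097'))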


> The caller should not need any visibility into the environment(s) that an
>> expansion service uses, which is an implementation detail that the
>> expansion service is free to change at any time. (In fact, whether it is
>> (partially or fully) implemented as an external transform is an
>> implementation detail that the end user should not need to care about or
>> depend on.)
>
>
> I personally think pattern matching and substitution by runners (maybe
>> more sophisticated than regexp on container names) is a reasonable way to
>> approach customization of environments.
>
>
> Aren't these ideas contradictory? Pattern matching requires knowledge in
> advance of which patterns to match. We'd need to know at least some
> information about the environment the expansion service is expected to use
> in order to replace it.
>

The pattern matching is not of the form "replace the environment for this
particular transform," but rather "/if/ I see a Java environment of a
certain type, /then/ I want to run it in this way."
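
A minimal sketch of what that substitution could look like on the runner
side, operating directly on the pipeline proto (the URNs follow
beam_runner_api.proto; the matching rule and process command are
illustrative):

    from apache_beam.portability.api import beam_runner_api_pb2

    def substitute_environments(pipeline_proto):
      for env in pipeline_proto.components.environments.values():
        if env.urn == 'beam:env:docker:v1':
          docker = beam_runner_api_pb2.DockerPayload.FromString(env.payload)
          # "If I see a Java docker environment of a certain type, then
          # run it as a local process instead."
          if 'beam_java' in docker.container_image:
            env.urn = 'beam:env:process:v1'
            env.payload = beam_runner_api_pb2.ProcessPayload(
                command='/opt/apache/beam/boot').SerializeToString()
      return pipeline_proto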


> For example, suppose I construct a pipeline that uses both Python and Java
>> transforms. (I could do this from Go, Java, or Python). If I want to run
>> this locally (e.g. on the Python FnAPI runner), I would prefer that the
>> python bits be run in-process but would have to shell out (maybe via
>> docker, maybe something cheaper) for the java bits. On the other hand, if I
>> want to run this same pipeline (ideally, the same model proto, such that we
>> don't have runner-dependent construction) on Flink, I might want the java
>> bits to be inlined and the Python bits to be in a separate process. On
>> Dataflow, both would live in containers. To do this, the Python runner
>> would say "hey, I know that Python environment" and just swap it out for
>> in-process, and vice versa. (For isolation/other reasons, one may want the
>> option to force everything to be docker, but that's more of a "don't make
>> substitutions" option than manually providing environment configs.)
>
>
> In this example, wouldn't you normally just rebuild the pipeline? I'm not
> sure what the advantage of re-using the same model proto is.
>

Yes, you'd re-build the pipeline. But if all you change is the --runner
flag, the model proto produced should not change. (And, sometimes, you may
want to stash the proto itself, or pass it to one-of-N runners depending on
some other condition, etc.)
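
A sketch of that stash-and-run flow (Python SDK; the runner choice and the
trivial pipeline are illustrative):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.portability.fn_api_runner import FnApiRunner

    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)
    proto = p.to_runner_api()  # runner-independent model proto

    # Later, possibly elsewhere, bind the same proto to a runner:
    options = PipelineOptions()
    restored = beam.Pipeline.from_runner_api(proto, FnApiRunner(), options)
    restored.run().wait_until_finish()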


>  It would be helpful for me to have concrete use cases of why a user wants
>> to customize the container used by some transform they did not write, which
>> could possibly inform the best course(s) of action here.
>
>
> I should have led with this. Someone wanted to mount credentials into the
> SDK harness [1]. So in this particular case, the user just wants to mount
> files into their SDK harness. That's a pretty common use case, so
> resource hints are probably a more appropriate solution.
>
> [1]
> https://lists.apache.org/thread.html/r690094f1c9ebc4e1d20f029a21ba8bc846672a65baafd57c4f52cb94%40%3Cuser.beam.apache.org%3E
>

Ah, that clarifies things. Would it be possible/preferable to pass the
credentials as parameters to the transform itself?
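
Something along these lines, purely illustrative ("ReadFromFoo" and its
parameters are made up), so the secret travels with the transform's
configuration rather than being baked into or mounted onto the container:

    # Read the credentials on the machine constructing the pipeline...
    with open('creds.json') as f:
      credentials = f.read()

    # ...and hand them to the transform as an ordinary parameter.
    rows = p | ReadFromFoo(
        connection_url='foo://host/db',
        credentials=credentials)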


>
>
> On Thu, Feb 4, 2021 at 1:51 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Thu, Feb 4, 2021 at 12:38 PM Kyle Weaver <kcwea...@google.com> wrote:
>>
>>> So, an external transform is uniquely identified by its URN. An external
>>>> transform identified by a URN may refer to an arbitrary composite which may
>>>> have sub-transforms that refer to different environments. I think with the
>>>> above proposal we'll lose this flexibility.
>>>> What we need is a way to override environments (or properties of
>>>> environments) in the final pipeline proto. Once we modify such
>>>> environments in the proto, the change will be reflected in all
>>>> transforms that utilize them.
>>>
>>>
>>> As far as I can tell we currently only register a single environment for
>>> the entire transform (and it's always the default). Am I missing something?
>>> https://github.com/apache/beam/blob/0cfa80fd919d141a2061393ec5c12521c7d7af0b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L447-L449
>>>
>>> Anyway, I don't see how sub-transforms require overrides. We should be
>>> able to propagate environment options to sub-transforms to achieve the same
>>> purpose.
>>>
>>
>> The discussion of resource hints at
>> https://lists.apache.org/thread.html/ra40286b66a03a1d9f4086c9e1ecdeb9f299836d2d0361c3e3fe7c382%40%3Cdev.beam.apache.org%3E
>> actually may tie into this as well. I would assume a localised request for,
>> say, high memory should be propagated down to cross-language pipelines. It
>> is possible that other customizations (such as making sure specific
>> dependencies are available, or filesystems mounted) would fit here too.
>>
>> This gets into the distinction of customizing what kind of environment
>> one wants to have (which could be generally applicable) vs. an absolute
>> designation of a particular environment (e.g. a docker image).
>>
>> Note that what we're running into in part is that "pipeline options" are
>> the wrong level of granularity for specifying characteristics of an
>> environment, as there is not a single environment to parameterize (or,
>> possibly, even one per language). If I call
>> ExpansionRequest(MyFancyTransform, environment_config=docker_path)
>> and MyFancyTransform is composed of two environments, to which
>> does docker_path apply? What about PTransforms that use ExternalTransforms
>> under the hood (e.g. one that does some pre-processing and then calls SQL,
>> or calls Kafka followed by some Python-level post-processing)?
>>
>>
>> 'sdk_harness_container_image_overrides' is one such property (which
>>>> unfortunately only works for Dataflow today). It also only works for
>>>> Docker image URLs. Maybe we can extend this property to all runners or
>>>> introduce a new property that works for all types of environments?
>>>
>>>
>>> In my original email, I wrote that sdk_harness_container_image_overrides
>>> is no more flexible than having a single option per SDK, since the default
>>> container images for all external transforms in each SDK are expected to be
>>> the same. For example, in the case of a pipeline with two external
>>> transforms that both use the same default container image,
>>> sdk_harness_container_image_overrides does not let the user give those two
>>> transforms different containers.
>>>
>>> From a design standpoint, I feel find-replace is hacky and backwards.
>>> It's cleaner to specify what kind of environment we want directly in
>>> the ExpansionRequest. That way all of the environment creation logic
>>> belongs inside the expansion service.
>>>
>>
>> While Environments logically belong with Transforms, it is the expansion
>> service's job to attach the right environments to the transforms that it
>> vends. The caller should not need any visibility into the environment(s)
>> that an expansion service uses, which is an implementation detail that the
>> expansion service is free to change at any time. (In fact, whether it is
>> (partially or fully) implemented as an external transform is an
>> implementation detail that the end user should not need to care about or
>> depend on.)
>>
>> I personally think pattern matching and substitution by runners (maybe
>> more sophisticated than regexp on container names) is a reasonable way to
>> approach customization of environments. For example, suppose I construct a
>> pipeline that uses both Python and Java transforms. (I could do this from
>> Go, Java, or Python). If I want to run this locally (e.g. on the Python
>> FnAPI runner), I would prefer that the python bits be run in-process but
>> would have to shell out (maybe via docker, maybe something cheaper) for the
>> java bits. On the other hand, if I want to run this same pipeline (ideally,
>> the same model proto, such that we don't have
>> runner-dependent construction) on Flink, I might want the java bits to be
>> inlined and the Python bits to be in a separate process. On Dataflow, both
>> would live in containers. To do this, the Python runner would say "hey, I
>> know that Python environment" and just swap it out for in-process, and vice
>> versa. (For isolation/other reasons, one may want the option to force
>> everything to be docker, but that's more of a "don't make substitutions"
>> option than manually providing environment configs.)
>>
>> On the other hand, as we go down the route of custom containers, especially
>> expansion services that might vend custom containers, I think we need a way
>> to push down *properties* of environments (such as resource hints) through
>> the expansion service that may influence the environments that get attached
>> and returned.
>>
>> It would be helpful for me to have concrete use cases of why a user wants
>> to customize the container used by some transform they did not write, which
>> could possibly inform the best course(s) of action here.
>>
>>
>>
>>>
>>>
>>> On Wed, Feb 3, 2021 at 5:07 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Feb 3, 2021 at 12:34 PM Kyle Weaver <kcwea...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Beamers,
>>>>>
>>>>> Recently we’ve had some requests on user@ and Slack for instructions
>>>>> on how to use custom-built containers in cross-language pipelines
>>>>> (typically calling Java transforms from a predominantly Python pipeline).
>>>>> Currently, it seems like there is no way to change the container used by a
>>>>> cross-language transform except by modifying and rebuilding the expansion
>>>>> service. The SDK does not pass pipeline options to the expansion service
>>>>> (BEAM-9449 [1]). Fixing BEAM-9449 does not solve everything, however. Even
>>>>> if pipeline options are passed, the existing set of pipeline options still
>>>>> limits the amount of control we have over environments. Here are the
>>>>> existing pipeline options that I’m aware of:
>>>>>
>>>>> Python [2] and Go [3] have these:
>>>>>
>>>>>    - environment_type (DOCKER, PROCESS, LOOPBACK)
>>>>>
>>>>>    - environment_config (This one is confusingly overloaded. It’s a
>>>>>      string that means different things depending on environment_type.
>>>>>      For DOCKER, it is the Docker image URL. For PROCESS, it is a JSON
>>>>>      blob. For EXTERNAL, it is the external service address.)
>>>>>
>>>>>
>>>>> Whereas Java [4] has defaultEnvironmentType and
>>>>> defaultEnvironmentConfig, which are named differently but otherwise act
>>>>> the same as the above.
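>>>>>
>>>>> For example (image name and command are illustrative):
>>>>>
>>>>>    --environment_type=DOCKER \
>>>>>    --environment_config=gcr.io/my-project/beam_python3.7_sdk:custom
>>>>>
>>>>>    # or, for PROCESS, a JSON blob:
>>>>>    --environment_type=PROCESS \
>>>>>    --environment_config='{"command": "/opt/apache/beam/boot"}'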
>>>>>
>>>>> I was unsatisfied with environment_config for a number of reasons.
>>>>> First, having a single overloaded option that can mean entirely different
>>>>> things depending on context is poor design. Second, in PROCESS mode,
>>>>> requiring the user to type in a JSON blob for environment_config is not
>>>>> especially human-friendly (though it has also been argued that JSON makes
>>>>> complex arguments like this easier to parse). Finally, we must overload
>>>>> this string further to introduce new environment-specific options, such as
>>>>> a mounted Docker volume (BEAM-5440 [5]).
>>>>>
>>>>
>>>> Agree.
>>>>
>>>>
>>>>>
>>>>> To address these problems, I added a new option called
>>>>> “environment_options” (BEAM-10671 [6]). (This option has been implemented
>>>>> in the Python SDK, but not the other SDKs yet.) Like the “experiments”
>>>>> option, environment_options takes a list of strings, for example
>>>>> “--environment_option=docker_container_image=my_beam_sdk:latest”. It could
>>>>> be argued we should have made “docker_container_image” etc. top-level
>>>>> options instead, but this “catch-all” design makes what I am about to
>>>>> propose a lot easier.
>>>>>
>>>>> The solution proposed in PR #11638 [7] sets a flag to include
>>>>> unrecognized pipeline options during serialization, since otherwise
>>>>> unrecognized options are dropped. In a Python pipeline, this will allow us
>>>>> to set environment_config and default_environment_config to separate
>>>>> values, for Python and Java containers, respectively. However, this still
>>>>> limits us to one container image for all Python and Go transforms, and one
>>>>> container image for all Java transforms. As more cross-language transforms
>>>>> are implemented, sooner or later someone will want to have different Java
>>>>> SDK containers for different external transforms.
>>>>>
>>>>> (I should also mention the sdk_harness_container_image_overrides
>>>>> pipeline option [8], which is currently only supported by the Dataflow
>>>>> runner. It lets us basically perform a find/replace on container image
>>>>> strings. This is not significantly more flexible than having a single
>>>>> option per SDK, since the default container images for all external
>>>>> transforms in each SDK are expected to be the same.)
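>>>>>
>>>>> (It takes "regex,replacement" pairs; for example, with an illustrative
>>>>> image name:
>>>>>
>>>>>    --sdk_harness_container_image_overrides=".*java.*,gcr.io/my-project/java_sdk:custom"
>>>>>
>>>>> which rewrites every matching image the same way.)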
>>>>>
>>>>> Environments logically belong with transforms, and that’s how it works
>>>>> in the Runner API [9]. The problem now is that from the user’s 
>>>>> perspective,
>>>>> the environment is bound to the expansion service. After addressing
>>>>> BEAM-9449, the problem will be that one or two environments at most are
>>>>> bound to the pipeline. Ideally, though, users should have fully granular
>>>>> control over environments at the transform level.
>>>>>
>>>>> All this context for a very simple proposal: we should have all
>>>>> ExternalTransform subclasses take optional environment_type and
>>>>> environment_options fields in their constructors. As with their
>>>>> corresponding pipeline options, these options would default to DOCKER and
>>>>> none, respectively. Then we could overwrite the environment_type and
>>>>> environment_options in the pipeline options passed to the expansion 
>>>>> service
>>>>> with these values. (Alternatively, we could pass environment_type and
>>>>> environment_options to the expansion service individually to avoid having
>>>>> to overwrite their original values, but their original values should be
>>>>> irrelevant to the expansion service anyway.)
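>>>>>
>>>>> A sketch of what this might look like for a user (the transform and
>>>>> values are illustrative):
>>>>>
>>>>>    ReadFromKafka(
>>>>>        consumer_config={'bootstrap.servers': 'localhost:9092'},
>>>>>        topics=['my_topic'],
>>>>>        environment_type='DOCKER',
>>>>>        environment_options=[
>>>>>            'docker_container_image=my_beam_java_sdk:latest'])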
>>>>>
>>>>> What do you think?
>>>>>
>>>>
>>>> So, an external transform is uniquely identified by its URN. An
>>>> external transform identified by a URN may refer to an arbitrary composite
>>>> which may have sub-transforms that refer to different environments. I think
>>>> with the above proposal we'll lose this flexibility.
>>>> What we need is a way to override environments (or properties of
>>>> environments) in the final pipeline proto. Once we modify such
>>>> environments in the proto, the change will be reflected in all
>>>> transforms that utilize them.
>>>>
>>>> 'sdk_harness_container_image_overrides' is one such property (which
>>>> unfortunately only works for Dataflow today). It also only works for
>>>> Docker image URLs. Maybe we can extend this property to all runners or
>>>> introduce a new property that works for all types of environments?
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/BEAM-9449
>>>>>
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L1097-L1115
>>>>>
>>>>> [3]
>>>>> https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/go/pkg/beam/options/jobopts/options.go#L41-L53
>>>>>
>>>>> [4]
>>>>> https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L53-L71
>>>>>
>>>>> [5] https://issues.apache.org/jira/browse/BEAM-5440
>>>>>
>>>>> [6] https://issues.apache.org/jira/browse/BEAM-10671
>>>>>
>>>>> [7] https://github.com/apache/beam/pull/11638
>>>>>
>>>>> [8]
>>>>> https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L840-L850
>>>>>
>>>>> [9]
>>>>> https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/model/pipeline/src/main/proto/beam_runner_api.proto#L194
>>>>>
>>>>>
