On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský <je...@seznam.cz> wrote:
On 6/29/21 11:04 PM, Robert Bradshaw wrote:
> You can configure the environment in the current state, you just have to run your own expansion service that has a different environment baked into it (or makes this configurable).
Yes, that is true. On the other hand, that lacks some user-friendliness, because ideally you don't want to worry about expansion services, especially when it comes to a fairly standard IO. The ideal case is that you either do not know at all that you are using an external transform (which is probably the case when you can use docker), or you are able to overcome the problem within the SDK (Python) by passing some argument to the input transform.
Arguments passed at the pipeline level apply to the whole pipeline (not just one transform). So if you pass in a default environment (and configs) at the pipeline level, that would mean the default environment and configs used by the pipeline (so the Python SDK in this case), not a specific transform.
I believe we have made the usage of external transforms user-friendly for the general case. But we had to make some assumptions. For example, we assumed:
* the user will be using the default environment of the expansion service (Docker in this case)
* the user will be using the pre-specified dependency only (sdks:java:io:expansion-service:shadowJar for Kafka)
* the user will be in an environment where the jar can be downloaded.
I would consider any use-case where these basic assumptions cannot be met an advanced use-case. The solution in such a case would be to start a custom expansion service and pass its address as a parameter to the transform [1], as in the sketch below. I'm fine with extending the capabilities of the Java expansion service by adding more parameters (for example, for overriding the environment, for specifying dependencies, for providing pipeline options).
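
For illustration, a minimal sketch of that escape hatch in Python (the address and topic are made up; a custom expansion service must already be listening there):

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as p:
        messages = (
            p
            | 'ReadKafka' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'kafka:9092'},
                topics=['my_topic'],
                # Address of the manually started expansion service;
                # this overrides the default (Docker-based) expansion.
                expansion_service='localhost:8097'))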
Thanks,
Cham
[1] https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
>
> Is option (1) updating the default expansion service such that one can override default environment properties on the command line? (You would still have to start it up manually to use it.)
Yes and no. :) Updating ExpansionService so that you can specify the default environment on the command line makes this accessible to JavaJarExpansionService, and that makes it possible to add an (optional) argument to Python's Kafka IO that would delegate this to the (automatically) started expansion service. It is important to note that both ReadFromKafka and WriteToKafka have expansions that involve only a single external (Java) SDK. That simplifies things.
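
To sketch how that could look from Python once the ExpansionService accepts such a flag (the jar name and the --defaultEnvironmentType flag are hypothetical, pending the change discussed here):

    from apache_beam.transforms.external import JavaJarExpansionService

    # Start the Kafka expansion service ourselves and forward a
    # default-environment override to its command line; '{{PORT}}' is
    # substituted with a free port by the SDK.
    expansion_service = JavaJarExpansionService(
        'beam-sdks-java-io-expansion-service-2.33.0.jar',
        extra_args=[
            '{{PORT}}',
            '--defaultEnvironmentType=PROCESS',
        ])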
>
> Maybe it would help to make things more concrete. Suppose I have a Go pipeline that uses a library which invokes a Python external transform to do ML (say, via TFX), and two Java IOs (which happen to have mutually exclusive dependencies). The ML transform itself uses Java to invoke some SQL.
>
> The way things work currently is that each external transform will have an associated, fully specified environment, and a runner can use docker to start up the required workers at the expected time.
>
> Now, suppose one doesn't have docker on the workers. One wants to run this with
>
> ./my_pipeline --someFlag=someValue --someOtherFlag=someOtherValue ...
>
> such that docker is no longer needed. What someFlags would we need, and what would their values be? (And how to make this feasible to implement.)
>
> Are there meaningful intermediate points that extend to a general solution (or at least aren't hostile to it)?
I believe that in option 2) the best way would be to use each SDK's URN. Then the arguments could be something like:

--expansionEnvironments='{"apache:beam:go:2.33.0:latest": {"env": "DOCKER", "config": "<image>"}, "apache:beam:python:2.33.0:latest": {"env": "PROCESS", "config": {...}}}'

Yes, it would require a lot of "syntactic sugar" to configure that. :) (Sorry if I don't have the URNs for the SDKs 100% correct.)
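
Purely to illustrate the shape of such a mapping (the flag and its format are part of this proposal, not an existing API; the image name and process command are made up):

    # Hypothetical parsed value of --expansionEnvironments, mapping an
    # SDK URN to the environment its expanded transforms should run in.
    expansion_environments = {
        'apache:beam:go:2.33.0:latest': {
            'env': 'DOCKER',
            'config': 'apache/beam_go_sdk:2.33.0',
        },
        'apache:beam:python:2.33.0:latest': {
            'env': 'PROCESS',
            'config': {'command': '/opt/venv/bin/boot'},
        },
    }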
>
>
> I still think in the long run having runners understand environments, and saying "oh, whenever I see 'apache:beam:java:2.33.0:latest' I'll swap that out for 'path/to/my/java -cp ...'", is the right way to go long-term. (I would put this in runners, not SDKs, though a common runners library could be used.)
Yes, I also agree that the expansion service should be runner-dependent (or at least runner-aware), as that brings optimizations. The runner could ignore the settings from the previous point when it can be *sure* it can do so.
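
As a toy sketch of that runner-side substitution over the pipeline proto (the override mapping is made up; a common runners library would own this logic):

    from apache_beam.portability import common_urns
    from apache_beam.portability.api import beam_runner_api_pb2

    def swap_docker_environments(pipeline_proto, overrides):
        # overrides: dict from container image name to a replacement
        # beam_runner_api_pb2.Environment proto (e.g. a PROCESS env).
        for env in pipeline_proto.components.environments.values():
            if env.urn == common_urns.environments.DOCKER.urn:
                payload = beam_runner_api_pb2.DockerPayload.FromString(
                    env.payload)
                if payload.container_image in overrides:
                    env.CopyFrom(overrides[payload.container_image])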
>
>
> On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský <je...@seznam.cz> wrote:
>> Thanks for pointing to that thread.
>>
>> 1) I'm - as well as Kyle - fine with the approach that we use a "preferred environment" for the expansion service. We only need to pass it via the command line. Yes, the command line might be generally SDK-dependent, and that makes it expansion-dependent, because whether or not a particular transform is "external" is an implementation detail. That is the nasty part. The rest of my original question is about how exactly to do that, because it seems to be tricky, due to the fact that it is not possible to include a runtime dependency on DirectRunner (it fails many, many tests) and it is not possible to extract PipelineOptions as a Map either.
>>
>> 2) Regarding the SDK injecting the environment, I still think that is the correct way. The SDK (the driver code) owns the execution environment. It should be able to define (or at least prioritize) the runtime environments of all transforms. If we cannot know in advance which transform is going to expand to how many nested (and possibly external) transforms, I think that the SDK could be fine with providing a Map(SDK -> environment). That is: "Run Java using PROCESS", "Run Python using DOCKER", and so on. A default mapping might exist on the expansion service as well (which might be passed through the command line, and that is point 1)). Yes, the Map approach is definitely not universal, because one can imagine that the SDK itself is not enough for specifying the environment, but it seems that the vast majority of cases would fit into that.
>>
>> 3) The best might be for the SDK to provide a list of supported environments with additional metrics, which the expansion service might choose from.
>>
>> These three approaches are all extensions to the current state. The current state has a predefined environment without the possibility to change it. Option 1) changes it to a single configurable environment, option 2) to N environments based on the SDK, and option 3) to M environments based on SDK-dependent metrics (and/or capabilities of a particular environment). These seem like gradual extensions of the current state, so maybe we can focus on the first one, and add the others when there is a need?
>>
>> If this could be the first conclusion, then the next one would be what should be the preferred way to implement it.
>>
>> WDYT?
>>
>> On 6/29/21 9:15 PM, Robert Bradshaw wrote:
>>> +1, thanks for digging up that thread.
>>>
>>> I am still of the same opinion that I wrote there. To touch on some things brought up here, copying something like defaultEnvironmentConfig doesn't make sense from language to language (e.g. the docker image name or CLI arguments for subprocess mode just aren't going to work for all of Python, Java, and Go, and the embedded type is clearly only going to work for one.)
>>>
>>> In the short term, to change the environment (or anything else) about the "default" expansion service, the thing to do is build and start your own expansion service that sets up the environment for its transforms in a custom way.
>>>
>>> FYI, in Python, one can use --beam_services to use a custom expansion service. E.g.
>>>
>>> --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar": "localhost:port"}'
>>>
>>> would override the default one when using SqlTransform.
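>>>
>>> For example, a full invocation might look like this (script name, runner, and port are placeholders):
>>>
>>>     python my_pipeline.py \
>>>         --runner=FlinkRunner \
>>>         --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar": "localhost:8097"}'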
>>>
>>> On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver <kcwea...@google.com> wrote:
>>>> For context, there was a previous thread which touched on many of the same points: https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
>>>>
>>>> On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>> I would slightly disagree that this breaks the black-box nature of the expansion. The "how the transform expands" remains unknown to the SDK requesting the expansion; the "how the transform executes", on the other hand, is something that the SDK must cooperate on - it knows (or could or should know) what the environment that the pipeline is going to be executed on looks like. That is why the expansion service on its own cannot correctly define the execution environment. It could if it were bound to a runner (and its environment) - for instance, a FlinkRunnerExpansionService could probably expand KafkaIO to something more 'native'. But that requires knowledge of the target runner. If the expansion service is not dedicated to a runner, the only place where the environment can be defined is the SDK - and therefore the expansion request.
>>>>>
>>>>>> Power users can always modify the output produced by the expansion service as well.
>>>>> I'm not sure if I follow this. Do you mean that power users, who run the expansion service, can modify the output? Or is the output (protobuf) of the expansion service easily transferable between different execution environments? I had the impression that execution environments do not necessarily have to have the same payloads associated with them, and that it is therefore impossible to 'postprocess' the output of the expansion. Is that a wrong assumption?
>>>>>
>>>>> On 6/29/21 7:55 PM, Luke Cwik wrote:
>>>>>
>>>>> This would "break" the black box where the expansion service is supposed to hide the implementation internals from the caller, and pushes compatibility of these kinds of environment overrides onto the expansion service and its implementer.
>>>>>
>>>>> Power users can always modify the output produced by the expansion service as well.
>>>>>
>>>>> On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>> The argument for being able to accept a (possibly ordered list of) execution environments is that this could make a single instance of the expansion service reusable by various clients with different requirements. Moreover, the two approaches are probably orthogonal - users could specify a 'defaultExecutionEnvironment' for the service, which could be used in the case when there is no preference given by the client.
>>>>>>
>>>>>> On 6/29/21 7:03 PM, Luke Cwik wrote:
>>>>>>
>>>>>> I would be much more inclined for the user being able to configure the expansion service for their needs instead of changing the expansion service API.
>>>>>>
>>>>>> On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>> If I understand it correctly, there is currently no place to set the defaultEnvironmentType - python's KafkaIO uses either 'expansion_service' given by the user (which might be a host:port, or an object that has an appropriate method), or calls 'default_io_expansion_service' - which in turn runs ExpansionService using gradle. Either way, it ends up in ExpansionService#main [1]. It could be possible to adapt ExpansionService and call it locally - provided ExpansionService offered a way to extend it (exposing a protected method createPipeline() seems to be enough) - but that is not too user-friendly. If we could specify the defaultEnvironmentConfig when starting the ExpansionService, it would be possible to add these parameters in the python SDK's KafkaIO, which would mean users do not have to worry about the expansion service at all (leaving aside that using too many ReadFromKafka or WriteToKafka transforms would somewhat hurt performance during pipeline build, but that applies to the pipeline build time only). I have created [2] to track that.
>>>>>>>
>>>>>>> Does that make sense, or is my analysis incorrect?
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
>>>>>>>
>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-12539
>>>>>>>
>>>>>>>
>>>>>>> On 6/29/21 6:24 PM, Alexey Romanenko wrote:
>>>>>>>> I'm sorry if I missed something, but do you mean that PortablePipelineOptions.setDefaultEnvironmentType(String) doesn't work for you? Or is it only a specific case while using portable KafkaIO?
>>>>>>>>
>>>>>>>>> On 29 Jun 2021, at 09:51, Jan Lukavský <x666je...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have come across an issue with cross-language transforms. My setup is that I have a working environment type PROCESS and I cannot use DOCKER. When I use Python's KafkaIO, it unfortunately - by default - expands to the docker environment, which then fails due to the missing 'docker' command. I didn't find a solution without tackling the expansion service yet.
>>>>>>>>>
>>>>>>>>> I see several possible solutions to that:
>>>>>>>>>
>>>>>>>>> a) I would say that the cleanest solution would be to add the preferred environment type to the expansion request to the expansion service (probably along with additional flags, probably --experiments?). This requires deeper changes to the expansion RPC definition, probably serializing the PipelineOptions from the client environment into the ExpansionRequest.
>>>>>>>>>
>>>>>>>>> b) Another option would be to allow specifying some of the command-line arguments when starting the expansion service, which currently accepts only a port on the command line, see [1]. The straightforward 'fix' (see [2]) unfortunately does not work, because it requires DirectRunner to be on the classpath, which then breaks other runners (see [3]). It seems possible to copy hand-selected options from the command line to the Pipeline, but that feels hackish. It would require either being able to construct the Pipeline without a runner specified (which seems possible when calling Pipeline.create(), but not when using PipelineOptions created by parsing command-line arguments), or being able to create a Map<String, String> from PipelineOptions and then the ability to copy all options into the Pipeline's options.
>>>>>>>>>
>>>>>>>>> My proposal would be to create a hackish shortcut and just copy the --defaultEnvironmentType, --defaultEnvironmentConfig and --experiments into the Pipeline's options for now, and create an issue for a proper solution (possibly a)?).
>>>>>>>>>
>>>>>>>>> WDYT? Or did I miss a way to override the default expansion?
>>>>>>>>>
>>>>>>>>> Thanks for comments,
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
>>>>>>>>>
>>>>>>>>> [2] https://github.com/apache/beam/pull/15082
>>>>>>>>>
>>>>>>>>> [3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
>>>>>>>>>