> I'm totally fine with changes to ExpansionService (java) to support additional features.

Looks like we have consensus; I'm fine with it as well, for the first round. The problem is how exactly to modify it. I think it should accept the complete list of PipelineOptions (or at least some defined subset - PortablePipelineOptions, ExperimentalOptions, ...?). The catch is that in order to use Pipeline.create(options), the options *must* include a runner. Adding :runners:direct-java as a dependency of :sdks:java:expansion_service does not work, because it conflicts with other runners.

I just modified the PR [1] to include a NoOpRunner, which seems to resolve the issues and the tests pass. Feels a little hackish, but I might be OK with it.

[1] https://github.com/apache/beam/pull/15082/files#diff-fc9fd0c0d7d5f3ad6d5db7ec63ca1d75080a6527fd053a2ad36333d760da5b70R552
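
To make the "defined subset" idea a bit more concrete, here is a rough sketch in Python (not the Java change from the PR). It only illustrates accepting the port plus a whitelisted set of options and rejecting everything else. The helper name, the whitelist and the port value are assumptions made up for this illustration, and treating --experiments as repeatable is a choice of the sketch, not a statement about how ExpansionService parses it.

```python
import json

# Options the thread proposes copying into the expansion pipeline's options.
# The exact whitelist is an assumption of this sketch.
ALLOWED_OPTIONS = {"defaultEnvironmentType", "defaultEnvironmentConfig", "experiments"}


def parse_expansion_service_args(argv):
    """Split ['<port>', '--name=value', ...] into (port, options).

    Hypothetical helper: it mimics forwarding a defined subset of
    pipeline options; it is not ExpansionService's real parser.
    """
    if not argv:
        raise ValueError("usage: <port> [--option=value ...]")
    port = int(argv[0])
    options = {}
    for arg in argv[1:]:
        if not arg.startswith("--") or "=" not in arg:
            raise ValueError(f"expected --name=value, got {arg!r}")
        # Split on the first '=' only, so JSON values may contain '='.
        name, value = arg[2:].split("=", 1)
        if name not in ALLOWED_OPTIONS:
            raise ValueError(f"unsupported option: --{name}")
        if name == "experiments":
            # Collect repeated --experiments values in order (sketch's choice).
            options.setdefault(name, []).append(value)
        else:
            options[name] = value
    return port, options


if __name__ == "__main__":
    # The command path is the placeholder used earlier in this thread.
    process_config = json.dumps({"command": "/path/to/launcher/boot"})
    port, options = parse_expansion_service_args([
        "8097",  # arbitrary port for the example
        "--defaultEnvironmentType=PROCESS",
        f"--defaultEnvironmentConfig={process_config}",
        "--experiments=use_deprecated_read",
    ])
    print(port, options)
```

Anything outside the whitelist (for example --runner) is rejected, which sidesteps the "options must include a runner" problem only on the parsing side; the Pipeline.create(options) part still needs something like the NoOpRunner.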


On 6/30/21 4:46 PM, Chamikara Jayalath wrote:


On Wed, Jun 30, 2021 at 7:41 AM Jan Lukavský <je...@seznam.cz> wrote:

    > java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>

    This does not accept any parameters other than the port. That was
    the first part of this thread - the intent was to enable it to
    accept additional arguments, but there are still unresolved
    issues. Currently there even seems to be no way to adapt
    ExpansionService other than to copy & paste the code and modify
    it, because it simply is not extensible. It would be enough to
    wrap the Pipeline.create() [1] call in a protected method, or to
    add a (protected) constructor that accepts PipelineOptions
    (probably better in this regard). That would make it easier for
    users to create a customized ExpansionService and it would (sort
    of) help solve the described issues.


I'm totally fine with changes to ExpansionService (java) to support additional features.

    But even if we do that, we still need to deal with the expansion
    service in two places:

     a) run it (and stop it)

     b) specify it in the

    Using the default expansion service is much, much easier: it is
    started and stopped automatically for the user. Moreover, the
    JavaJarExpansionService actually even presumes that additional
    arguments can be passed to the service ([2]); it is only the
    ExpansionService that does not accept them (and Kafka IO does not
    expose that - users could work around it by manually creating
    the JavaJarExpansionService from their own jar, yes). I would
    find it natural to add the command-line parsing (somehow!) to the
    ExpansionService itself, so that it doesn't need end-user
    modifications, and then to figure out how to most easily expose
    these command-line arguments to end-users.

    > Or PROCESS mode.

    Yes, I verified that Flink can use Python Kafka IO over the PROCESS
    environment with some hacking of the ExpansionService as shown in
    one of the linked PRs (though there are probably still some bugs
    regarding SDF - [3]). Adding --experiments seems to have the same
    issue: we need to expose it in the CLI of ExpansionService. And I'm
    not sure whether [4] conflicts with
    --experiments=use_deprecated_read. That is something I still need
    to investigate.


This is very good to know. Thanks.


    LOOPBACK is currently not supported by Flink. That is a nice-to-have
    feature.

     Jan

    [1] https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394

    [2] https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481

    [3] https://issues.apache.org/jira/browse/BEAM-11998

    [4] https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398

    On 6/30/21 3:57 PM, Chamikara Jayalath wrote:


    On Wed, Jun 30, 2021 at 6:54 AM Chamikara Jayalath <chamik...@google.com> wrote:



        On Wed, Jun 30, 2021 at 1:20 AM Jan Lukavský <je...@seznam.cz> wrote:

            On 6/30/21 1:16 AM, Robert Bradshaw wrote:
            > <rant>Why doesn't docker in docker just work, rather than
            > having to do ugly hacks when composing two technologies
            > that both rely on docker...</rant>
            >
            > Presumably you're setting up a node for Kafka and Flink;
            > why not set one up for the expansion service as well? The
            > UX of
            >
            > ReadFromKafka(default_expansion_service_args={
            >     "defaultEnvironmentType": "PROCESS",
            >     "defaultEnvironmentConfig": "{\"os\": \"linux\", \"arch\": \"amd64\", \"command\": \"/path/to/launcher/boot cp=/some/other/long/path\" ...}"})
            >
            > isn't that great either. Rather than pass arbitrary
            > arguments to a default expansion service, I still think
            > once you get to this level it's better to just start your
            > own expansion service.

            Sure, that is possible (it seems to me that it would still
            require some changes to ExpansionService to make it
            extendable, but yes, fairly tiny changes). The problem is
            not with Flink or Kafka - those are technologies you
            actually expect to set up, because you want to use them.
            The problem is everything else you must set up to make
            something that seems as easy as "read a few messages from
            Kafka in Beam Python" work. You must have:

              a) Python SDK harness (OK, that is something that should
            probably be expected) - there are a few problems with it,
            namely it is somewhat hardcoded that it must run in the
            same pod as Flink's taskmanager to be able to use the
            EXTERNAL environment, but OK, let's go on

              b) Java SDK harness, at least installed in the docker
            image of the taskmanager (to be usable via the PROCESS
            environment) - OK, that starts to be weird, the taskmanager
            is Java, right? Something like LOOPBACK would be cool
            there, but never mind. You create a custom docker image for
            your Flink JM and TM and continue.

              c) Implement (extend) and deploy your own expansion
            service - ouch, that starts to hurt, that is even going to
            be a pod that keeps running even though there is nothing
            using it (yes, it can be scaled down).

            The complexity of a simple task starts to be somewhat
            extraordinary. And most users will not be willing to
            follow this path, I'm afraid. People generally don't like
            to set up a complex environment for something that looks
            like it should "just work". There is non-trivial work
            necessary to make all of this work, especially when you
            are just starting to evaluate Beam and don't have much
            experience with it.


        I don't think we should expect end-users to implement or
        extend the expansion service. Everything should be already
        implemented and maybe we can even provide a script to easily
        startup a local Java expansion service with additional
        parameters.

        Today, to start a Java expansion service for Kafka, users have
        to do the following.

        * Download the expansion service jar released with Beam for
        Kafka. For example [1]

        * Run the following command:
        java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>

        * To use this they just have to provide "localhost:<port>" to
        [2].

        This is a few extra steps, but it is mostly a one-time setup
        for the user and has nothing to do with portability or other
        complexities of Beam.

        I'm all for simplifying the user-experience, but adding
        changes to the transform API that might have to be deprecated
        later sounds like a bad idea. I'd much rather provide
        additional scripts/documentation/examples to simplify such
        use-cases. I think that will be adequate for most users.

        BTW, slightly orthogonal, I don't think multi-language would
        work in LOOPBACK mode today without additional changes to
        portable runners (at least I've never tested this). Did you
        confirm that this works?


    Or PROCESS mode.


        Thanks,
        Cham

        [1] https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar
        [2] https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133


            We can get rid of b) (implement LOOPBACK in Flink) and c)
            (enable Python SDK Kafka IO to spawn the expansion service
            with the LOOPBACK environment when submitting to Flink).
            That is why I still think that this simplification matters
            a lot.

            >
            > On Tue, Jun 29, 2021 at 3:33 PM Jan Lukavský <je...@seznam.cz> wrote:
            >> I believe we could change that more or less the same
            >> way as we can deprecate / stop supporting any other
            >> parameter of any method. If Python starts to natively
            >> support Kafka IO, then we can simply log a warning /
            >> raise an exception (one after the other). That seems
            >> like natural development.
            >>
            >> Maybe I should have described the case - I'm trying to
            >> set up a "simple" use-case for users that want to try
            >> the Python SDK to read from Kafka using Flink on
            >> Minikube (both Kafka and Flink are running inside
            >> Minikube). There are tons of problems with using docker
            >> from within Minikube and I would not say that is the
            >> "simple" way we would like to present to users. Setting
            >> up your own expansion service is a possibility - but
            >> that also falls short on UX. I pretty much think that
            >> understanding portability on its own is already a burden
            >> we put on users (yes, we do that for a reason, but
            >> everything else should be as simple as possible).
            >>
            >> On 6/30/21 12:16 AM, Chamikara Jayalath wrote:
            >>
            >> So I think one downside to this PR is that we assume
            >> that the default expansion service used by the transform
            >> (Kafka in this case) will not change. Currently it's
            >> fully opaque. In the default case we just promise that
            >> the transform will work (if conditions I mentioned above
            >> are met). Nothing else.
            >> If we add a "param default_expansion_service_args", we
            >> leak the nature of the default expansion service to the
            >> API and it will be hard to change it in the future.
            >>
            >> Thanks,
            >> Cham
            >>
            >> On Tue, Jun 29, 2021 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
            >>> I would absolutely understand this if it were mostly
            >>> impossible or at least really hard to get the
            >>> user-friendly behavior. But we are mostly there in this
            >>> case. When we can actually quite simply pass the
            >>> supported environment via a parameter, I think we should
            >>> go for it.
            >>>
            >>> I have created a sketch (I verified that when the
            >>> ExpansionService is patched 'enough' this works) in [1].
            >>> This is only a sketch, because we first must know how to
            >>> support the default execution environment in
            >>> ExpansionService.
            >>>
            >>> [1] https://github.com/apache/beam/pull/15099/files
            >>>
            >>> On 6/29/21 11:51 PM, Chamikara Jayalath wrote:
            >>>
            >>>
            >>>
            >>> On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský <je...@seznam.cz> wrote:
            >>>> On 6/29/21 11:04 PM, Robert Bradshaw wrote:
            >>>>> You can configure the environment in the current
            >>>>> state, you just have to run your own expansion service
            >>>>> that has a different environment baked into it (or
            >>>>> makes this configurable).
            >>>> Yes, that is true. On the other hand, that lacks some
            >>>> user-friendliness, because ideally you don't want to
            >>>> worry about expansion services, especially when it comes
            >>>> to some mostly standard IO. The ideal case is that you
            >>>> either basically do not know that you are using an
            >>>> external transform (which is probably the case when you
            >>>> can use docker), or you are able to overcome the problem
            >>>> within the SDK (Python) by passing some argument to the
            >>>> input transform.
            >>>
            >>> Arguments passed at the pipeline level apply to the
            >>> whole pipeline (not just one transform). So if you pass
            >>> in a default environment (and configs) at the pipeline
            >>> level, that would mean the default environment and
            >>> configs used by the pipeline (so the Python SDK in this
            >>> case), not a specific transform.
            >>> I believe we have made usage of external transforms
            >>> user-friendly for the general case. But we had to make
            >>> some assumptions. For example, we assumed:
            >>> * The user will be using the default environment of the
            >>> expansion service (Docker in this case)
            >>> * The user will be using the pre-specified dependency
            >>> only (sdks:java:io:expansion-service:shadowJar for Kafka)
            >>> * The user will be in an environment where the jar can
            >>> be downloaded.
            >>>
            >>> I would consider any use-case where these basic
            >>> assumptions cannot be met an advanced use-case. The
            >>> solution in such a case would be to start a custom
            >>> expansion service and pass its address as a parameter
            >>> to the transform [1]. I'm fine with extending the
            >>> capabilities of the Java expansion service by adding more
            >>> parameters (for example, for overriding the environment,
            >>> for specifying dependencies, for providing pipeline
            >>> options).
            >>>
            >>> Thanks,
            >>> Cham
            >>>
            >>> [1] https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133
            >>>
            >>>
            >>>>> Is option (1) updating the default expansion service
            >>>>> such that one can override default environment
            >>>>> properties on the command line? (You would still have
            >>>>> to start it up manually to use it.)
            >>>> Yes and no. :) Updating ExpansionService so that you
            >>>> can specify the default environment on the command line
            >>>> makes this accessible to JavaJarExpansionService, and
            >>>> that makes it possible to add an (optional) argument to
            >>>> Python Kafka IO that would delegate this to the
            >>>> (automatically) started expansion service. It is
            >>>> important to note that both ReadFromKafka and
            >>>> WriteToKafka have expansions that involve only a single
            >>>> external (Java) SDK. That simplifies things.
            >>>>> Maybe it would help to make things more concrete.
            >>>>> Suppose I have a Go pipeline that uses a library which
            >>>>> invokes a Python external transform to do ML (say, via
            >>>>> TFX), and two Java IOs (which happen to have mutually
            >>>>> exclusive dependencies). The ML transform itself uses
            >>>>> Java to invoke some SQL.
            >>>>>
            >>>>> The way things work currently is each external
            >>>>> transform will have an associated fully specified
            >>>>> environment and a runner can use docker to start up the
            >>>>> required workers at the expected time.
            >>>>>
            >>>>> Now, suppose one doesn't have docker on the workers. One
            >>>>> wants to run this with
            >>>>>
            >>>>>       ./my_pipeline --someFlag=someValue --someOtherFlag=someOtherValue ...
            >>>>>
            >>>>> such that docker is no longer needed. What someFlags
            >>>>> would we need, and what would their values be? (And how
            >>>>> to make this feasible to implement.)
            >>>>>
            >>>>> Are there meaningful intermediate points that extend to
            >>>>> a general solution (or at least aren't hostile to it)?
            >>>> I believe that in option 2) the best way would be to
            >>>> use each SDK's URN. Then the arguments could be
            >>>> something like
            >>>>
            >>>> "--expansionEnvironments={"apache:beam:go:2.33.0:latest"={"env"="DOCKER",
            >>>> config="<image>"}, "apache:beam:python:2.33.0:latest"={env="PROCESS",
            >>>> config={...}}". Yes, it would require a lot of
            >>>> "syntactic sugar" to configure that. :) (sorry if I
            >>>> don't have the URNs for SDKs 100% correct)
            >>>>>
            >>>>> I still think in the long run having runners understand
            >>>>> environments, and saying "oh, whenever I see
            >>>>> 'apache:beam:java:2.33.0:latest' I'll swap that out for
            >>>>> 'path/to/my/java -cp ...'" is the right way to go
            >>>>> long-term. (I would put this in runners, not SDKs,
            >>>>> though a common runners library could be used.)
            >>>> Yes, I also agree that the expansion service should be
            >>>> runner-dependent (or at least runner-aware), as that
            >>>> brings optimizations. The runner could ignore the
            >>>> settings from the previous point when it can be *sure*
            >>>> it can do so.
            >>>>>
            >>>>> On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský <je...@seznam.cz> wrote:
            >>>>>> Thanks for pointing to that thread.
            >>>>>>
            >>>>>> 1) I'm - as well as Kyle - fine with the approach that
            >>>>>> we use a "preferred environment" for the expansion
            >>>>>> service. We only need to pass it via the command line.
            >>>>>> Yes, the command line might be generally SDK-dependent,
            >>>>>> and that makes it expansion-dependent, because whether
            >>>>>> or not a particular transform is "external" is an
            >>>>>> implementation detail. That is the nasty part. The rest
            >>>>>> of my original question is about how exactly to do
            >>>>>> that, because it seems to be tricky: it is not possible
            >>>>>> to include a runtime dependency on DirectRunner (it
            >>>>>> fails many, many tests) and it is not possible to
            >>>>>> extract PipelineOptions as a Map either.
            >>>>>>
            >>>>>> 2) Regarding the SDK injecting the environment, I still
            >>>>>> think that is the correct way. The SDK (the driver
            >>>>>> code) owns the execution environment. It should be able
            >>>>>> to define (or at least prioritize) the runtime
            >>>>>> environments of all transforms. If we cannot know in
            >>>>>> advance which transform is going to expand to how many
            >>>>>> nested (and possibly external) transforms, I think that
            >>>>>> the SDK could be fine with providing a Map(SDK ->
            >>>>>> environment). That is: "Run Java using PROCESS", "Run
            >>>>>> Python using DOCKER", and so on. A default mapping
            >>>>>> might exist on the expansion service as well (which
            >>>>>> might be passed through the command line, and that is
            >>>>>> point 1)). Yes, the Map approach is definitely not
            >>>>>> universal, because one can imagine that the SDK itself
            >>>>>> is not enough for specifying the environment, but it
            >>>>>> seems that the vast majority of cases would fit into
            >>>>>> that.
            >>>>>>
            >>>>>> 3) The best might be for the SDK to provide a list of
            >>>>>> supported environments with additional metrics, which
            >>>>>> the expansion service might choose from.
            >>>>>>
            >>>>>> These three approaches are all extensions of the
            >>>>>> current state. The current state has a predefined
            >>>>>> environment without the possibility to change it.
            >>>>>> Option 1) changes it to a single configurable
            >>>>>> environment, option 2) to N environments based on SDK,
            >>>>>> and option 3) to M environments based on SDK-dependent
            >>>>>> metrics (and/or capabilities of a particular
            >>>>>> environment). These seem like gradual extensions of the
            >>>>>> current state, so maybe we can focus on the first one,
            >>>>>> and maybe add the others when there is a need?
            >>>>>>
            >>>>>> If this could be the first conclusion, then the next
            >>>>>> question would be what the preferred way to implement
            >>>>>> it should be.
            >>>>>>
            >>>>>> WDYT?
            >>>>>>
            >>>>>> On 6/29/21 9:15 PM, Robert Bradshaw wrote:
            >>>>>>> +1, thanks for digging up that thread.
            >>>>>>>
            >>>>>>> I am still of the same opinion that I wrote there. To
            >>>>>>> touch on some things brought up here, copying something
            >>>>>>> like defaultEnvironmentConfig doesn't make sense from
            >>>>>>> language to language (e.g. the docker image name or CLI
            >>>>>>> arguments for subprocess mode just isn't going to work
            >>>>>>> for all of Python, Java, and Go, and clearly embedded
            >>>>>>> type is only going to work for one.)
            >>>>>>>
            >>>>>>> In the short term, to change the environment (or
            >>>>>>> anything else) about the "default" expansion service,
            >>>>>>> the thing to do is build and start your own expansion
            >>>>>>> service that sets up the environment for its transforms
            >>>>>>> in a custom way.
            >>>>>>>
            >>>>>>> FYI, in Python, one can use --beam_services to use a
            >>>>>>> custom expansion service. E.g.
            >>>>>>>
            >>>>>>> --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar": "localhost:port"}'
            >>>>>>>
            >>>>>>> would override the default one when using SqlTransform.
            >>>>>>>
            >>>>>>> On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver <kcwea...@google.com> wrote:
            >>>>>>>> For context, there was a previous thread which
            >>>>>>>> touched on many of the same points:
            >>>>>>>> https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E
            >>>>>>>>
            >>>>>>>> On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský <je...@seznam.cz> wrote:
            >>>>>>>>> I would slightly disagree that this breaks the
            >>>>>>>>> black box nature of the expansion. The "how the
            >>>>>>>>> transform expands" remains unknown to the SDK
            >>>>>>>>> requesting the expansion; the "how the transform
            >>>>>>>>> executes", on the other hand, is something that the
            >>>>>>>>> SDK must cooperate on - it knows (or could or should
            >>>>>>>>> know) what the environment that the pipeline is going
            >>>>>>>>> to be executed on looks like. That is why the
            >>>>>>>>> expansion service on its own cannot correctly define
            >>>>>>>>> the execution environment. It could, if it were bound
            >>>>>>>>> to a runner (and its environment) - for instance a
            >>>>>>>>> FlinkRunnerExpansionService could probably expand
            >>>>>>>>> KafkaIO to something more 'native'. But that requires
            >>>>>>>>> knowledge of the target runner. If the expansion
            >>>>>>>>> service is not dedicated to a runner, the only place
            >>>>>>>>> where it can be defined is the SDK - and therefore
            >>>>>>>>> the expansion request.
            >>>>>>>>>
            >>>>>>>>>> Power users can always modify the output produced
            >>>>>>>>>> by the expansion service as well.
            >>>>>>>>> I'm not sure if I follow this. Do you mean that
            >>>>>>>>> power users who run the expansion service can modify
            >>>>>>>>> the output? Or is the output (protobuf) of the
            >>>>>>>>> expansion service easily transferable between
            >>>>>>>>> different execution environments? I had the
            >>>>>>>>> impression that execution environments do not
            >>>>>>>>> necessarily have the same payloads associated with
            >>>>>>>>> them, and therefore it is impossible to 'postprocess'
            >>>>>>>>> the output of the expansion. Is that a wrong
            >>>>>>>>> assumption?
            >>>>>>>>>
            >>>>>>>>> On 6/29/21 7:55 PM, Luke Cwik wrote:
            >>>>>>>>>
            >>>>>>>>> This would "break" the black box where the
            >>>>>>>>> expansion service is supposed to hide the
            >>>>>>>>> implementation internals from the caller and pushes
            >>>>>>>>> compatibility of these kinds of environment overrides
            >>>>>>>>> on to the expansion service and its implementer.
            >>>>>>>>>
            >>>>>>>>> Power users can always modify the output produced by
            >>>>>>>>> the expansion service as well.
            >>>>>>>>>
            >>>>>>>>> On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský <je...@seznam.cz> wrote:
            >>>>>>>>>> The argument for being able to accept a (possibly
            >>>>>>>>>> ordered) list of execution environments is that this
            >>>>>>>>>> could make a single instance of the expansion
            >>>>>>>>>> service reusable by various clients with different
            >>>>>>>>>> requirements. Moreover, the two approaches are
            >>>>>>>>>> probably orthogonal - users could specify a
            >>>>>>>>>> 'defaultExecutionEnvironment' for the service, which
            >>>>>>>>>> could be used when no preference is given by the
            >>>>>>>>>> client.
            >>>>>>>>>>
            >>>>>>>>>> On 6/29/21 7:03 PM, Luke Cwik wrote:
            >>>>>>>>>>
            >>>>>>>>>> I would be much more inclined for the user being
            >>>>>>>>>> able to configure the expansion service for their
            >>>>>>>>>> needs instead of changing the expansion service API.
            >>>>>>>>>>
            >>>>>>>>>> On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <je...@seznam.cz> wrote:
            >>>>>>>>>>> If I understand it correctly, there is currently
            >>>>>>>>>>> no place to set the defaultEnvironmentType -
            >>>>>>>>>>> Python's KafkaIO uses either the
            >>>>>>>>>>> 'expansion_service' given by the user (which might
            >>>>>>>>>>> be a host:port, or an object that has the
            >>>>>>>>>>> appropriate method), or calls
            >>>>>>>>>>> 'default_io_expansion_service' - which in turn runs
            >>>>>>>>>>> ExpansionService using gradle. Either way, it ends
            >>>>>>>>>>> up in ExpansionService#main [1]. It could be
            >>>>>>>>>>> possible to adapt ExpansionService and call it
            >>>>>>>>>>> locally - provided ExpansionService offered a way to
            >>>>>>>>>>> extend it (a protected method createPipeline()
            >>>>>>>>>>> seems to be enough) - but that is not very
            >>>>>>>>>>> user-friendly. If we could specify the
            >>>>>>>>>>> defaultEnvironmentConfig when starting the
            >>>>>>>>>>> ExpansionService, it would be possible to add these
            >>>>>>>>>>> parameters to the Python SDK's KafkaIO, which would
            >>>>>>>>>>> mean users do not have to worry about the expansion
            >>>>>>>>>>> service at all (leaving aside that using too many
            >>>>>>>>>>> ReadFromKafka or WriteToKafka transforms would
            >>>>>>>>>>> somewhat hurt performance, but that applies to the
            >>>>>>>>>>> pipeline build time only). I have created [2] to
            >>>>>>>>>>> track that.
            >>>>>>>>>>>
            >>>>>>>>>>> Does that make sense, or is my analysis incorrect?
            >>>>>>>>>>>
            >>>>>>>>>>>      Jan
            >>>>>>>>>>>
            >>>>>>>>>>> [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
            >>>>>>>>>>>
            >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-12539
            >>>>>>>>>>>
            >>>>>>>>>>>
            >>>>>>>>>>> On 6/29/21 6:24 PM, Alexey Romanenko wrote:
            >>>>>>>>>>>> I’m sorry if I missed something but do you mean
            >>>>>>>>>>>> that PortablePipelineOptions.setDefaultEnvironmentType(String)
            >>>>>>>>>>>> doesn’t work for you? Or is it only a specific case
            >>>>>>>>>>>> while using portable KafkaIO?
            >>>>>>>>>>>>
            >>>>>>>>>>>>> On 29 Jun 2021, at 09:51, Jan Lukavský <x666je...@gmail.com> wrote:
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> Hi,
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> I have come across an issue with cross-language
            >>>>>>>>>>>>> transforms. My setup is that I have a working
            >>>>>>>>>>>>> environment type PROCESS and I cannot use DOCKER.
            >>>>>>>>>>>>> When I use Python's KafkaIO, it unfortunately - by
            >>>>>>>>>>>>> default - expands to the docker environment, which
            >>>>>>>>>>>>> then fails due to the missing 'docker' command. I
            >>>>>>>>>>>>> haven't found a solution without tackling the
            >>>>>>>>>>>>> expansion service yet.
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> I see several possible solutions to that:
            >>>>>>>>>>>>>
            >>>>>>>>>>>>>      a) I would say that the cleanest solution
            >>>>>>>>>>>>> would be to add the preferred environment type to
            >>>>>>>>>>>>> the expansion request sent to the expansion service
            >>>>>>>>>>>>> (probably along with additional flags, probably
            >>>>>>>>>>>>> --experiments?). This requires deeper changes to
            >>>>>>>>>>>>> the expansion RPC definition, probably serializing
            >>>>>>>>>>>>> the PipelineOptions from the client environment
            >>>>>>>>>>>>> into the ExpansionRequest.
            >>>>>>>>>>>>>
            >>>>>>>>>>>>>      b) Another option would be to allow
            >>>>>>>>>>>>> specifying some of the command-line arguments when
            >>>>>>>>>>>>> starting the expansion service, which currently
            >>>>>>>>>>>>> accepts only the port on the command line, see [1].
            >>>>>>>>>>>>> The straightforward 'fix' (see [2]) unfortunately
            >>>>>>>>>>>>> does not work, because it requires DirectRunner to
            >>>>>>>>>>>>> be on the classpath, which then breaks other
            >>>>>>>>>>>>> runners (see [3]). It seems possible to copy
            >>>>>>>>>>>>> hand-selected options from the command line to the
            >>>>>>>>>>>>> Pipeline, but that feels hackish. Doing it properly
            >>>>>>>>>>>>> would require either being able to construct the
            >>>>>>>>>>>>> Pipeline without a runner specified (which seems
            >>>>>>>>>>>>> possible when calling Pipeline.create(), but not
            >>>>>>>>>>>>> when using PipelineOptions created by parsing
            >>>>>>>>>>>>> command-line arguments) or being able to create a
            >>>>>>>>>>>>> Map<String, String> from PipelineOptions and then
            >>>>>>>>>>>>> copy all options into the Pipeline's options.
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> My proposal would be to create a hackish shortcut
            >>>>>>>>>>>>> and just copy --defaultEnvironmentType,
            >>>>>>>>>>>>> --defaultEnvironmentConfig and --experiments into
            >>>>>>>>>>>>> the Pipeline's options for now, and create an issue
            >>>>>>>>>>>>> for a proper solution (possibly a)?).
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> WDYT? Or did I miss a way to override the default
            >>>>>>>>>>>>> expansion?
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> Thanks for comments,
            >>>>>>>>>>>>>
            >>>>>>>>>>>>>      Jan
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> [2] https://github.com/apache/beam/pull/15082
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> [3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
            >>>>>>>>>>>>>
