Can you please add this to the design documents webpage?
https://beam.apache.org/contribute/design-documents/

On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>
>
> On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <m...@apache.org> wrote:
>>
>> Here's the first draft:
>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>>
>> It's rather high-level. We may want to add more details once we have
>> finalized the design. Feel free to make comments and edits.
>
>
> Thanks Max. Added some comments.
>
>>
>>
>> > All of this goes back to the idea that I think the listing of
>> > artifacts (or more general dependencies) should be a property of the
>> > environments themselves.
>>
>> +1 I came to the same conclusion while thinking about how to store
>> artifact information for deferred execution of the pipeline.
>>
>> -Max
>>
>> On 07.05.19 18:10, Robert Bradshaw wrote:
>> > Looking forward to your writeup, Max. In the meantime, some comments below.
>> >
>> >
>> > From: Lukasz Cwik <lc...@google.com>
>> > Date: Thu, May 2, 2019 at 6:45 PM
>> > To: dev
>> >
>> >>
>> >>
>> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>>
>> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>> >>>>
>> >>>> We should stick with URN + payload + artifact metadata[1] where the 
>> >>>> only mandatory one that all SDKs and expansion services understand is 
>> >>>> the "bytes" artifact type. This allows us to add optional URNs for 
>> >>>> file://, http://, Maven, PyPi, ... in the future. I would make the 
>> >>>> artifact staging service use the same URN + payload mechanism to get 
>> >>>> compatibility of artifacts across the different services and also have 
>> >>>> the artifact staging service be able to be queried for the list of 
>> >>>> artifact types it supports.
>> >>>
>> >>> +1
>> >>>
>> >>>> Finally, we would need to have environments enumerate the artifact 
>> >>>> types that they support.
>> >>>
>> >>> Meaning at runtime, or as another field statically set in the proto?
>> >>
>> >>
>> >> I don't believe runners/SDKs should have to know what artifacts each 
>> >> environment supports at runtime and instead have environments enumerate 
>> >> them explicitly in the proto. I have been thinking about a more general 
>> >> "capabilities" block on environments which allows them to enumerate URNs 
>> >> that the environment understands. This would include artifact type URNs, 
>> >> PTransform URNs, coder URNs, ... I haven't proposed anything specific 
>> >> down this line yet because I was wondering how environment resources 
>> >> (CPU, min memory, hardware like GPU, AWS/GCP/Azure/... machine types) 
>> >> should/could tie into this.
>> >>
>> >>>
>> >>>> Having everyone have the same "artifact" representation would be 
>> >>>> beneficial since:
>> >>>> a) Python environments could install dependencies from a 
>> >>>> requirements.txt file (something that the Google Cloud Dataflow Python 
>> >>>> docker container allows for today)
>> >>>> b) It provides an extensible and versioned mechanism for SDKs, 
>> >>>> environments, and artifact staging/retrieval services to support 
>> >>>> additional artifact types
>> >>>> c) It allows for expressing a canonical representation of an artifact like 
>> >>>> a Maven package so a runner could merge environments that the runner 
>> >>>> deems compatible.
>> >>>>
>> >>>> The flow I could see is:
>> >>>> 1) (optional) query artifact staging service for supported artifact 
>> >>>> types
>> >>>> 2) SDK requests that the expansion service expand the transform, passing 
>> >>>> in a list of artifact types the SDK and artifact staging service 
>> >>>> support; the expansion service returns a list of artifact types limited 
>> >>>> to those supported types + any supported by the environment
>> >>>
>> >>> The crux of the issue seems to be how the expansion service returns
>> >>> the artifacts themselves. Is this going with the approach that the
>> >>> caller of the expansion service must host an artifact staging service?
>> >>
>> >>
>> >> The caller would not need to host an artifact staging service (but would 
>> >> become effectively a proxy service, see my comment below for more 
>> >> details) as I would have expected this to be part of the expansion 
>> >> service response.
>> >>
>> >>>
>> >>> There is also the question of how the returned artifacts get
>> >>> attached to the various environments, or whether they get implicitly
>> >>> applied to all returned stages (which need not have a consistent
>> >>> environment)?
>> >>
>> >>
>> >> I would suggest returning additional information that says what artifact 
>> >> is for which environment. Applying all artifacts to all environments is 
>> >> likely to cause issues since some environments may not understand certain 
>> >> artifact types or may get conflicting versions of artifacts. I would see 
>> >> this happening since an expansion service that aggregates other expansion 
>> >> services seems likely, for example:
>> >>                               /-> ExpansionService(Python)
>> >> ExpansionService(Aggregator) --> ExpansionService(Java)
>> >>                               \-> ExpansionService(Go)
>> >
>> > All of this goes back to the idea that I think the listing of
>> > artifacts (or more general dependencies) should be a property of the
>> > environments themselves.
>> >
>> >>>> 3) SDK converts any artifact types that the artifact staging service or 
>> >>>> environment doesn't understand, e.g. pulls down Maven dependencies and 
>> >>>> converts them to "bytes" artifacts
>> >>>
>> >>> Here I think we're conflating two things. The "type" of an artifact is
>> >>> both (1) how to fetch the bytes and (2) how to interpret them (e.g. is
>> >>> this a jar file, or a pip tarball, or just some data needed by a DoFn,
>> >>> or ...) Only (1) can be freely transmuted.
>> >>
>> >>
>> >> You're right. Thinking about this some more, general artifact conversion is 
>> >> unlikely to be practical because how to interpret an artifact is 
>> >> environment dependent. For example, a requirements.txt used to install 
>> >> pip packages for a Python docker container depends on the filesystem 
>> >> layout of that specific docker container. One could simulate doing a pip 
>> >> install on the same filesystem and take the diff for all the packages 
>> >> in requirements.txt, but this quickly becomes impractical.
>> >>
>> >>>
>> >>>> 4) SDK sends artifacts to artifact staging service
>> >>>> 5) Artifact staging service converts any artifacts to types that the 
>> >>>> environment understands
>> >>>> 6) Environment is started and gets artifacts from the artifact 
>> >>>> retrieval service.
>> >>>>
>> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>>>>
>> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>>>>
>> >>>>>> Good idea to let the client expose an artifact staging service that the
>> >>>>>> ExpansionService could use to stage artifacts. This solves two problems:
>> >>>>>>
>> >>>>>> (1) The Expansion Service not being able to access the Job Server
>> >>>>>> artifact staging service
>> >>>>>> (2) The client not having access to the dependencies returned by the
>> >>>>>> Expansion Server
>> >>>>>>
>> >>>>>> The downside is that it adds an additional indirection. The alternative
>> >>>>>> to let the client handle staging the artifacts returned by the Expansion
>> >>>>>> Server is more transparent and easier to implement.
>> >>>>>
>> >>>>> The other downside is that it may not always be possible for the
>> >>>>> expansion service to connect to the artifact staging service (e.g.
>> >>>>> when constructing a pipeline locally against a remote expansion
>> >>>>> service).
>> >>>>
>> >>>> Just to make sure, you're saying the expansion service would return all 
>> >>>> the artifacts (bytes, urls, ...) as part of the response since the 
>> >>>> expansion service wouldn't be able to connect to the SDK that is 
>> >>>> running locally either.
>> >>>
>> >>> Yes. Well, more I'm asking how the expansion service would return any
>> >>> artifacts.
>> >>>
>> >>> What we have is
>> >>>
>> >>> Runner <--- SDK ---> Expansion service.
>> >>>
>> >>> Where the unidirectional arrow means "instantiates a connection with"
>> >>> and the other direction (and missing arrows) may not be possible.
>> >>
>> >>
>> >> I believe the ExpansionService Expand request should become a 
>> >> unidirectional stream back to the caller so that artifacts could be sent 
>> >> back to the SDK (effectively mirroring the artifact staging service API). 
>> >> So the expansion response would stream back a bunch of artifact data 
>> >> messages and also the expansion response containing PTransform 
>> >> information.
>> >
>> > +1.
>> >
>> >>>>>> Ideally, the Expansion Service won't return any dependencies because the
>> >>>>>> environment already contains the required dependencies. We could make it
>> >>>>>> a requirement for the expansion to be performed inside an environment.
>> >>>>>> Then we would already ensure during expansion time that the runtime
>> >>>>>> dependencies are available.
>> >>>>>
>> >>>>> Yes, it's cleanest if the expansion service provides an environment
>> >>>>> with all the dependencies provided. Interesting idea to make this a
>> >>>>> property of the expansion service itself.
>> >>>>
>> >>>> I had thought this too but an opaque docker container that was built on 
>> >>>> top of a base Beam docker container would be very difficult for a 
>> >>>> runner to introspect and check to see if it's compatible to allow for 
>> >>>> fusion across PTransforms. I think artifacts need to be communicated in 
>> >>>> their canonical representation.
>> >>>
>> >>> It's clean (from the specification point of view), but doesn't allow
>> >>> for good introspection/fusion (aside from one being a base of another,
>> >>> perhaps).
>> >>>
>> >>>>>>> In this case, the runner would (as
>> >>>>>>> requested by its configuration) be free to merge environments it
>> >>>>>>> deemed compatible, including swapping out beam-java-X for
>> >>>>>>> beam-java-embedded if it considers itself compatible with the
>> >>>>>>> dependency list.
>> >>>>>>
>> >>>>>> Could you explain how that would work in practice?
>> >>>>>
>> >>>>> Say one has a pipeline with environments
>> >>>>>
>> >>>>> A: beam-java-sdk-2.12-docker
>> >>>>> B: beam-java-sdk-2.12-docker + dep1
>> >>>>> C: beam-java-sdk-2.12-docker + dep2
>> >>>>> D: beam-java-sdk-2.12-docker + dep3
>> >>>>>
>> >>>>> A runner could (conceivably) be intelligent enough to know that dep1
>> >>>>> and dep2 are indeed compatible, and run A, B, and C in a single
>> >>>>> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
>> >>>>> corresponding fusion and lower overhead benefits). If a certain
>> >>>>> pipeline option is set, it might further note that dep1 and dep2 are
>> >>>>> compatible with its own workers, which are built against sdk-2.12, and
>> >>>>> choose to run these in embedded + dep1 + dep2 environment.
>> >>>>
>> >>>> We have been talking about the expansion service and cross language 
>> >>>> transforms a lot lately but I believe it will initially come at the 
>> >>>> cost of poor fusion of transforms since "merging" environments that are 
>> >>>> compatible is a difficult problem since it brings up many of the 
>> >>>> dependency management issues (e.g. diamond dependency issues).
>> >>>
>> >>> I agree. I think expansion services offering "kitchen-sink"
>> >>> containers, when possible, can go far here. If we could at least
>> >>> recognize when one environment/set of deps is a superset of another,
>> >>> that could be an easy case that would yield a lot of benefit as well.
>> >>
>> >>
>> >> +1
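
The "superset" easy case discussed above (one environment's dependency set containing another's) can be illustrated with a minimal Python sketch. Everything in it is an assumption made for illustration: `Environment`, `covers`, and `merge_by_superset` are hypothetical names, not Beam APIs, and an environment is modeled simply as a base image name plus a set of dependency identifiers. It deliberately does not try to decide whether unrelated dependencies such as dep1 and dep2 are compatible.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Environment:
    """Hypothetical stand-in for an environment: a base image plus extra deps."""
    base: str
    deps: frozenset = frozenset()


def covers(a, b):
    """True if `a` has the same base as `b` and at least all of b's deps."""
    return a.base == b.base and a.deps >= b.deps


def merge_by_superset(envs):
    """Map each environment name to the name of an environment that covers it.

    Only the superset case is handled. Each environment always covers itself,
    so every name gets an assignment.
    """
    assignment = {}
    for name, env in envs.items():
        candidates = [n for n, other in envs.items() if covers(other, env)]
        # Prefer the covering environment with the most deps;
        # break ties by name so the result is deterministic.
        assignment[name] = min(candidates, key=lambda n: (-len(envs[n].deps), n))
    return assignment


BASE = "beam-java-sdk-2.12-docker"
envs = {
    "A": Environment(BASE),
    "B": Environment(BASE, frozenset({"dep1"})),
    "C": Environment(BASE, frozenset({"dep2"})),
    "D": Environment(BASE, frozenset({"dep3"})),
}
assignment = merge_by_superset(envs)
```

With the four environments from the example in the thread, only A is folded into another environment (B, chosen by the name tie-break); B, C, and D stay separate because none of their dependency sets contains another. Merging B and C would need the compatibility knowledge the thread describes as the hard part.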
